diff options
Diffstat (limited to 'src/roof-thread.c')
-rw-r--r-- | src/roof-thread.c | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/src/roof-thread.c b/src/roof-thread.c new file mode 100644 index 0000000..570300f --- /dev/null +++ b/src/roof-thread.c @@ -0,0 +1,95 @@ +#include <stdio.h> + +#include "roof.h" +#include "roof-thread.h" + + +RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) { + GError *gerr = NULL; + + RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext)); + if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext"); + + rdt->cfg = cfg; + rdt->rdi = rdi; + + return rdt; + +} + +void roof_thread_context_free(RoofThreadContext *rdt) { + if (rdt) { + free(rdt); + } +} + + +int roof_thread_read_socket(Roof * + +int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) { + Roof *ctx = (Roof*)data; + + RoofConfig *cfg = ctx->cfg; + RoofThreadContext *rdt = ctx->rdt[id]; + + guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections }; + guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] }; + + packet_id + fragment_id = + + for (guint stream_id = from; stream_id < to; stream_id++) { + + uint8_t *rdbuf; + guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(RoofPacketHeader)) { + RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } else { + // FIXME: consider consecutive + //fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // FIXME: packet may contain fragments for multiple datasets + fragment_id = packet_id * fragments_per_packet; + guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; + guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; + + // FIXME: verify that packet is consecutive + // if + + // Drop packets of already skipped datasets + if (dataset_id < priv->current_id) + continue; + + // FIXME: stop processing and return.... + if (dataset_id < priv->current_id) + + uint8_t *fragment_buffer = dataset_buffer + + stream_id * fragment_dims[0] + // x-coordinate + (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + + +// buffer->last_id[stream_id] = dataset_id; + + ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + + + } + + } + +} + |