summaryrefslogtreecommitdiffstats
path: root/src/roof-thread.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/roof-thread.c')
-rw-r--r--src/roof-thread.c95
1 files changed, 95 insertions, 0 deletions
diff --git a/src/roof-thread.c b/src/roof-thread.c
new file mode 100644
index 0000000..570300f
--- /dev/null
+++ b/src/roof-thread.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+
+#include "roof.h"
+#include "roof-thread.h"
+
+
+RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) {
+ GError *gerr = NULL;
+
+ RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext));
+ if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext");
+
+ rdt->cfg = cfg;
+ rdt->rdi = rdi;
+
+ return rdt;
+
+}
+
+void roof_thread_context_free(RoofThreadContext *rdt) {
+ if (rdt) {
+ free(rdt);
+ }
+}
+
+
+int roof_thread_read_socket(Roof *
+
+int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) {
+ Roof *ctx = (Roof*)data;
+
+ RoofConfig *cfg = ctx->cfg;
+ RoofThreadContext *rdt = ctx->rdt[id];
+
+ guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections };
+ guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] };
+
+ packet_id
+ fragment_id =
+
+ for (guint stream_id = from; stream_id < to; stream_id++) {
+
+ uint8_t *rdbuf;
+ guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(RoofPacketHeader)) {
+ RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ } else {
+ // FIXME: consider consecutive
+ //fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // FIXME: packet may contain fragments for multiple datasets
+ fragment_id = packet_id * fragments_per_packet;
+ guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
+ guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
+
+ // FIXME: verify that packet is consecutive
+ // if
+
+ // Drop packets of already skipped datasets
+ if (dataset_id < priv->current_id)
+ continue;
+
+ // FIXME: stop processing and return....
+ if (dataset_id < priv->current_id)
+
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * fragment_dims[0] + // x-coordinate
+ (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+
+
+// buffer->last_id[stream_id] = dataset_id;
+
+ ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+
+
+ }
+
+ }
+
+}
+