author     Suren A. Chilingaryan <csa@suren.me>    2020-09-03 03:00:30 +0200
committer  Suren A. Chilingaryan <csa@suren.me>    2020-09-03 03:00:30 +0200
commit     5172421d248250b4ab3b69eb57fd83656e23a4da (patch)
tree       a499d9f1dd0b74b754816884a59927b3171656fc /src/roof-buffer.c
parent     7b2e6168b049be9e7852b2d364d897592eff69fc (diff)
This is unfinished work implementing out-of-UFO network servers
Diffstat (limited to 'src/roof-buffer.c')
-rw-r--r--  src/roof-buffer.c  252
1 file changed, 252 insertions(+), 0 deletions(-)
diff --git a/src/roof-buffer.c b/src/roof-buffer.c
new file mode 100644
index 0000000..4ca0386
--- /dev/null
+++ b/src/roof-buffer.c
@@ -0,0 +1,252 @@
+#include <stdio.h>
+#include <stdint.h>
+#include <stdlib.h>     // malloc, calloc, free
+#include <string.h>     // memcpy, memset
+#include <time.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+
+// This is currently not thread-safe. With the dual-filter architecture this will be called sequentially.
+
+RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
+ if ((n_dims < 1)||(n_dims > 2))
+ roof_new_error(error, "Unsupported number of dimensions %u (only plain and 2D ROOF structures are currently supported)", n_dims);
+
+ RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer));
+ if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer");
+
+ buffer->max_datasets = max_datasets;
+ buffer->n_streams = cfg->n_streams;
+ buffer->ring_size = cfg->buffer_size;
+ buffer->drop_buffers = cfg->drop_buffers;
+ buffer->latency_buffers = cfg->latency_buffers;
+ buffer->n_dims = n_dims;
+ buffer->dataset_size = cfg->dataset_size;
+ buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8;
+ buffer->dataset_dims[1] = cfg->fan_projections;
+ buffer->fragment_size = cfg->payload_size;
+ buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8;
+ buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0];
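+ // The dataset is a fan_projections x fan_bins image stored row by row: dataset_dims[0] is the
+ // byte pitch of one projection row, fragment_dims[0] is the slice of that row contributed by a
+ // single stream/module, and fragment_dims[1] is the number of rows packed into one fragment.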
+
+ buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
+ buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;
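+ // A complete dataset is thus assembled from fragments_per_dataset fragments, split evenly
+ // across the streams: each stream delivers fragments_per_stream of them.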
+// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size);
+
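+ // ring_buffer stores ring_size datasets back to back; n_fragments counts the fragments already
+ // received for each ring slot; stream_fragment keeps the auto-incremented fragment number per
+ // stream (used when fragment_id == 0 is passed to roof_buffer_set_fragment).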
+ buffer->ring_buffer = malloc((size_t)buffer->ring_size * buffer->dataset_size);
+ buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic guint));
+ buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
+
+#ifdef ROOF_INDEPENDENT_STREAMS
+ buffer->first_id = malloc(buffer->n_streams * sizeof(guint64));
+ if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
+ for (guint i = 0; i < buffer->n_streams; i++)
+ buffer->first_id[i] = (guint64)-1;
+#else
+ buffer->first_id = (guint64)-1;
+#endif
+ buffer->last_id = malloc(buffer->n_streams * sizeof(guint64));
+
+ if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)||(!buffer->last_id)) {
+ roof_buffer_free(buffer);
+ roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
+ }
+
+ return buffer;
+}
+
+void roof_buffer_free(RoofBuffer *buffer) {
+ if (buffer) {
+#ifdef ROOF_INDEPENDENT_STREAMS
+ if (buffer->first_id)
+ free(buffer->first_id);
+#endif
+ if (buffer->ring_buffer)
+ free(buffer->ring_buffer);
+ if (buffer->n_fragments)
+ free(buffer->n_fragments);
+ if (buffer->stream_fragment)
+ free(buffer->stream_fragment);
+ if (buffer->last_id)
+ free(buffer->last_id);
+
+ free(buffer);
+ }
+}
+
+// fragment_id is numbered from 1 (0 means auto-increment per stream)
+gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
+ gboolean ready = FALSE;
+ guint buffer_id;
+ guint64 first_id;
+ guint64 dataset_id;
+
+ if (!fragment_id) {
+ fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // If packets had arbitrary sizes, the dataset_id would have to be transferred along with the fragment_id (otherwise complex guessing would be required)
+ dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
+ fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
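+ // The per-stream fragment numbering runs continuously across datasets: the quotient selects
+ // the dataset, the remainder is the fragment's position within this stream's share of it.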
+
+#ifdef ROOF_INDEPENDENT_STREAMS
+ if (buffer->first_id[stream_id] == (guint64)-1)
+ buffer->first_id[stream_id] = dataset_id;
+ first_id = buffer->first_id[stream_id];
+#else
+ if (buffer->first_id == (guint64)-1)
+ buffer->first_id = dataset_id;
+ first_id = buffer->first_id;
+#endif
+ if (dataset_id < first_id)
+ return FALSE;
+
+ dataset_id -= first_id;
+ buffer_id = dataset_id % buffer->ring_size;
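+ // Datasets are mapped onto ring slots modulo ring_size (relative to the first dataset seen);
+ // current_id is the oldest dataset that has not been delivered yet.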
+
+ // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
+ if (dataset_id < buffer->current_id) {
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
+ return FALSE;
+ }
+
+ if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) {
+// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
+ return FALSE;
+ }
+
+ buffer->last_id[stream_id] = dataset_id;
+
+ // We are not fast enough, new packets are arriving too fast
+ if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
+ // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
+
+ guint64 min_id = (guint64)-1;
+ guint64 max_id = 0;
+ for (guint i = 0; i < buffer->n_streams; i++) {
+ if (min_id > buffer->last_id[i]) min_id = buffer->last_id[i];
+ if (max_id < buffer->last_id[i]) max_id = buffer->last_id[i];
+ }
+ printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id);
+ printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]);
+
+ root_set_network_error(error, "Ring buffer exhausted. Got a packet for dataset %lu. Dropping datasets from %lu to %lu; dataset %lu has %u of %u parts completed, already reported %lu",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready);
+
+ // FIXME: Send semi-complete buffers further?
+ // FIXME: Or shall we drop more if larger buffers are allocated?
+ if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
+ memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ // FIXME: Can a finally-arriving fragment of an old dataset still hit the counter here? Locking?
+ buffer->current_id = dataset_id;
+ } else {
+ for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ // FIXME: Can finally-arriving fragments of old datasets still hit the counters here? Locking?
+ buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
+ }
+
+ // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more
+ if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
+ ready = TRUE;
+
+ // FIXME: In the multi-threaded case, we need to ensure that all threads have stopped writing here (and the generator is not reading) before we can reassign the buffer to a new dataset.
+ // To avoid locking, we can store a per-thread 'current_id' and only proceed to writing once all per-thread current_ids are equal to or above the global value.
+ // The updates may happen after writing/reading is finished.
+ }
+
+/*
+ printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n",
+ dataset_id, buffer_id,
+ buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset,
+ stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id,
+ buffer->fragment_size, ((uint32_t*)fragment)[0]
+ );
+*/
+
+ // FIXME: What if the buffer is obsoleted by other threads meanwhile? We are not single-threaded any more
+ uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+ if (buffer->n_dims == 2) {
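+ // Each stream owns a vertical stripe of the 2D dataset: stream_id selects the x-offset,
+ // fragment_id selects the block of fragment_dims[1] rows along y. Rows are copied one at a
+ // time because the stripe is narrower than the full dataset row (dataset_dims[0] bytes).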
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * buffer->fragment_dims[0] + // x-coordinate
+ (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+ }
+ } else {
+ // 1D structure: simply put the fragment at the appropriate position in the stream
+ uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
+ memcpy(fragment_buffer, fragment, buffer->fragment_size);
+ }
+
+ // FIXME: Sanity checks: verify that this is not a duplicate fragment?
+ // This part is single-threaded anyway, which is a problem.
+ //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+ buffer->n_fragments[buffer_id]++;
+
+ if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
+ // FIXME: what about a complete dataset blocked by an earlier one with only a few fragments missing?
+ if (dataset_id == buffer->current_id)
+ ready = TRUE;
+ else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) {
+ ready = roof_buffer_skip_to_ready(buffer);
+ }
+
+ if (ready) buffer->n_ready++;
+ }
+
+ return ready;
+}
+
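+// Scan forward from current_id and drop incomplete datasets (resetting their fragment counters)
+// until a complete one is found. Returns FALSE if the whole ring is scanned without finding one.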
+gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) {
+ for (guint i = 0; i < buffer->ring_size; i++) {
+ guint64 id = buffer->current_id + i;
+ guint buffer_id = id % buffer->ring_size;
+
+ if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
+ buffer->current_id = id;
+ return TRUE;
+ }
+
+ printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ buffer->n_fragments[buffer_id] = 0;
+ }
+
+ return FALSE;
+}
+
+
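+// Copies the dataset at current_id into output_buffer if it is complete, reports its sequence
+// number via seqid, releases the ring slot and advances current_id.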
+gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+ void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+
+ // FIXME: what about a complete dataset blocked by an earlier one with only a few fragments missing?
+ if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;
+
+ memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
+ if (seqid) *seqid = buffer->current_id;
+
+ buffer->n_fragments[buffer_id] = 0;
+ buffer->current_id += 1;
+
+ return TRUE;
+}
+
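+// Busy-waits until the dataset at current_id is complete or the timeout (in microseconds,
+// 0 = wait forever) expires, then delegates to roof_buffer_get_dataset().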
+gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) {
+ struct timespec start_time, current_time;
+ clock_gettime(CLOCK_REALTIME, &start_time);
+
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+
+ // FIXME: what about a complete dataset blocked by an earlier one with only a few fragments missing?
+ while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) {
+ // FIXME: get some sleep? How can we interrupt it?
+ // Just a small nanosleep?
+ if (timeout) {
+ clock_gettime(CLOCK_REALTIME, &current_time);
+ gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 + (current_time.tv_nsec - start_time.tv_nsec) / 1000;
+ if (wait > timeout) return FALSE;
+ }
+ }
+
+ return roof_buffer_get_dataset(buffer, output_buffer, seqid, error);
+}
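+
+/* Hypothetical usage sketch (not part of the ufo-roof code; cfg, running, stream_id, payload,
+ * output and process_dataset are assumed to be provided by the caller): a receiver loop could
+ * feed UDP payloads into the buffer and drain complete datasets roughly like this:
+ *
+ *     GError *error = NULL;
+ *     RoofBuffer *buf = roof_buffer_new(cfg, 2, 0, &error);   // 2D datasets, no dataset limit
+ *     while (running) {
+ *         // stream_id and payload come from the per-stream network receiver
+ *         if (roof_buffer_set_fragment(buf, stream_id, 0, payload, &error)) {
+ *             gulong seqid;
+ *             if (roof_buffer_get_dataset(buf, output, &seqid, &error))
+ *                 process_dataset(output, seqid);
+ *         }
+ *     }
+ *     roof_buffer_free(buf);
+ */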