diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
commit | 5172421d248250b4ab3b69eb57fd83656e23a4da (patch) | |
tree | a499d9f1dd0b74b754816884a59927b3171656fc /src/ufo-roof-buffer.c | |
parent | 7b2e6168b049be9e7852b2d364d897592eff69fc (diff) | |
download | ufo-roof-temp-master.tar.gz ufo-roof-temp-master.tar.bz2 ufo-roof-temp-master.tar.xz ufo-roof-temp-master.zip |
Diffstat (limited to 'src/ufo-roof-buffer.c')
-rw-r--r-- | src/ufo-roof-buffer.c | 209 |
1 files changed, 0 insertions, 209 deletions
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c deleted file mode 100644 index 0e0a890..0000000 --- a/src/ufo-roof-buffer.c +++ /dev/null @@ -1,209 +0,0 @@ -#include <stdio.h> -#include <stdint.h> -#include <time.h> - -#include "glib.h" - -#include "ufo-roof.h" -#include "ufo-roof-buffer.h" - -// This is currently not thread safe. With dual-filter architecture this will be called sequentially. - -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) { - if ((n_dims < 1)||(n_dims > 2)) - roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims); - - UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); - if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); - - buffer->max_datasets = max_datasets; - buffer->ring_size = cfg->buffer_size; - buffer->drop_buffers = cfg->drop_buffers; - buffer->latency_buffers = cfg->latency_buffers; - buffer->n_dims = n_dims; - buffer->dataset_size = cfg->dataset_size; - buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8; - buffer->dataset_dims[1] = cfg->fan_projections; - buffer->fragment_size = cfg->payload_size; - buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8; - buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0]; - - buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; - buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams; -// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size); - - buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size); - buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); - buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); - -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - buffer->first_id = malloc(buffer->ring_size * sizeof(guint64)); - if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); - for (guint i = 0; i < buffer->ring_size; i++) - buffer->first_id[i] = (guint64)-1; -#else - buffer->first_id = (guint64)-1; -#endif - - if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { - ufo_roof_buffer_free(buffer); - roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); - } - - return buffer; -} - -void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { - if (buffer) { -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - if (buffer->first_id) - free(buffer->first_id); -#endif - if (buffer->ring_buffer) - free(buffer->ring_buffer); - if (buffer->n_fragments) - free(buffer->n_fragments); - if (buffer->stream_fragment) - free(buffer->stream_fragment); - - free(buffer); - } -} - - // fragment_id is numbered from 1 (0 - means auto) -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { - gboolean ready = FALSE; - guint buffer_id; - guint64 first_id; - guint64 dataset_id; - - if (!fragment_id) { - fragment_id = ++buffer->stream_fragment[stream_id]; - } - - // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) - dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; - fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; - -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - if (buffer->first_id[stream_id] == (guint64)-1) - buffer->first_id[stream_id] = dataset_id; - first_id = buffer->first_id[stream_id]; -#else - if (buffer->first_id == (guint64)-1) - buffer->first_id = dataset_id; - first_id = buffer->first_id; -#endif - if (dataset_id < first_id) - return FALSE; - - dataset_id -= first_id; - buffer_id = dataset_id % buffer->ring_size; - - // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? - if (dataset_id < buffer->current_id) { - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); - return FALSE; - } - - if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) { -// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); - return FALSE; - } - // We are not fast enough, new packets are arrvining to fast - if (dataset_id >= (buffer->current_id + buffer->ring_size)) { - // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - - root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed", - dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); - - // FIXME: Send semi-complete buffers further? - // FIXME: Or shall we drop more if larger buffers are allocated? - if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { - memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); - buffer->current_id = dataset_id; - } else { - for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) - buffer->n_fragments[i%buffer->ring_size] = 0; - buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; - } - - if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) - ready = TRUE; - - // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. - // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value - // The updates may happen after writting/reading is finished. - } - -/* - printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", - dataset_id, buffer_id, - buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset, - stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, - buffer->fragment_size, ((uint32_t*)fragment)[0] - ); -*/ - - uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; - if (buffer->n_dims == 2) { - uint8_t *fragment_buffer = dataset_buffer + - stream_id * buffer->fragment_dims[0] + // x-coordinate - (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate - - for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { - memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); - } - } else { - // 1D stracture, simply putting fragment at the appropriate position in the stream - uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size; - memcpy(fragment_buffer, fragment, buffer->fragment_size); - } - - // FIXME: Sanity checks: verify is not a dublicate fragment? - atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); - - if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { - // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (dataset_id == buffer->current_id) - ready = TRUE; - else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) - ready = ufo_roof_buffer_skip_to_ready(buffer); - } - - return ready; -} - -gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { - for (guint i = 0; i < buffer->ring_size; i++) { - guint64 id = buffer->current_id + i; - guint buffer_id = id % buffer->ring_size; - - if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { - buffer->current_id = id; - return TRUE; - } - -// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); - buffer->n_fragments[buffer_id] = 0; - } - - return FALSE; -} - - -gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { - guint buffer_id = buffer->current_id % buffer->ring_size; - void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; - - // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE; - - memcpy(output_buffer, dataset_buffer, buffer->dataset_size); - if (seqid) *seqid = buffer->current_id; - - buffer->n_fragments[buffer_id] = 0; - buffer->current_id += 1; - - return TRUE; -} |