From 7b2e6168b049be9e7852b2d364d897592eff69fc Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Sun, 8 Mar 2020 16:23:41 +0100 Subject: Fix 64-bit ids, ROOF ids are big-endian, skip incomplete datasets at the end and if with specified latency, try to reconstruct from desynchronized streams (harmful, debugging only) --- docs/hardware.txt | 2 ++ src/CMakeLists.txt | 2 +- src/ufo-roof-buffer.c | 79 ++++++++++++++++++++++++++++++++++++++-------- src/ufo-roof-buffer.h | 17 ++++++++-- src/ufo-roof-build-task.c | 21 ++++++++---- src/ufo-roof-config.c | 4 +-- src/ufo-roof-config.h | 1 + src/ufo-roof-filter-task.c | 2 +- tests/roof.yaml | 1 + 9 files changed, 102 insertions(+), 27 deletions(-) diff --git a/docs/hardware.txt b/docs/hardware.txt index 50c3a0c..006829d 100644 --- a/docs/hardware.txt +++ b/docs/hardware.txt @@ -1,3 +1,4 @@ + - The packet ids are 64-bit and in big-endian data format - Jumbo frames are not currently supported, max packet size is 1500 bytes. * The maximum number of samples per packet can be computed as n = (1500 - header_size) / sample_size (pixels_per_module * bpp) i.e. 46 = | 1492 / 32 | @@ -9,3 +10,4 @@ Questions ========= - Do we need to compute 'flats' and 'darks' for each plane separately? Or just one set will work for all? + - How do FPGA syncrhonize IDs? Is it reliable. I currently have desync in receiving (but this could be caused by networking stack problems) \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bbbc5fb..720c18f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,7 +5,7 @@ set(ufofilter_SRCS ufo-roof-read-task.c ufo-roof-build-task.c ufo-roof-filter-task.c - ufo-roof-flat-field-correct.c + ufo-roof-flat-field-correct-task.c ) set(common_SRCS diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index 32598c9..0e0a890 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -19,6 +19,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d buffer->max_datasets = max_datasets; buffer->ring_size = cfg->buffer_size; buffer->drop_buffers = cfg->drop_buffers; + buffer->latency_buffers = cfg->latency_buffers; buffer->n_dims = n_dims; buffer->dataset_size = cfg->dataset_size; buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8; @@ -35,6 +36,15 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + buffer->first_id = malloc(buffer->ring_size * sizeof(guint64)); + if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); + for (guint i = 0; i < buffer->ring_size; i++) + buffer->first_id[i] = (guint64)-1; +#else + buffer->first_id = (guint64)-1; +#endif + if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { ufo_roof_buffer_free(buffer); roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); @@ -45,6 +55,10 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { if (buffer) { +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + if (buffer->first_id) + free(buffer->first_id); +#endif if (buffer->ring_buffer) free(buffer->ring_buffer); if (buffer->n_fragments) @@ -57,10 +71,11 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { } // fragment_id is numbered from 1 (0 - means auto) -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { gboolean ready = FALSE; guint buffer_id; - guint dataset_id; + guint64 first_id; + guint64 dataset_id; if (!fragment_id) { fragment_id = ++buffer->stream_fragment[stream_id]; @@ -69,21 +84,37 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; + +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + if (buffer->first_id[stream_id] == (guint64)-1) + buffer->first_id[stream_id] = dataset_id; + first_id = buffer->first_id[stream_id]; +#else + if (buffer->first_id == (guint64)-1) + buffer->first_id = dataset_id; + first_id = buffer->first_id; +#endif + if (dataset_id < first_id) + return FALSE; + + dataset_id -= first_id; buffer_id = dataset_id % buffer->ring_size; // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? - if (dataset_id < buffer->current_id) + if (dataset_id < buffer->current_id) { + roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); return FALSE; -// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + } - if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) + if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) { +// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); return FALSE; - + } // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed", + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed", dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); // FIXME: Send semi-complete buffers further? @@ -105,10 +136,14 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // The updates may happen after writting/reading is finished. } -/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n", - buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size, - ((uint32_t*)fragment)[0] - );*/ +/* + printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", + dataset_id, buffer_id, + buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset, + stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, + buffer->fragment_size, ((uint32_t*)fragment)[0] + ); +*/ uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; if (buffer->n_dims == 2) { @@ -116,8 +151,8 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu stream_id * buffer->fragment_dims[0] + // x-coordinate (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate - for (int i = 0; i < buffer->fragment_dims[1]; ++i) { - memcpy(fragment_buffer + i * buffer->dataset_dims[0], fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); } } else { // 1D stracture, simply putting fragment at the appropriate position in the stream @@ -132,11 +167,29 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? if (dataset_id == buffer->current_id) ready = TRUE; + else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) + ready = ufo_roof_buffer_skip_to_ready(buffer); } return ready; } +gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { + for (guint i = 0; i < buffer->ring_size; i++) { + guint64 id = buffer->current_id + i; + guint buffer_id = id % buffer->ring_size; + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + buffer->current_id = id; + return TRUE; + } + +// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + buffer->n_fragments[buffer_id] = 0; + } + + return FALSE; +} gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h index 3d7ad2d..8e9c00b 100644 --- a/src/ufo-roof-buffer.h +++ b/src/ufo-roof-buffer.h @@ -1,20 +1,30 @@ #ifndef __UFO_ROOF_BUFFER_H #define __UFO_ROOF_BUUFER_H + // This IS harmful! Just for testing +//#define UFO_ROOF_INDEPENDENT_STREAMS + #include + struct _UfoRoofBuffer { - guint current_id; // The ID of the first (active) dataset in the buffer + guint64 current_id; // The ID of the first (active) dataset in the buffer +#ifdef UFO_ROOF_INDEPENDENT_STREAMS + guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known +#else + guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known +#endif guint ring_size; // Number of datasets to buffer guint drop_buffers; // If we need to catch up + guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready uint8_t *ring_buffer; // The ring buffer _Atomic guint *n_fragments; // Number of completed fragments in each buffer guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) // int *fragments; // Mark individual completed fragments (if we care for partial data) - guint max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped + guint64 max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped guint n_dims; // Indicates if we just assemble one fragment after another or there is 2D/3D data structure (ROOF) guint dataset_size; // Size (in bytes) of a full dataset guint dataset_dims[2]; // x (in bytes), y (in rows) @@ -30,7 +40,8 @@ typedef struct _UfoRoofBuffer UfoRoofBuffer; UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error); void ufo_roof_buffer_free(UfoRoofBuffer *buf); -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error); +gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error); +gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer); gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error); #endif diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index 9d55d38..a39ed11 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -18,6 +18,7 @@ */ #include +#include #ifdef __APPLE__ #include @@ -46,7 +47,7 @@ struct _UfoRoofBuildTaskPrivate { gboolean stop; // Stop flag gboolean simulate; // Indicates if we are running in network or simulation modes - guint announced; // For debugging + guint64 announced; // For debugging struct timespec last_fragment_timestamp; }; @@ -210,12 +211,12 @@ ufo_roof_build_task_process (UfoTask *task, const uint8_t *fragment = data; for (guint i = 0; i < header->n_packets; i++) { - guint packet_id = 0; + guint64 packet_id = 0; // Otherwise considered consecutive and handled by the buffer if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); - packet_id = pheader->packet_id + 1; + packet_id = be64toh(pheader->packet_id) + 1; } // FIXME: Can we kill here the dataset finished during the previous step of iteration @@ -235,8 +236,14 @@ ufo_roof_build_task_process (UfoTask *task, // No new accepted events within timeout if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) { - priv->stop = TRUE; - g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + ready = ufo_roof_buffer_skip_to_ready(buf); + if (ready) { + // FIXME: shall we really reset timer here? + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); + } else { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } } } @@ -306,13 +313,13 @@ ufo_roof_build_task_generate (UfoTask *task, // FIXME: Or shall we start from counting from the ID of the first registerd dataset if ((priv->number)&&(buf->current_id >= priv->number)) { -// printf("%u datasets processed, stopping\n", buf->current_id); +// printf("%lu datasets processed, stopping\n", buf->current_id); priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { - printf("Generating dataset %i (%s), next: %u out of %u)\n", buf->current_id, ready?"yes":" no", buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); priv->announced = buf->current_id; } diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c index 17f4b30..944ee31 100644 --- a/src/ufo-roof-config.c +++ b/src/ufo-roof-config.c @@ -94,6 +94,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, cfg->max_packets = 100; cfg->dataset_size = 0; cfg->buffer_size = 2; + cfg->latency_buffers = 0; cfg->drop_buffers = 0; @@ -193,6 +194,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); + roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers"); } @@ -237,7 +239,5 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, cfg->drop_buffers = cfg->buffer_size / 2; } - printf("dataset size: %i\n", cfg->dataset_size); - return cfg; } diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h index 34bef0b..8718381 100644 --- a/src/ufo-roof-config.h +++ b/src/ufo-roof-config.h @@ -37,6 +37,7 @@ typedef struct { guint max_packet_size; // payload_size + header_size + ... (we don't care if tail is variable length provided that the complete packet does not exceed max_packet_size bytes) guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. + guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip guint network_timeout; // Maximum time (us) to wait for data on the socket diff --git a/src/ufo-roof-filter-task.c b/src/ufo-roof-filter-task.c index 01bc742..3cee601 100644 --- a/src/ufo-roof-filter-task.c +++ b/src/ufo-roof-filter-task.c @@ -103,7 +103,7 @@ ufo_roof_filter_task_process (UfoTask *task, priv = UFO_ROOF_FILTER_TASK_GET_PRIVATE (task); if (priv->plane) { - int buf_plane; + guint buf_plane; GValue *value; value = ufo_buffer_get_metadata(inputs[0], "plane"); diff --git a/tests/roof.yaml b/tests/roof.yaml index 0a0ce1d..c9754ec 100644 --- a/tests/roof.yaml +++ b/tests/roof.yaml @@ -26,6 +26,7 @@ network: performance: buffer_size: 10 # drop_buffers: 0 +# latency_buffers: 0 packets_at_once: 100 data: base_path: "/home/csa/roof2_data/test_data" -- cgit v1.2.3