From ea9626b60092f2d2c79431718c3ca8bc383429a6 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Sun, 17 Nov 2019 16:58:02 +0100 Subject: Networking setup --- src/ufo-roof-buffer.c | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) (limited to 'src/ufo-roof-buffer.c') diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index eaf9b35..f83885e 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); buffer->ring_size = cfg->buffer_size; + buffer->drop_buffers = cfg->drop_buffers; buffer->fragment_size = cfg->payload_size; buffer->dataset_size = cfg->dataset_size; buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; @@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { // fragment_id is numbered from 1 (0 - means auto) gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { + gboolean ready = FALSE; guint buffer_id; guint dataset_id; @@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; buffer_id = dataset_id % buffer->ring_size; - // Late arrived packed -// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id); + // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? if (dataset_id < buffer->current_id) - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + return FALSE; +// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - if (error) - root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", - buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed", + dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); // FIXME: Send semi-complete buffers further? // FIXME: Or shall we drop more if larger buffers are allocated? - for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++) - buffer->n_fragments[i%buffer->ring_size] = 0; - buffer->current_id = dataset_id - buffer->ring_size + 1; - + if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { + memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); + buffer->current_id = dataset_id; + } else { + for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; + } + + if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) + ready = TRUE; + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value // The updates may happen after writting/reading is finished. @@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (dataset_id == buffer->current_id) { - return TRUE; - } + if (dataset_id == buffer->current_id) + ready = TRUE; } - return FALSE; + return ready; } -- cgit v1.2.3