diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/DEVELOPMENT | 22 | ||||
-rw-r--r-- | src/ufo-roof-buffer.c | 36 | ||||
-rw-r--r-- | src/ufo-roof-buffer.h | 1 | ||||
-rw-r--r-- | src/ufo-roof-build-task.c | 28 | ||||
-rw-r--r-- | src/ufo-roof-config.c | 8 | ||||
-rw-r--r-- | src/ufo-roof-config.h | 1 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.c | 1 | ||||
-rw-r--r-- | src/ufo-roof-read-task.c | 1 |
8 files changed, 75 insertions, 23 deletions
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT index f3381fb..a40e2bb 100644 --- a/src/DEVELOPMENT +++ b/src/DEVELOPMENT @@ -10,6 +10,28 @@ Architecture for missing bits. +Problems +======== + - When streaming at high speed (~ 16 data streams; 600 Mbit & 600 kpck each), the data streams quickly get + desynchronized (but all packets are delivered). + * It is unclear if problem is on the receiver side (no overloaded CPU cores) or de-synchronization is first + appear on the simmulation sender. The test with real hardware is required. + * For border case scenarios, increasing number of buffers from 2 to 10-20 helps. But at full speed, even 1000s + buffers are not enough. Packets counts are quickly going appart. + * Further increase of packet buffer provided to 'recvmmsg' does not help (even if blocking is enforced until + all packets are received) + * At the speed specified above, the system works also without libvma. + * Actually, with libvma a larger buffer is required. In the beginning the performance of libvma is gradually + speeding up (that was always like that). And during this period a significant desynchronization happens. To + compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux + networking is utilized. + - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware? + - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are + delivered but with extreme latencies. Probably some tunning of network stack is required). + - Technically, everything should work if we start UFO server when data is already streamed. However, the first + dataset could be any. Therefore, the check fails as the data is shifted by a random number of datasets. + + Questions ========= - Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index eaf9b35..f83885e 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); buffer->ring_size = cfg->buffer_size; + buffer->drop_buffers = cfg->drop_buffers; buffer->fragment_size = cfg->payload_size; buffer->dataset_size = cfg->dataset_size; buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; @@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { // fragment_id is numbered from 1 (0 - means auto) gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) { + gboolean ready = FALSE; guint buffer_id; guint dataset_id; @@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; buffer_id = dataset_id % buffer->ring_size; - // Late arrived packed -// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id); + // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? if (dataset_id < buffer->current_id) - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + return FALSE; +// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - if (error) - root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", - buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed", + dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); // FIXME: Send semi-complete buffers further? // FIXME: Or shall we drop more if larger buffers are allocated? - for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++) - buffer->n_fragments[i%buffer->ring_size] = 0; - buffer->current_id = dataset_id - buffer->ring_size + 1; - + if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { + memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); + buffer->current_id = dataset_id; + } else { + for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; + } + + if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) + ready = TRUE; + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value // The updates may happen after writting/reading is finished. @@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (dataset_id == buffer->current_id) { - return TRUE; - } + if (dataset_id == buffer->current_id) + ready = TRUE; } - return FALSE; + return ready; } diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h index 367f2d5..f7b2124 100644 --- a/src/ufo-roof-buffer.h +++ b/src/ufo-roof-buffer.h @@ -7,6 +7,7 @@ struct _UfoRoofBuffer { guint current_id; // The ID of the first (active) dataset in the buffer guint ring_size; // Number of datasets to buffer + guint drop_buffers; // If we need to catch up uint8_t *ring_buffer; // The ring buffer _Atomic guint *n_fragments; // Number of completed fragments in each buffer guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index fa2fdcd..821c761 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -37,6 +37,8 @@ struct _UfoRoofBuildTaskPrivate { guint number; // Number of datasets to read gboolean stop; // Stop flag + + guint announced; // For debugging }; static void ufo_task_interface_init (UfoTaskIface *iface); @@ -162,31 +164,35 @@ ufo_roof_build_task_process (UfoTask *task, // UfoRequisition in_req; // ufo_buffer_get_requisition (inputs[0], &in_req); - uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); + const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); if (priv->stop) return FALSE; - + + const uint8_t *fragment = data; for (guint i = 0; i < header->n_packets; i++) { guint packet_id = 0; // Otherwise considered consecutive and handled by the buffer if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { - UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data); + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); packet_id = pheader->packet_id + 1; } - ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr); + // FIXME: Can we kill here the dataset finished during the previous step of iteration + ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr); if (gerr) roof_print_error(gerr); - data += cfg->max_packet_size; + fragment += cfg->max_packet_size; } // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? +/* + printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets, + (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); +*/ -// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets); - return !ready; } @@ -214,8 +220,12 @@ ufo_roof_build_task_generate (UfoTask *task, priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } - -// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id); + + + if ((buf->current_id - priv->announced) > 1000) { + printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no"); + priv->announced = buf->current_id; + } return ready; } diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c index 11f8bd4..812d4a2 100644 --- a/src/ufo-roof-config.c +++ b/src/ufo-roof-config.c @@ -83,6 +83,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { cfg->max_packets = 100; cfg->dataset_size = 0; cfg->buffer_size = 2; + cfg->drop_buffers = 0; cfg->path = NULL; // Read configuration @@ -101,6 +102,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { roof_config_node_get(hardware, root, object, "hardware"); roof_config_node_get(network, root, object, "network"); roof_config_node_get(simulation, root, object, "simulation"); + roof_config_node_get(performance, root, object, "performance"); } if (hardware) { @@ -123,6 +125,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { if (performance) { roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); + roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); } if (simulation) { @@ -167,6 +170,11 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) { if (!cfg->dataset_size) cfg->dataset_size = cfg->payload_size; + if (cfg->buffer_size < 4) { + cfg->drop_buffers = 0; + } else if (cfg->drop_buffers >= cfg->buffer_size) { + cfg->drop_buffers = cfg->buffer_size / 2; + } return cfg; } diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h index a22c84f..f90c5f3 100644 --- a/src/ufo-roof-config.h +++ b/src/ufo-roof-config.h @@ -25,6 +25,7 @@ typedef struct { guint max_packets; // limits maximum number of packets which are read at once guint max_packet_size; // payload_size + header_size + ...? guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected + guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. guint network_timeout; // Maximum time (us) to wait for data on the socket diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c index f213d99..7bbe8ef 100644 --- a/src/ufo-roof-read-socket.c +++ b/src/ufo-roof-read-socket.c @@ -51,6 +51,7 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr msg[i].msg_hdr.msg_iovlen = 1; } + // Timeout seems broken, see BUGS in 'recvmmsg' bugs page int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c index ebff9de..1582437 100644 --- a/src/ufo-roof-read-task.c +++ b/src/ufo-roof-read-task.c @@ -172,6 +172,7 @@ ufo_roof_read_task_generate (UfoTask *task, guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); if (gerr) { g_warning("Error reciving data: %s", gerr->message); + g_error_free(gerr); return FALSE; } |