summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DEVELOPMENT22
-rw-r--r--src/ufo-roof-buffer.c36
-rw-r--r--src/ufo-roof-buffer.h1
-rw-r--r--src/ufo-roof-build-task.c28
-rw-r--r--src/ufo-roof-config.c8
-rw-r--r--src/ufo-roof-config.h1
-rw-r--r--src/ufo-roof-read-socket.c1
-rw-r--r--src/ufo-roof-read-task.c1
8 files changed, 75 insertions, 23 deletions
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
index f3381fb..a40e2bb 100644
--- a/src/DEVELOPMENT
+++ b/src/DEVELOPMENT
@@ -10,6 +10,28 @@ Architecture
for missing bits.
+Problems
+========
+ - When streaming at high speed (~ 16 data streams; 600 Mbit & 600 kpck each), the data streams quickly get
+ desynchronized (but all packets are delivered).
+ * It is unclear if problem is on the receiver side (no overloaded CPU cores) or de-synchronization is first
+ appear on the simmulation sender. The test with real hardware is required.
+ * For border case scenarios, increasing number of buffers from 2 to 10-20 helps. But at full speed, even 1000s
+ buffers are not enough. Packets counts are quickly going appart.
+ * Further increase of packet buffer provided to 'recvmmsg' does not help (even if blocking is enforced until
+ all packets are received)
+ * At the speed specified above, the system works also without libvma.
+ * Actually, with libvma a larger buffer is required. In the beginning the performance of libvma is gradually
+ speeding up (that was always like that). And during this period a significant desynchronization happens. To
+ compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux
+ networking is utilized.
+ - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware?
+ - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are
+ delivered but with extreme latencies. Probably some tunning of network stack is required).
+ - Technically, everything should work if we start UFO server when data is already streamed. However, the first
+ dataset could be any. Therefore, the check fails as the data is shifted by a random number of datasets.
+
+
Questions
=========
- Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index eaf9b35..f83885e 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -13,6 +13,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
buffer->ring_size = cfg->buffer_size;
+ buffer->drop_buffers = cfg->drop_buffers;
buffer->fragment_size = cfg->payload_size;
buffer->dataset_size = cfg->dataset_size;
buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
@@ -46,6 +47,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
// fragment_id is numbered from 1 (0 - means auto)
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+ gboolean ready = FALSE;
guint buffer_id;
guint dataset_id;
@@ -58,25 +60,32 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
buffer_id = dataset_id % buffer->ring_size;
- // Late arrived packed
-// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
+ // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
if (dataset_id < buffer->current_id)
- roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+ return FALSE;
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- if (error)
- root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed",
- buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
// FIXME: Send semi-complete buffers further?
// FIXME: Or shall we drop more if larger buffers are allocated?
- for (guint i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++)
- buffer->n_fragments[i%buffer->ring_size] = 0;
- buffer->current_id = dataset_id - buffer->ring_size + 1;
-
+ if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
+ memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ buffer->current_id = dataset_id;
+ } else {
+ for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
+ }
+
+ if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
+ ready = TRUE;
+
// FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
// To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
// The updates may happen after writting/reading is finished.
@@ -97,12 +106,11 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
- if (dataset_id == buffer->current_id) {
- return TRUE;
- }
+ if (dataset_id == buffer->current_id)
+ ready = TRUE;
}
- return FALSE;
+ return ready;
}
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
index 367f2d5..f7b2124 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/ufo-roof-buffer.h
@@ -7,6 +7,7 @@ struct _UfoRoofBuffer {
guint current_id; // The ID of the first (active) dataset in the buffer
guint ring_size; // Number of datasets to buffer
+ guint drop_buffers; // If we need to catch up
uint8_t *ring_buffer; // The ring buffer
_Atomic guint *n_fragments; // Number of completed fragments in each buffer
guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams)
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index fa2fdcd..821c761 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -37,6 +37,8 @@ struct _UfoRoofBuildTaskPrivate {
guint number; // Number of datasets to read
gboolean stop; // Stop flag
+
+ guint announced; // For debugging
};
static void ufo_task_interface_init (UfoTaskIface *iface);
@@ -162,31 +164,35 @@ ufo_roof_build_task_process (UfoTask *task,
// UfoRequisition in_req;
// ufo_buffer_get_requisition (inputs[0], &in_req);
- uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
+ const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
if (priv->stop)
return FALSE;
-
+
+ const uint8_t *fragment = data;
for (guint i = 0; i < header->n_packets; i++) {
guint packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
- UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data);
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
packet_id = pheader->packet_id + 1;
}
- ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr);
+ // FIXME: Can we kill here the dataset finished during the previous step of iteration
+ ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr);
if (gerr) roof_print_error(gerr);
- data += cfg->max_packet_size;
+ fragment += cfg->max_packet_size;
}
// FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+/*
+ printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets,
+ (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
+*/
-// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets);
-
return !ready;
}
@@ -214,8 +220,12 @@ ufo_roof_build_task_generate (UfoTask *task,
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
-
-// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id);
+
+
+ if ((buf->current_id - priv->announced) > 1000) {
+ printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no");
+ priv->announced = buf->current_id;
+ }
return ready;
}
diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c
index 11f8bd4..812d4a2 100644
--- a/src/ufo-roof-config.c
+++ b/src/ufo-roof-config.c
@@ -83,6 +83,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
cfg->max_packets = 100;
cfg->dataset_size = 0;
cfg->buffer_size = 2;
+ cfg->drop_buffers = 0;
cfg->path = NULL;
// Read configuration
@@ -101,6 +102,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
roof_config_node_get(hardware, root, object, "hardware");
roof_config_node_get(network, root, object, "network");
roof_config_node_get(simulation, root, object, "simulation");
+ roof_config_node_get(performance, root, object, "performance");
}
if (hardware) {
@@ -123,6 +125,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
if (performance) {
roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once");
roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
+ roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers");
}
if (simulation) {
@@ -167,6 +170,11 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
if (!cfg->dataset_size)
cfg->dataset_size = cfg->payload_size;
+ if (cfg->buffer_size < 4) {
+ cfg->drop_buffers = 0;
+ } else if (cfg->drop_buffers >= cfg->buffer_size) {
+ cfg->drop_buffers = cfg->buffer_size / 2;
+ }
return cfg;
}
diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h
index a22c84f..f90c5f3 100644
--- a/src/ufo-roof-config.h
+++ b/src/ufo-roof-config.h
@@ -25,6 +25,7 @@ typedef struct {
guint max_packets; // limits maximum number of packets which are read at once
guint max_packet_size; // payload_size + header_size + ...?
guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected
+ guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up.
guint network_timeout; // Maximum time (us) to wait for data on the socket
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
index f213d99..7bbe8ef 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/ufo-roof-read-socket.c
@@ -51,6 +51,7 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
msg[i].msg_hdr.msg_iovlen = 1;
}
+ // Timeout seems broken, see BUGS in 'recvmmsg' bugs page
int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
index ebff9de..1582437 100644
--- a/src/ufo-roof-read-task.c
+++ b/src/ufo-roof-read-task.c
@@ -172,6 +172,7 @@ ufo_roof_read_task_generate (UfoTask *task,
guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
if (gerr) {
g_warning("Error reciving data: %s", gerr->message);
+ g_error_free(gerr);
return FALSE;
}