From ad39481f238218ed3f3ce0f46c975a7ab178b5b5 Mon Sep 17 00:00:00 2001
From: "Suren A. Chilingaryan" <csa@suren.me>
Date: Sun, 17 Nov 2019 21:09:05 +0100
Subject: Few timeouts & exit conditions

---
 src/DEVELOPMENT            | 10 +++++++++-
 src/ufo-roof-buffer.c      |  7 ++++++-
 src/ufo-roof-buffer.h      |  3 ++-
 src/ufo-roof-build-task.c  | 29 +++++++++++++++++++++++++----
 src/ufo-roof-read-socket.c | 17 ++++++++++++++++-
 5 files changed, 58 insertions(+), 8 deletions(-)

(limited to 'src')

diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
index a40e2bb..18a8011 100644
--- a/src/DEVELOPMENT
+++ b/src/DEVELOPMENT
@@ -25,7 +25,15 @@ Problems
     speeding up (that was always like that). And during this period a significant desynchronization happens. To
     compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux 
     networking is utilized.
- - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware?
+ - In any case (LibVMA or not), some packets will be lost in the beginning if high-speed communication is tested.
+    * Usually, first packets are transferred OK, but, then, a few packets will be lost occasionally here and there
+    (resulting in broken frames). This basically breaks grabbing a few packets and exitig. Unclear if server- or 
+    client-side problem (makes sense to see how real-hardware will behave).
+    * Can we pre-heat to avoid this speeding-up problem (increase pre-allocated buffers, disable power-saving 
+    mode, ??) Or it will be also not a problem with hardware? We can send UDP packets (should be send from another
+    host), but packets are still lost:
+        for i in $(seq 4000 4015); do echo "data" > /dev/udp/192.168.34.84/$i; done
+    * The following doesn't help: new version of libvma, tunning of the options.
  - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are 
  delivered but with extreme latencies. Probably some tunning of network stack is required).
  - Technically, everything should work if we start UFO server when data is already streamed. However, the first
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index f83885e..179d153 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -1,5 +1,6 @@
 #include <stdio.h>
 #include <stdint.h>
+#include <time.h>
 
 #include "glib.h"
 
@@ -8,10 +9,11 @@
 
 // This is currently not thread safe. With dual-filter architecture this will be called sequentially.
 
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error) {
     UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
     if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
 
+    buffer->max_datasets = max_datasets;
     buffer->ring_size = cfg->buffer_size;
     buffer->drop_buffers = cfg->drop_buffers;
     buffer->fragment_size = cfg->payload_size;
@@ -65,6 +67,9 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
 	return FALSE;
 //        roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
 
+    if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets))
+	return FALSE;
+
         // We are not fast enough, new packets are arrvining to fast        
     if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
             // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
index f7b2124..c4c8474 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/ufo-roof-buffer.h
@@ -14,6 +14,7 @@ struct _UfoRoofBuffer {
 //    int		                *fragments;				// Mark individual completed fragments (if we care for partial data)
 
 
+    guint			max_datasets;				// Only the specified number of datasets will be buffered, the rest will be silently dropped
     guint                       dataset_size;                           // Size (in bytes) of a full dataset
     guint                       fragment_size;                          // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment)
     
@@ -23,7 +24,7 @@ struct _UfoRoofBuffer {
 
 typedef struct _UfoRoofBuffer	UfoRoofBuffer;
 
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error);
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error);
 void ufo_roof_buffer_free(UfoRoofBuffer *buf);
 
 gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error);
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index 821c761..e5e5518 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -39,6 +39,8 @@ struct _UfoRoofBuildTaskPrivate {
     gboolean                    stop;                                   // Stop flag
 
     guint 			announced;				// For debugging
+
+    struct timespec		last_fragment_timestamp;
 };
 
 static void ufo_task_interface_init (UfoTaskIface *iface);
@@ -82,9 +84,11 @@ ufo_roof_build_task_setup (UfoTask *task,
         roof_propagate_error(error, gerr, "roof-build-setup: ");
 
 
-    priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr);
+    priv->buf = ufo_roof_buffer_new(priv->cfg, priv->number, &gerr);
     if (!priv->buf) 
         roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+    clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
 }
 
 static void
@@ -188,8 +192,23 @@ ufo_roof_build_task_process (UfoTask *task,
     }
     
     // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+
+    if (ready) {
+	clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+    } else {
+	struct timespec current_time;
+	clock_gettime(CLOCK_REALTIME, &current_time);
+
+	    // No new accepted events within timeout
+        if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 +  (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) {
+	    priv->stop = TRUE;
+	    g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+	}
+    }
+
+
 /*
-    printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets, 
+    printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets, 
 	(cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
 */
 
@@ -216,14 +235,16 @@ ufo_roof_build_task_generate (UfoTask *task,
     ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr);
     if (gerr) roof_print_error(gerr);
 
+	// FIXME: Or shall we start from counting from the ID of the first registerd dataset
     if ((priv->number)&&(buf->current_id >= priv->number)) {
+//	printf("%u datasets processed, stopping\n", buf->current_id);
         priv->stop = TRUE;
         g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
     }
 
 
-    if ((buf->current_id - priv->announced) > 1000) {
-	printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no");
+    if ((priv->number < 100)||((buf->current_id - priv->announced) > 1000)) {
+	printf("Generating dataset %i (%s), next: %u out of %u)\n", buf->current_id, ready?"yes":" no", buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
 	priv->announced = buf->current_id;
     }
 
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
index 7bbe8ef..e8f7ce4 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/ufo-roof-read-socket.c
@@ -30,6 +30,7 @@ static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
 }
 
 static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
+    int packets;
     struct timespec timeout_ts;
 
     UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
@@ -51,11 +52,17 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
         msg[i].msg_hdr.msg_iovlen = 1;
     }
 
+//retry:
 	// Timeout seems broken, see BUGS in 'recvmmsg' bugs page
-    int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+    packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
     if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
 
+/*
         // FIXME: Shall we verify packets consistency here? We can check at least the sizes...
+    if ((packets == 1)&&(msg[0].msg_len < 16)) {
+	goto retry;
+    }
+*/
 
     return (guint)packets;
 }
@@ -113,6 +120,14 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr
         roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
     }
 
+/*
+	// Send ping request to force initialization
+    char msg[4];
+    addr_str = "192.168.34.83";
+    getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+    sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+*/
+
     freeaddrinfo(sockaddr_info);
 
     return (UfoRoofReadInterface*)reader;
-- 
cgit v1.2.3