summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/DEVELOPMENT10
-rw-r--r--src/ufo-roof-buffer.c7
-rw-r--r--src/ufo-roof-buffer.h3
-rw-r--r--src/ufo-roof-build-task.c29
-rw-r--r--src/ufo-roof-read-socket.c17
-rw-r--r--tests/config.sh3
-rwxr-xr-xtests/roof.sh1
7 files changed, 60 insertions, 10 deletions
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
index a40e2bb..18a8011 100644
--- a/src/DEVELOPMENT
+++ b/src/DEVELOPMENT
@@ -25,7 +25,15 @@ Problems
speeding up (that was always like that). And during this period a significant desynchronization happens. To
compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux
networking is utilized.
- - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware?
+ - In any case (LibVMA or not), some packets will be lost in the beginning if high-speed communication is tested.
+ * Usually, first packets are transferred OK, but, then, a few packets will be lost occasionally here and there
+ (resulting in broken frames). This basically breaks grabbing a few packets and exitig. Unclear if server- or
+ client-side problem (makes sense to see how real-hardware will behave).
+ * Can we pre-heat to avoid this speeding-up problem (increase pre-allocated buffers, disable power-saving
+ mode, ??) Or it will be also not a problem with hardware? We can send UDP packets (should be send from another
+ host), but packets are still lost:
+ for i in $(seq 4000 4015); do echo "data" > /dev/udp/192.168.34.84/$i; done
+ * The following doesn't help: new version of libvma, tunning of the options.
- Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are
delivered but with extreme latencies. Probably some tunning of network stack is required).
- Technically, everything should work if we start UFO server when data is already streamed. However, the first
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index f83885e..179d153 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -1,5 +1,6 @@
#include <stdio.h>
#include <stdint.h>
+#include <time.h>
#include "glib.h"
@@ -8,10 +9,11 @@
// This is currently not thread safe. With dual-filter architecture this will be called sequentially.
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error) {
UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
+ buffer->max_datasets = max_datasets;
buffer->ring_size = cfg->buffer_size;
buffer->drop_buffers = cfg->drop_buffers;
buffer->fragment_size = cfg->payload_size;
@@ -65,6 +67,9 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
return FALSE;
// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+ if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets))
+ return FALSE;
+
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
index f7b2124..c4c8474 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/ufo-roof-buffer.h
@@ -14,6 +14,7 @@ struct _UfoRoofBuffer {
// int *fragments; // Mark individual completed fragments (if we care for partial data)
+ guint max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped
guint dataset_size; // Size (in bytes) of a full dataset
guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment)
@@ -23,7 +24,7 @@ struct _UfoRoofBuffer {
typedef struct _UfoRoofBuffer UfoRoofBuffer;
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error);
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error);
void ufo_roof_buffer_free(UfoRoofBuffer *buf);
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error);
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index 821c761..e5e5518 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -39,6 +39,8 @@ struct _UfoRoofBuildTaskPrivate {
gboolean stop; // Stop flag
guint announced; // For debugging
+
+ struct timespec last_fragment_timestamp;
};
static void ufo_task_interface_init (UfoTaskIface *iface);
@@ -82,9 +84,11 @@ ufo_roof_build_task_setup (UfoTask *task,
roof_propagate_error(error, gerr, "roof-build-setup: ");
- priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr);
+ priv->buf = ufo_roof_buffer_new(priv->cfg, priv->number, &gerr);
if (!priv->buf)
roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
}
static void
@@ -188,8 +192,23 @@ ufo_roof_build_task_process (UfoTask *task,
}
// FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+
+ if (ready) {
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+ } else {
+ struct timespec current_time;
+ clock_gettime(CLOCK_REALTIME, &current_time);
+
+ // No new accepted events within timeout
+ if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+ }
+
+
/*
- printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets,
+ printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets,
(cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
*/
@@ -216,14 +235,16 @@ ufo_roof_build_task_generate (UfoTask *task,
ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr);
if (gerr) roof_print_error(gerr);
+ // FIXME: Or shall we start from counting from the ID of the first registerd dataset
if ((priv->number)&&(buf->current_id >= priv->number)) {
+// printf("%u datasets processed, stopping\n", buf->current_id);
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
- if ((buf->current_id - priv->announced) > 1000) {
- printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no");
+ if ((priv->number < 100)||((buf->current_id - priv->announced) > 1000)) {
+ printf("Generating dataset %i (%s), next: %u out of %u)\n", buf->current_id, ready?"yes":" no", buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
priv->announced = buf->current_id;
}
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
index 7bbe8ef..e8f7ce4 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/ufo-roof-read-socket.c
@@ -30,6 +30,7 @@ static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
}
static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
+ int packets;
struct timespec timeout_ts;
UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
@@ -51,11 +52,17 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
msg[i].msg_hdr.msg_iovlen = 1;
}
+//retry:
// Timeout seems broken, see BUGS in 'recvmmsg' bugs page
- int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
+/*
// FIXME: Shall we verify packets consistency here? We can check at least the sizes...
+ if ((packets == 1)&&(msg[0].msg_len < 16)) {
+ goto retry;
+ }
+*/
return (guint)packets;
}
@@ -113,6 +120,14 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr
roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
}
+/*
+ // Send ping request to force initialization
+ char msg[4];
+ addr_str = "192.168.34.83";
+ getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+ sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+*/
+
freeaddrinfo(sockaddr_info);
return (UfoRoofReadInterface*)reader;
diff --git a/tests/config.sh b/tests/config.sh
index c1c656b..ecb46b6 100644
--- a/tests/config.sh
+++ b/tests/config.sh
@@ -9,7 +9,8 @@ vma_lib_path=/mnt/ands/lib64-fedora/
[ $el7 -eq 1 ] && vma_lib_path="${vma_lib_path/fedora/centos}"
# Standard library
-vma_lib=${vma_lib_path}/libvma.so.8.6.10
+#vma_lib=${vma_lib_path}/libvma.so.8.6.10
+vma_lib=${vma_lib_path}/libvma.so.8.9.5
# With Mellanox OFED extensions (./configure --enable-socketxtreme)
#vma_lib=${vma_lib_path}/mlx/libvma.so.8.6.10
diff --git a/tests/roof.sh b/tests/roof.sh
index df51679..12a4cfa 100755
--- a/tests/roof.sh
+++ b/tests/roof.sh
@@ -11,7 +11,6 @@ ulimit -l unlimited
echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399
echo 8000 > /proc/sys/vm/nr_hugepages # 0
-
#VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=0 VMA_SELECT_POLL=0 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_RX_BUFS=$bufs LD_PRELOAD=$vma_lib \
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64" GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" \
python roof.py "$@"