diff options
-rw-r--r-- | src/DEVELOPMENT | 10 | ||||
-rw-r--r-- | src/ufo-roof-buffer.c | 7 | ||||
-rw-r--r-- | src/ufo-roof-buffer.h | 3 | ||||
-rw-r--r-- | src/ufo-roof-build-task.c | 29 | ||||
-rw-r--r-- | src/ufo-roof-read-socket.c | 17 | ||||
-rw-r--r-- | tests/config.sh | 3 | ||||
-rwxr-xr-x | tests/roof.sh | 1 |
7 files changed, 60 insertions, 10 deletions
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT index a40e2bb..18a8011 100644 --- a/src/DEVELOPMENT +++ b/src/DEVELOPMENT @@ -25,7 +25,15 @@ Problems speeding up (that was always like that). And during this period a significant desynchronization happens. To compensate it, we need about 400 buffers with libvma as compared to only 10 required if standard Linux networking is utilized. - - Can we pre-heat to avoid this speeding-up problem? Or it will be also not a problem with hardware? + - In any case (LibVMA or not), some packets will be lost in the beginning if high-speed communication is tested. + * Usually, first packets are transferred OK, but, then, a few packets will be lost occasionally here and there + (resulting in broken frames). This basically breaks grabbing a few packets and exitig. Unclear if server- or + client-side problem (makes sense to see how real-hardware will behave). + * Can we pre-heat to avoid this speeding-up problem (increase pre-allocated buffers, disable power-saving + mode, ??) Or it will be also not a problem with hardware? We can send UDP packets (should be send from another + host), but packets are still lost: + for i in $(seq 4000 4015); do echo "data" > /dev/udp/192.168.34.84/$i; done + * The following doesn't help: new version of libvma, tunning of the options. - Communication breaks with small MTU sizes (bellow 1500), but this is probably not important (Packets are delivered but with extreme latencies. Probably some tunning of network stack is required). - Technically, everything should work if we start UFO server when data is already streamed. However, the first diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c index f83885e..179d153 100644 --- a/src/ufo-roof-buffer.c +++ b/src/ufo-roof-buffer.c @@ -1,5 +1,6 @@ #include <stdio.h> #include <stdint.h> +#include <time.h> #include "glib.h" @@ -8,10 +9,11 @@ // This is currently not thread safe. With dual-filter architecture this will be called sequentially. -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) { +UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error) { UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); + buffer->max_datasets = max_datasets; buffer->ring_size = cfg->buffer_size; buffer->drop_buffers = cfg->drop_buffers; buffer->fragment_size = cfg->payload_size; @@ -65,6 +67,9 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu return FALSE; // roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id); + if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) + return FALSE; + // We are not fast enough, new packets are arrvining to fast if (dataset_id >= (buffer->current_id + buffer->ring_size)) { // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h index f7b2124..c4c8474 100644 --- a/src/ufo-roof-buffer.h +++ b/src/ufo-roof-buffer.h @@ -14,6 +14,7 @@ struct _UfoRoofBuffer { // int *fragments; // Mark individual completed fragments (if we care for partial data) + guint max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped guint dataset_size; // Size (in bytes) of a full dataset guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment) @@ -23,7 +24,7 @@ struct _UfoRoofBuffer { typedef struct _UfoRoofBuffer UfoRoofBuffer; -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error); +UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint max_datasets, GError **error); void ufo_roof_buffer_free(UfoRoofBuffer *buf); gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error); diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index 821c761..e5e5518 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -39,6 +39,8 @@ struct _UfoRoofBuildTaskPrivate { gboolean stop; // Stop flag guint announced; // For debugging + + struct timespec last_fragment_timestamp; }; static void ufo_task_interface_init (UfoTaskIface *iface); @@ -82,9 +84,11 @@ ufo_roof_build_task_setup (UfoTask *task, roof_propagate_error(error, gerr, "roof-build-setup: "); - priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr); + priv->buf = ufo_roof_buffer_new(priv->cfg, priv->number, &gerr); if (!priv->buf) roof_propagate_error(error, gerr, "roof-build-setup: "); + + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); } static void @@ -188,8 +192,23 @@ ufo_roof_build_task_process (UfoTask *task, } // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? + + if (ready) { + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); + } else { + struct timespec current_time; + clock_gettime(CLOCK_REALTIME, ¤t_time); + + // No new accepted events within timeout + if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + } + + /* - printf("proc (%s) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", header->channel_id, header->n_packets, + printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets, (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); */ @@ -216,14 +235,16 @@ ufo_roof_build_task_generate (UfoTask *task, ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr); if (gerr) roof_print_error(gerr); + // FIXME: Or shall we start from counting from the ID of the first registerd dataset if ((priv->number)&&(buf->current_id >= priv->number)) { +// printf("%u datasets processed, stopping\n", buf->current_id); priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } - if ((buf->current_id - priv->announced) > 1000) { - printf("Generating dataset %i (%s)\n", buf->current_id, ready?"yes":" no"); + if ((priv->number < 100)||((buf->current_id - priv->announced) > 1000)) { + printf("Generating dataset %i (%s), next: %u out of %u)\n", buf->current_id, ready?"yes":" no", buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); priv->announced = buf->current_id; } diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c index 7bbe8ef..e8f7ce4 100644 --- a/src/ufo-roof-read-socket.c +++ b/src/ufo-roof-read-socket.c @@ -30,6 +30,7 @@ static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) { } static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) { + int packets; struct timespec timeout_ts; UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; @@ -51,11 +52,17 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr msg[i].msg_hdr.msg_iovlen = 1; } +//retry: // Timeout seems broken, see BUGS in 'recvmmsg' bugs page - int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); + packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); +/* // FIXME: Shall we verify packets consistency here? We can check at least the sizes... + if ((packets == 1)&&(msg[0].msg_len < 16)) { + goto retry; + } +*/ return (guint)packets; } @@ -113,6 +120,14 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); } +/* + // Send ping request to force initialization + char msg[4]; + addr_str = "192.168.34.83"; + getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); +*/ + freeaddrinfo(sockaddr_info); return (UfoRoofReadInterface*)reader; diff --git a/tests/config.sh b/tests/config.sh index c1c656b..ecb46b6 100644 --- a/tests/config.sh +++ b/tests/config.sh @@ -9,7 +9,8 @@ vma_lib_path=/mnt/ands/lib64-fedora/ [ $el7 -eq 1 ] && vma_lib_path="${vma_lib_path/fedora/centos}" # Standard library -vma_lib=${vma_lib_path}/libvma.so.8.6.10 +#vma_lib=${vma_lib_path}/libvma.so.8.6.10 +vma_lib=${vma_lib_path}/libvma.so.8.9.5 # With Mellanox OFED extensions (./configure --enable-socketxtreme) #vma_lib=${vma_lib_path}/mlx/libvma.so.8.6.10 diff --git a/tests/roof.sh b/tests/roof.sh index df51679..12a4cfa 100755 --- a/tests/roof.sh +++ b/tests/roof.sh @@ -11,7 +11,6 @@ ulimit -l unlimited echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399 echo 8000 > /proc/sys/vm/nr_hugepages # 0 - #VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=0 VMA_SELECT_POLL=0 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_RX_BUFS=$bufs LD_PRELOAD=$vma_lib \ LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64" GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" \ python roof.py "$@" |