summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2020-09-03 03:00:30 +0200
committerSuren A. Chilingaryan <csa@suren.me>2020-09-03 03:00:30 +0200
commit5172421d248250b4ab3b69eb57fd83656e23a4da (patch)
treea499d9f1dd0b74b754816884a59927b3171656fc
parent7b2e6168b049be9e7852b2d364d897592eff69fc (diff)
downloadufo-roof-temp-master.tar.gz
ufo-roof-temp-master.tar.bz2
ufo-roof-temp-master.tar.xz
ufo-roof-temp-master.zip
This is unfinished work implemeting out-of-UFO network serversHEADmaster
-rw-r--r--docs/architecture.txt98
-rw-r--r--docs/mellanox.txt88
-rw-r--r--docs/todo.txt7
-rw-r--r--src/CMakeLists.txt10
-rw-r--r--src/hw_config.h8
-rw-r--r--src/hw_sched.c398
-rw-r--r--src/hw_sched.h146
-rw-r--r--src/hw_thread.c143
-rw-r--r--src/hw_thread.h76
-rw-r--r--src/meson.build7
-rw-r--r--src/roof-buffer.c (renamed from src/ufo-roof-buffer.c)85
-rw-r--r--src/roof-buffer.h (renamed from src/ufo-roof-buffer.h)29
-rw-r--r--src/roof-config.c (renamed from src/ufo-roof-config.c)52
-rw-r--r--src/roof-config.h (renamed from src/ufo-roof-config.h)20
-rw-r--r--src/roof-error.h (renamed from src/ufo-roof-error.h)9
-rw-r--r--src/roof-read-file.c (renamed from src/ufo-roof-read-file.c)56
-rw-r--r--src/roof-read-file.h8
-rw-r--r--src/roof-read-socket.c (renamed from src/ufo-roof-read-socket.c)89
-rw-r--r--src/roof-read-socket.h8
-rw-r--r--src/roof-read.c107
-rw-r--r--src/roof-read.h45
-rw-r--r--src/roof-thread.c95
-rw-r--r--src/roof-thread.h30
-rw-r--r--src/roof.c133
-rw-r--r--src/roof.h46
-rw-r--r--src/save/memcpy.c344
-rw-r--r--src/save/memcpy.h63
-rw-r--r--src/save/ufo-roof-buffer-build-task.c474
-rw-r--r--src/save/ufo-roof-read-thread.c155
-rw-r--r--src/save/ufo-roof-read-thread.h23
-rw-r--r--src/save/ufo-roof-read.c123
-rw-r--r--src/save/ufo-roof-read.h61
-rw-r--r--src/ufo-roof-build-task.c271
-rw-r--r--src/ufo-roof-read-file.h8
-rw-r--r--src/ufo-roof-read-socket.h8
-rw-r--r--src/ufo-roof-read-task.c68
-rw-r--r--src/ufo-roof-read.h17
-rw-r--r--src/ufo-roof.h16
-rw-r--r--tests/checks.sh12
-rw-r--r--tests/config.sh14
-rw-r--r--tests/control.py28
-rwxr-xr-x[l---------]tests/roof-vma.sh49
-rw-r--r--tests/roof.json19
-rwxr-xr-xtests/roof.sh22
-rw-r--r--tests/roof/config.py2
-rw-r--r--tests/roof/graph.py10
-rwxr-xr-xtests/vma-analyze.sh4
47 files changed, 3270 insertions, 314 deletions
diff --git a/docs/architecture.txt b/docs/architecture.txt
index 2b00ab3..e38da42 100644
--- a/docs/architecture.txt
+++ b/docs/architecture.txt
@@ -16,3 +16,101 @@ Configuration
- Metadata seems preserved while passing trough standard UFO filters. So, this is easiest way out.
- We can build a stand-alone subgraph for each plane, but this likely involves a full data copy.
- This probably OK for simple use case: "raw storage + visualization" as only two instances, but for could be too challenging for multi-plane analysis.
+
+
+Overloaded Mode
+===============
+ x How/when do we stop capturing in fixed frame mode. [ After building required number of frames ]
+ - After capturing theoretical number of packets required for reconstruction (or plus one frame to address half)
+ * Initail packets are likely broken as our experiments with LibVMA shows... Likely will be less...
+ * As work-around, we can _always_ skip first few frames to allow system become responsive.
+ * Overall, this seems a less reliable system.
+ => After initial building of the required number of packets.
+ * Unlike previous case, this may never finish if processing is bottleneck as buffers will be overwritten...
+ * Alternative is to stop capturing frames if buffers are exhausted...
+ x Do we pause receiving when buffer is exhausted or do we start over-writting (original ufo variant is overwritting). [ stop ]
+ - For streaming, the overwritting is better as it is better skipping older frames rather than newer.
+ => But for capturing fixed number of frames, we need to stop streaming (in overloaded case) or the frames will be continuously overwritten.
+ * This is more repeatable way to handle the frames and something has to be optimized anyway if we are to slow in streaming mode,
+ but in this mode we can reliably get first frames even if event-building is overloaded.
+ x Do we always skip first frames to keep system responsive or we process normally? [ Not now, TBD later if necessary ]
+ - Technically, skipping first frames could allow faster stabilization.
+ - Could only cause problems if streaming is started by some trigger and first frames are really important.
+ - This would be mandatory in fixed-frame-mode if stopping independent of reconstruction.
+
+Data Flow Model
+===============
+ x MP-RQ vs recvfrom_zcopy vs SocketXtreme
+ => MP-RQ is least overhead method (seems to remove one memcpy) with requirements fitting our use-case
+
+ x How to abstract read/LibVMA read access?
+ => Return pointer to buffer, number of packets, and padding between packets.
+
+ x Possible Models: Buffer independent streams or buffer sinograms
+ - Sinogram-buffer (push-model)
+ * Sinograms are buffered. The readers are directly writting to appropriate place in the sinogram buffers and increment fragment
+ counter for each buffer.
+ * Readers are paused on the first item out of current buffer and wait until buffer start is advanced.
+ * After receving fragment for a new sinogram, the reader informs controller about number of missing framgements in the buffer.
+ E.g. counting 'missing' fragments using atomic array (on top of completed).
+ * The controller advances buffer start after 'missing' is increased above the specified threshold.
+ - Stream-buffers (pull-model)
+ * Data is directly buffered in the independent receive-buffers. So, there is no memcpy in the receiving part of the code.
+ * Master builder thread determines sinogram to build (maximum amongst buffer).
+ * Builder threads skip to the required sinogram and start copying data until missing fragment is detected
+ * On missing fragment, new sinogram is determined and operation restarted (when to skip large amount?)
+
+ x Locking with global buffer (No problems if buffering independent streams)
+ - We can syncrhonize "Advances of Buffer Start" and "Copy out" with locks as this is low frequency events, but we need to ensure that
+ copy fragments are lock-less.
+ - In the 'counting' scenario this is problematc:
+ - Different threads write to diffent parts of the buffer. If the buffer start moves, it is OK to finish the old fragment. It will be
+ rewritten by the same thread with new data later. No problems here.
+ - But how to handle framgent counting? Atomics are fine for concurrent threads. But if we move buffer, the fragment count should not
+ be increased. This means we need execute 'if' and 'inc' atomicly (increase only if buffer has not moved and the move could happen between
+ 'if' and 'inc'). This is the main problem.
+ - We can push increases to pipe and use the main thread (which also advances the start of the buffer) to read from the pipe and
+ increase counts (or ignore if buffer moved). Is it faster than locking (or linux pipes perform locking or kernel/user-space
+ switches anyway?)
+ ? Can we find alternative ways without locks/pipes? E.g. using hashes? Really large counting arrays?
+
+ ? Performance considerations
+ - Sinogram-buffer advantage
+ - Sinogram-buffer works fine with re-ordered data streams. The stream-buffer approach can handle the re-odered streams in theory, but
+ with large performance penalty and incresed complexithy. But in fact, there is little reason why re-ordering could happen and experiments
+ with existing switch doesn't show any re-ordering.
+ - There is always uncached reads. However, in sinogram-buffer it is uncached copy of large image. And in case of stream-buffer, the
+ many small images are accessed uncached.
+ ? With Sinogram-buffer we can prepare data in advance and only single memcpy will be required. With stream-buffer all steps are performed
+ only once the buffer is available. But does it has any impact on performance?
+ ? With stream-buffer, building _seems_ more complex and requires more synchronization (but only if we could find a simple method to avoid
+ locking in the sinogram-buffer scenario).
+
+ => Stream-buffer advantage
+ - With MP-RQ we can use Mellanox ring-buffer (and probably Mellanox in general) as stream-buffer.
+ - Stream-buffer incures singifincantly less load during the reading phase. If building overloads system, with this approach we can
+ buffer as much data as memory permits and process it later. This is not possible with sinogram-buffer approach. But we still have
+ socket buffers...
+ - Stream-buffer removes one 'memcpy' unless zero-copy SocketExtreme is used. We also need to store raw data with fastwriter and the
+ new external sinogram buffer could be used to store data also for fastwriter. Hence, removing the necessity to memcpy there.
+ I.e. solved with SocketExtreme otherwise either performance penalty or additional complexity here.
+ ? Stream-buffer simplifies lock management. We don't need to provide additional pipes to reduce amount of required locking. Or is
+ there a simple solution?
+
+ ? Single-thread builder vs multi-thread builder?
+
+ ? LRO (Large Receive Offload). Can we use it? How it compareswith LibVMA?
+
+
+
+
+
+
+Dependencies
+============
+ x C11 threads vs pthreads vs glib threads
+ * C11 threads are only supported starting with glibc 2.28. Ubuntu 18.04 sheeps 2.27. There are work-arounds, but there is
+ little advantage over pthreads.
+ * We still will try to use atomics from C11.
+
+
diff --git a/docs/mellanox.txt b/docs/mellanox.txt
new file mode 100644
index 0000000..ed20048
--- /dev/null
+++ b/docs/mellanox.txt
@@ -0,0 +1,88 @@
+Terminology
+===========
+ - Send/Receive Queues
+ QP (Queue Pair): Combines RQ and SQ. Generally, irrelevant for the following
+ RQ (Receive Queue):
+ SQ (Send Queue):
+ CQ (Completion Queue): Completed operations reported here
+ EQ (Event Queue): Completions generate events (at specified rate) which in turn generate IRQs
+ WR/WQ (Work Request Queue): This is basically buffers (SG-lists) which should be either send or used for data reception
+ *QE (* Queue Event)
+
+ Flow: WQE --submit work--> WQ --execute--> SQ/RQ --on completion-> CQ --signal--> EQ -> IRQ
+ * Completion Event Moderation: Redeuce amount of reported events (EQ)
+
+ - Ofloads
+ RSS (Receive Side Scalling): Distribute load across CPU cores
+ LRO (Large Receive Offload): Group packets and deliver to user-space as a large single grouped packet [ ethtool -K shows if LRO on/off ]
+
+ - Various
+ AEV (Asynchronous Event): Errors,etc.
+ SRQ (Shared Receive Queue):
+ ICM (Interconnect Context Memory): Address Translation Tables, Control Objects, User Access Region (registers)
+ MPT (Memory Protection Table):
+ RMP (Receive Memory Pool):
+ TIR (Transport Interface Receive):
+ RQT (RQ Table):
+ MCG (Multicast Group):
+
+Driver
+======
+ - Network packets is/are streamed to ring buffers (with all Ethernet, IP, UDP/TCP headers).
+ The number of ring buffers dependents on VMA_RING_ALLOCATION parameter:
+ 0 - per network interface
+ 1 - per IP
+ => 10 - per socket
+ 20 - per thread (which was used to create the socket)
+ 30 - per core
+ 31 - per core (with some affinity of threads to cores)
+
+ - The memory for ring buffer is allocated based on VMA_MEM_ALLOC_TYPE:
+ 0 - malloc (this will be very slow if large buffers are requested)
+ 1 - contigous
+ => 2 - HugePages
+
+ - The number of buffers per ring is controlled with VMA_RX_BUFS (this is total in all rings)
+ * Each buffer VMA_MTU bytes
+ * Recommended: VMA_RX_BUFS ~ #rings * VMA_RX_WRE (number of WRE allocated on all interfaces)
+
+LibVMA
+======
+ There is 3 interfaces:
+ - MP-RQ (Multi-packet Receive Queue): vma_cyclic_buffer_read
+ This is useful for processing data streams when packet size stays contant and the packet flow doesn't change
+ drastically over time. Requires ConntextX-5 or newer.
+
+ * Use 'vma_add_ring_profile' to configure the size of ring buffer (specifies buffer size & the packet size)
+ * Set per-socket SO_VMA_RING_ALLOC_LOGIC using setsockopt
+ * Call 'vma_cyclic_buffer_read' to access raw ring buffer, specifies minimum and maximum packets to return
+
+ * The returned 'completion' structure referencing the position in the ring buffer. Packets in ring buffer
+ include all headers (ethernet - 14 bytes, ip - 20 bytes, udp - 8 bytes).
+ * New packets meanwhile are written in the remaining part of the ring buffer (until the linear end of the
+ buffer - consequently the returned data is not overwritten).
+ * The buffer rewinded only on call to 'vma_cyclic_buffer_read'. Less than the specified minimum amount of
+ packets can be returned if currently near the end of buffer and not enough space to fullfil the minimum
+ requirement.
+
+ * To ensure enough space for the follow up packets, synchronization between buffer size and min/max packet
+ is required. It should never happen that the space for only few packets is left when end of the buffer is
+ close.
+
+ - SocketXtreme: socketxtreme_poll
+ More complex interface allowing more control over process particularly processing packets with varing size.
+ Requires ConnectX-5 or newer.
+
+ * Get ring buffers associated with socket 'get_socket_rings_num' and 'get_socket_rings_fds'
+ * Get ready completions on the specified ring buffer with 'socketxtreme_poll' (pass 'fd' returned with 'get_socket_rings_fds')
+ * Two types of completions: 'VMA_SOCKETXTREME_NEW_CONNECTION_ACCEPTED' and 'VMA_SOCKETXTREME_PACKET'.
+ * For the second type, process an associated list of buffers and keep reference counting with 'socketxtreme_ref_vma_buf',
+ 'socketxtreme_free_vma_buf'.
+ * Clean/unreference received packets with socketxtreme_free_vma_packets
+
+ - Zero Copy: recvfrom_zcopy
+ The simplest interface working with ConnectX-3 cards. The packet is still written to ring-buffers. The data is not copied out
+ of ring buffers. This interface provides a way to get pointers to locations in ring buffer. There is a slight overhead compared
+ to MP-RQ approach to prepare list of packet pointers.
+
+
diff --git a/docs/todo.txt b/docs/todo.txt
index a9ab4c8..07258f3 100644
--- a/docs/todo.txt
+++ b/docs/todo.txt
@@ -11,6 +11,13 @@ Main
- Try UFO visualization filter
- "Reconstructed data storage" and "Visualization + raw data storage" modes. Implement stand-alone 'roof-converter' filter.
+Network
+=======
+ - Implement MP-RQ and corresponding abstractions
+ - LRO (Large Receive Offload). Can we use it? How it compareswith LibVMA?
+ - Check we can pre-allocate big enough buffers with LibVMA to receive required number of sinograms without loses
+
+
If necesary
===========
- Task 'roof-ingest-missing' to ingest zero-padded broken frames (and include get_writer())
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 720c18f..837a6fb 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 2.6)
#{{{ Sources
set(ufofilter_SRCS
- ufo-roof-read-task.c
ufo-roof-build-task.c
ufo-roof-filter-task.c
ufo-roof-flat-field-correct-task.c
@@ -12,12 +11,11 @@ set(common_SRCS
ufo-roof-config.c
)
-set(roof_read_aux_SRCS
+set(roof_build_aux_SRCS
+ hw_sched.c
+ hw_thread.c
ufo-roof-read-socket.c
ufo-roof-read-file.c
- )
-
-set(roof_build_aux_SRCS
ufo-roof-buffer.c
)
@@ -30,7 +28,7 @@ set(ufofilter_LIBS
${UFO_LIBRARIES}
${OpenCL_LIBRARIES})
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations -g -gdwarf-2 -g3 -fno-omit-frame-pointer")
add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES)
#}}}
diff --git a/src/hw_config.h b/src/hw_config.h
new file mode 100644
index 0000000..2351dea
--- /dev/null
+++ b/src/hw_config.h
@@ -0,0 +1,8 @@
+#ifndef _HW_CONFIG_H
+#define _HW_CONFIG_H
+
+ // enable threading
+#define HW_HAVE_SCHED_HEADERS
+#define HW_USE_THREADS
+
+#endif
diff --git a/src/hw_sched.c b/src/hw_sched.c
new file mode 100644
index 0000000..ec4d812
--- /dev/null
+++ b/src/hw_sched.c
@@ -0,0 +1,398 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "hw_config.h"
+
+#ifdef HW_HAVE_SCHED_HEADERS
+# include <sys/types.h>
+# include <unistd.h>
+# include <sched.h>
+#endif /* HW_HAVE_SCHED_HEADERS */
+
+#include "hw_sched.h"
+
+
+#ifdef HW_USE_THREADS
+# define MUTEX_INIT(ctx, name) \
+ if (!err) { \
+ ctx->name##_mutex = g_mutex_new(); \
+ if (!ctx->name##_mutex) err = 1; \
+ }
+
+# define MUTEX_FREE(ctx, name) \
+ if (ctx->name##_mutex) g_mutex_free(ctx->name##_mutex);
+
+# define COND_INIT(ctx, name) \
+ MUTEX_INIT(ctx, name##_cond) \
+ if (!err) { \
+ ctx->name##_cond = g_cond_new(); \
+ if (!ctx->name##_cond) { \
+ err = 1; \
+ MUTEX_FREE(ctx, name##_cond) \
+ } \
+ }
+
+# define COND_FREE(ctx, name) \
+ if (ctx->name##_cond) g_cond_free(ctx->name##_cond); \
+ MUTEX_FREE(ctx, name##_cond)
+#else /* HW_USE_THREADS */
+# define MUTEX_INIT(ctx, name)
+# define MUTEX_FREE(ctx, name)
+# define COND_INIT(ctx, name)
+# define COND_FREE(ctx, name)
+#endif /* HW_USE_THREADS */
+
+
+HWRunFunction ppu_run[] = {
+ (HWRunFunction)NULL
+};
+
+static int hw_sched_initialized = 0;
+
+int hw_sched_init(void) {
+ if (!hw_sched_initialized) {
+#ifdef HW_USE_THREADS
+ g_thread_init(NULL);
+#endif /* HW_USE_THREADS */
+ hw_sched_initialized = 1;
+ }
+
+ return 0;
+}
+
+
+int hw_sched_get_cpu_count(void) {
+#ifdef HW_HAVE_SCHED_HEADERS
+ int err;
+
+ int cpu_count;
+ cpu_set_t mask;
+
+ err = sched_getaffinity(getpid(), sizeof(mask), &mask);
+ if (err) return 1;
+
+# ifdef CPU_COUNT
+ cpu_count = CPU_COUNT(&mask);
+# else
+ for (cpu_count = 0; cpu_count < CPU_SETSIZE; cpu_count++) {
+ if (!CPU_ISSET(cpu_count, &mask)) break;
+ }
+# endif
+
+ if (!cpu_count) cpu_count = 1;
+ return cpu_count;
+#else /* HW_HAVE_SCHED_HEADERS */
+ return 1;
+#endif /* HW_HAVE_SCHED_HEADERS */
+}
+
+
+HWSched hw_sched_create(int cpu_count) {
+ int i;
+ int err = 0;
+
+ HWSched ctx;
+
+ //hw_sched_init();
+
+ ctx = (HWSched)malloc(sizeof(HWSchedS));
+ if (!ctx) return NULL;
+
+ memset(ctx, 0, sizeof(HWSchedS));
+
+ ctx->status = 1;
+
+ MUTEX_INIT(ctx, sync);
+ MUTEX_INIT(ctx, data);
+ COND_INIT(ctx, compl);
+ COND_INIT(ctx, job);
+
+ if (err) {
+ hw_sched_destroy(ctx);
+ return NULL;
+ }
+
+ if (!cpu_count) cpu_count = hw_sched_get_cpu_count();
+ if (cpu_count > HW_MAX_THREADS) cpu_count = HW_MAX_THREADS;
+
+ ctx->n_threads = 0;
+ for (i = 0; i < cpu_count; i++) {
+ ctx->thread[ctx->n_threads] = hw_thread_create(ctx, ctx->n_threads, NULL, ppu_run, NULL);
+ if (ctx->thread[ctx->n_threads]) {
+#ifndef HW_USE_THREADS
+ ctx->thread[ctx->n_threads]->status = HW_THREAD_STATUS_STARTING;
+#endif /* HW_USE_THREADS */
+ ++ctx->n_threads;
+ }
+ }
+
+ if (!ctx->n_threads) {
+ hw_sched_destroy(ctx);
+ return NULL;
+ }
+
+ return ctx;
+}
+
+static int hw_sched_wait_threads(HWSched ctx) {
+#ifdef HW_USE_THREADS
+ int i = 0;
+
+ hw_sched_lock(ctx, compl_cond);
+ while (i < ctx->n_threads) {
+ for (; i < ctx->n_threads; i++) {
+ if (ctx->thread[i]->status == HW_THREAD_STATUS_INIT) {
+ hw_sched_wait(ctx, compl);
+ break;
+ }
+ }
+
+ }
+ hw_sched_unlock(ctx, compl_cond);
+#endif /* HW_USE_THREADS */
+
+ ctx->started = 1;
+
+ return 0;
+}
+
+void hw_sched_destroy(HWSched ctx) {
+ int i;
+
+ if (ctx->n_threads > 0) {
+ if (!ctx->started) {
+ hw_sched_wait_threads(ctx);
+ }
+
+ ctx->status = 0;
+ hw_sched_lock(ctx, job_cond);
+ hw_sched_broadcast(ctx, job);
+ hw_sched_unlock(ctx, job_cond);
+
+ for (i = 0; i < ctx->n_threads; i++) {
+ hw_thread_destroy(ctx->thread[i]);
+ }
+ }
+
+ COND_FREE(ctx, job);
+ COND_FREE(ctx, compl);
+ MUTEX_FREE(ctx, data);
+ MUTEX_FREE(ctx, sync);
+
+ free(ctx);
+}
+
+int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags) {
+ ctx->mode = HW_SCHED_MODE_SEQUENTIAL;
+ ctx->n_blocks = n_blocks;
+ ctx->cur_block = cur_block;
+ ctx->flags = flags;
+
+ return 0;
+}
+
+int hw_sched_get_chunk(HWSched ctx, int thread_id) {
+ int block;
+
+ switch (ctx->mode) {
+ case HW_SCHED_MODE_PREALLOCATED:
+ if (ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING) {
+#ifndef HW_USE_THREADS
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_DONE;
+#endif /* HW_USE_THREADS */
+ return thread_id;
+ } else {
+ return HW_SCHED_CHUNK_INVALID;
+ }
+ case HW_SCHED_MODE_SEQUENTIAL:
+ if ((ctx->flags&HW_SCHED_FLAG_INIT_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING)) {
+ return HW_SCHED_CHUNK_INIT;
+ }
+ hw_sched_lock(ctx, data);
+ block = *ctx->cur_block;
+ if (block < *ctx->n_blocks) {
+ *ctx->cur_block = *ctx->cur_block + 1;
+ } else {
+ block = HW_SCHED_CHUNK_INVALID;
+ }
+ hw_sched_unlock(ctx, data);
+ if (block == HW_SCHED_CHUNK_INVALID) {
+ if (((ctx->flags&HW_SCHED_FLAG_FREE_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING))) {
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING;
+ return HW_SCHED_CHUNK_FREE;
+ }
+ if ((ctx->flags&HW_SCHED_FLAG_TERMINATOR_CALL)&&((ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING)||(ctx->thread[thread_id]->status == HW_THREAD_STATUS_FINISHING))) {
+ int i;
+ hw_sched_lock(ctx, data);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (thread_id == i) continue;
+ if ((ctx->thread[i]->status != HW_THREAD_STATUS_DONE)&&(ctx->thread[i]->status != HW_THREAD_STATUS_FINISHING2)&&(ctx->thread[i]->status != HW_THREAD_STATUS_IDLE)) {
+ break;
+ }
+ }
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING2;
+ hw_sched_unlock(ctx, data);
+ if (i == ctx->n_threads) {
+ return HW_SCHED_CHUNK_TERMINATOR;
+ }
+ }
+ }
+ return block;
+ default:
+ return HW_SCHED_CHUNK_INVALID;
+ }
+
+ return -1;
+}
+
+
+int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry) {
+#ifdef HW_USE_THREADS
+ if (!ctx->started) {
+ hw_sched_wait_threads(ctx);
+ }
+#else /* HW_USE_THREADS */
+ int err;
+ int i, chunk_id, n_threads;
+ HWRunFunction run;
+ HWThread thrctx;
+#endif /* HW_USE_THREADS */
+
+ ctx->ctx = appctx;
+ ctx->entry = entry;
+
+ switch (ctx->mode) {
+ case HW_SCHED_MODE_SEQUENTIAL:
+ *ctx->cur_block = 0;
+ break;
+ default:
+ ;
+ }
+
+#ifdef HW_USE_THREADS
+ hw_sched_lock(ctx, compl_cond);
+
+ hw_sched_lock(ctx, job_cond);
+ hw_sched_broadcast(ctx, job);
+ hw_sched_unlock(ctx, job_cond);
+#else /* HW_USE_THREADS */
+ n_threads = ctx->n_threads;
+
+ for (i = 0; i < n_threads; i++) {
+ thrctx = ctx->thread[i];
+ thrctx->err = 0;
+ }
+
+ i = 0;
+ thrctx = ctx->thread[i];
+ chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
+
+ while (chunk_id >= 0) {
+ run = hw_run_entry(thrctx->runs, entry);
+ err = run(thrctx, thrctx->hwctx, chunk_id, appctx);
+ if (err) {
+ thrctx->err = err;
+ break;
+ }
+
+ if ((++i) == n_threads) i = 0;
+ thrctx = ctx->thread[i];
+ chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
+ }
+#endif /* HW_USE_THREADS */
+
+ return 0;
+}
+
+int hw_sched_wait_task(HWSched ctx) {
+ int err = 0;
+ int i = 0, n_threads = ctx->n_threads;
+
+#ifdef HW_USE_THREADS
+ while (i < ctx->n_threads) {
+ for (; i < ctx->n_threads; i++) {
+ if (ctx->thread[i]->status == HW_THREAD_STATUS_DONE) {
+ ctx->thread[i]->status = HW_THREAD_STATUS_IDLE;
+ } else {
+ hw_sched_wait(ctx, compl);
+ break;
+ }
+ }
+
+ }
+
+ hw_sched_unlock(ctx, compl_cond);
+#endif /* HW_USE_THREADS */
+
+ for (i = 0; i < n_threads; i++) {
+ HWThread thrctx = ctx->thread[i];
+ if (thrctx->err) return err = thrctx->err;
+
+#ifndef HW_USE_THREADS
+ thrctx->status = HW_THREAD_STATUS_IDLE;
+#endif /* HW_USE_THREADS */
+ }
+
+ return err;
+}
+
+int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+
+ err = hw_sched_schedule_task(ctx, appctx, entry);
+ if (err) return err;
+
+ return hw_sched_wait_task(ctx);
+}
+
+int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+
+ ctx->saved_mode = ctx->mode;
+ ctx->mode = HW_SCHED_MODE_PREALLOCATED;
+ err = hw_sched_schedule_task(ctx, appctx, entry);
+
+ return err;
+}
+
+
+int hw_sched_wait_thread_task(HWSched ctx) {
+ int err;
+
+ err = hw_sched_wait_task(ctx);
+ ctx->mode = ctx->saved_mode;
+
+ return err;
+}
+
+int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+ int saved_mode = ctx->mode;
+
+ ctx->mode = HW_SCHED_MODE_PREALLOCATED;
+ err = hw_sched_execute_task(ctx, appctx, entry);
+ ctx->mode = saved_mode;
+
+ return err;
+}
diff --git a/src/hw_sched.h b/src/hw_sched.h
new file mode 100644
index 0000000..af9e363
--- /dev/null
+++ b/src/hw_sched.h
@@ -0,0 +1,146 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _HW_SCHED_H
+#define _HW_SCHED_H
+
+#include <glib.h>
+
+typedef struct HWSchedT *HWSched;
+#ifdef HW_USE_THREADS
+typedef GMutex *HWMutex;
+#else /* HW_USE_THREADS */
+typedef void *HWMutex;
+#endif /* HW_USE_THREADS */
+
+
+#include "hw_thread.h"
+
+enum HWSchedModeT {
+ HW_SCHED_MODE_PREALLOCATED = 0,
+ HW_SCHED_MODE_SEQUENTIAL
+};
+typedef enum HWSchedModeT HWSchedMode;
+
+enum HWSchedChunkT {
+ HW_SCHED_CHUNK_INVALID = -1,
+ HW_SCHED_CHUNK_INIT = -2,
+ HW_SCHED_CHUNK_FREE = -3,
+ HW_SCHED_CHUNK_TERMINATOR = -4
+};
+typedef enum HWSchedChunkT HWSchedChunk;
+
+enum HWSchedFlagsT {
+ HW_SCHED_FLAG_INIT_CALL = 1, //! Executed in each thread before real chunks
+ HW_SCHED_FLAG_FREE_CALL = 2, //! Executed in each thread after real chunks
+ HW_SCHED_FLAG_TERMINATOR_CALL = 4 //! Executed in one of the threads after all threads are done
+};
+typedef enum HWSchedFlagsT HWSchedFlags;
+
+
+#define HW_SINGLE_MODE
+//#define HW_DETECT_CPU_CORES
+#define HW_MAX_THREADS 128
+
+#ifdef HW_SINGLE_MODE
+ typedef HWRunFunction HWEntry;
+# define hw_run_entry(runs, entry) entry
+#else /* HW_SINGLE_MODE */
+ typedef int HWEntry;
+# define hw_run_entry(runs, entry) runs[entry]
+#endif /* HW_SINGLE_MODE */
+
+#ifndef HW_HIDE_DETAILS
+struct HWSchedT {
+ int status;
+ int started;
+
+ int n_threads;
+ HWThread thread[HW_MAX_THREADS];
+
+#ifdef HW_USE_THREADS
+ GCond *job_cond, *compl_cond;
+ GMutex *job_cond_mutex, *compl_cond_mutex, *data_mutex;
+ GMutex *sync_mutex;
+#endif /* HW_USE_THREADS */
+
+ HWSchedMode mode;
+ HWSchedMode saved_mode;
+ HWSchedFlags flags;
+ int *n_blocks;
+ int *cur_block;
+
+ HWEntry entry;
+ void *ctx;
+};
+typedef struct HWSchedT HWSchedS;
+#endif /* HW_HIDE_DETAILS */
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+HWSched hw_sched_create(int ppu_count);
+int hw_sched_init(void);
+void hw_sched_destroy(HWSched ctx);
+int hw_sched_get_cpu_count(void);
+
+int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags);
+int hw_sched_get_chunk(HWSched ctx, int thread_id);
+int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry);
+int hw_sched_wait_task(HWSched ctx);
+int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry);
+
+int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry);
+int hw_sched_wait_thread_task(HWSched ctx);
+int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry);
+
+HWMutex hw_sched_create_mutex(void);
+void hw_sched_destroy_mutex(HWMutex ctx);
+
+#ifdef HW_USE_THREADS
+# define hw_sched_lock(ctx, type) g_mutex_lock(ctx->type##_mutex)
+# define hw_sched_unlock(ctx, type) g_mutex_unlock(ctx->type##_mutex)
+# define hw_sched_broadcast(ctx, type) g_cond_broadcast(ctx->type##_cond)
+# define hw_sched_signal(ctx, type) g_cond_signal(ctx->type##_cond)
+# define hw_sched_wait(ctx, type) g_cond_wait(ctx->type##_cond, ctx->type##_cond_mutex)
+
+#define hw_sched_create_mutex(void) g_mutex_new()
+#define hw_sched_destroy_mutex(ctx) g_mutex_free(ctx)
+#define hw_sched_lock_mutex(ctx) g_mutex_lock(ctx)
+#define hw_sched_unlock_mutex(ctx) g_mutex_unlock(ctx)
+#else /* HW_USE_THREADS */
+# define hw_sched_lock(ctx, type)
+# define hw_sched_unlock(ctx, type)
+# define hw_sched_broadcast(ctx, type)
+# define hw_sched_signal(ctx, type)
+# define hw_sched_wait(ctx, type)
+
+#define hw_sched_create_mutex(void) NULL
+#define hw_sched_destroy_mutex(ctx)
+#define hw_sched_lock_mutex(ctx)
+#define hw_sched_unlock_mutex(ctx)
+#endif /* HW_USE_THREADS */
+
+# ifdef __cplusplus
+}
+# endif
+
+#endif /* _HW_SCHED_H */
+
diff --git a/src/hw_thread.c b/src/hw_thread.c
new file mode 100644
index 0000000..0374630
--- /dev/null
+++ b/src/hw_thread.c
@@ -0,0 +1,143 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "hw_config.h"
+
+#include "hw_sched.h"
+#include "hw_thread.h"
+
+#ifdef HW_USE_THREADS
+static void *hw_thread_function(HWThread ctx) {
+ int err;
+ int chunk_id;
+
+ HWRunFunction *runs;
+ HWRunFunction run;
+ HWSched sched;
+ void *hwctx;
+
+ sched = ctx->sched;
+ runs = ctx->run;
+ hwctx = ctx->hwctx;
+
+ hw_sched_lock(sched, job_cond);
+
+ hw_sched_lock(sched, compl_cond);
+ ctx->status = HW_THREAD_STATUS_IDLE;
+ hw_sched_broadcast(sched, compl);
+ hw_sched_unlock(sched, compl_cond);
+
+ while (sched->status) {
+ hw_sched_wait(sched, job);
+ if (!sched->status) break;
+
+ ctx->err = 0;
+ ctx->status = HW_THREAD_STATUS_STARTING;
+ hw_sched_unlock(sched, job_cond);
+
+ run = hw_run_entry(runs, sched->entry);
+#if 0
+ // Offset to interleave transfers if the GPUBox is used
+ // Just check with CUDA_LAUNCH_BLOCKED the togpu time and put it here
+ // It should be still significantly less than BP time
+ // We can do callibration during initilization in future
+
+ usleep(12000 * ctx->thread_id);
+#endif
+ chunk_id = hw_sched_get_chunk(sched, ctx->thread_id);
+
+ /* Should be after get_chunk, since we can check if it's first time */
+ ctx->status = HW_THREAD_STATUS_RUNNING;
+ while (chunk_id != HW_SCHED_CHUNK_INVALID) {
+ //printf("Thread %i processing slice %i\n", ctx->thread_id, chunk_id);
+ err = run(ctx, hwctx, chunk_id, sched->ctx);
+ if (err) {
+ ctx->err = err;
+ break;
+ }
+ chunk_id = hw_sched_get_chunk(sched, ctx->thread_id);
+ }
+
+ hw_sched_lock(sched, job_cond);
+
+ hw_sched_lock(sched, compl_cond);
+ ctx->status = HW_THREAD_STATUS_DONE;
+ hw_sched_broadcast(sched, compl);
+ hw_sched_unlock(sched, compl_cond);
+ }
+
+ hw_sched_unlock(sched, job_cond);
+
+ g_thread_exit(NULL);
+ return NULL; /* TODO: check this */
+}
+#endif /* HW_USE_THREADS */
+
+
+HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func) {
+ GError *err;
+
+ HWThread ctx;
+
+ ctx = (HWThread)malloc(sizeof(HWThreadS));
+ if (!ctx) return ctx;
+
+ memset(ctx, 0, sizeof(HWThreadS));
+
+ ctx->sched = sched;
+ ctx->hwctx = hwctx;
+ ctx->run = run_func;
+ ctx->free = free_func;
+ ctx->thread_id = thread_id;
+ ctx->status = HW_THREAD_STATUS_INIT;
+
+#ifdef HW_USE_THREADS
+ ctx->thread = g_thread_create((GThreadFunc)hw_thread_function, ctx, 1, &err);
+ if (!ctx->thread) {
+ g_error_free(err);
+
+ hw_thread_destroy(ctx);
+ return NULL;
+ }
+#endif /* HW_USE_THREADS */
+
+ return ctx;
+}
+
+void hw_thread_destroy(HWThread ctx) {
+#ifdef HW_USE_THREADS
+ if (ctx->thread) {
+ g_thread_join(ctx->thread);
+ }
+#endif /* HW_USE_THREADS */
+
+ if (ctx->data) {
+ free(ctx->data);
+ }
+
+ if (ctx->free) {
+ ctx->free(ctx->hwctx);
+ }
+
+ free(ctx);
+}
diff --git a/src/hw_thread.h b/src/hw_thread.h
new file mode 100644
index 0000000..de7f60f
--- /dev/null
+++ b/src/hw_thread.h
@@ -0,0 +1,76 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _HW_THREAD_H
+#define _HW_THREAD_H
+
+#include <glib.h>
+
+typedef struct HWThreadT *HWThread;
+typedef int (*HWRunFunction)(HWThread thread, void *ctx, int block, void *attr);
+typedef int (*HWFreeFunction)(void *ctx);
+
+#include "hw_sched.h"
+
+enum HWThreadStatusT {
+ HW_THREAD_STATUS_IDLE = 0,
+ HW_THREAD_STATUS_STARTING = 1,
+ HW_THREAD_STATUS_RUNNING = 2,
+ HW_THREAD_STATUS_FINISHING = 3,
+ HW_THREAD_STATUS_FINISHING2 = 4,
+ HW_THREAD_STATUS_DONE = 5,
+ HW_THREAD_STATUS_INIT = 6
+};
+typedef enum HWThreadStatusT HWThreadStatus;
+
+
+#ifndef HW_HIDE_DETAILS
+struct HWThreadT {
+ int thread_id;
+ HWSched sched;
+
+#ifdef HW_USE_THREADS
+ GThread *thread;
+#endif /* HW_USE_THREADS */
+
+ void *hwctx;
+ HWRunFunction *run;
+ HWFreeFunction free;
+
+ int err;
+ HWThreadStatus status;
+
+ void *data; /**< Per-thread data storage, will be free'd if set */
+};
+typedef struct HWThreadT HWThreadS;
+#endif /* HW_HIDE_DETAILS */
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func);
+void hw_thread_destroy(HWThread ctx);
+
+# ifdef __cplusplus
+}
+# endif
+
+
+#endif /* _HW_THREAD_H */
diff --git a/src/meson.build b/src/meson.build
index 324c744..fd41361 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -1,5 +1,4 @@
plugins = [
- 'roof-read',
'roof-build',
'roof-filter',
'roof-flat-field-correct'
@@ -10,11 +9,11 @@ roof_common_src = [
]
roof_plugin_src = {
- 'roof-read': [
+ 'roof-build': [
+ 'hw_sched.c',
+ 'hw_thread.c',
'ufo-roof-read-socket.c',
'ufo-roof-read-file.c',
- ],
- 'roof-build': [
'ufo-roof-buffer.c',
],
'roof-filter': [
diff --git a/src/ufo-roof-buffer.c b/src/roof-buffer.c
index 0e0a890..4ca0386 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/roof-buffer.c
@@ -9,14 +9,15 @@
// This is currently not thread safe. With dual-filter architecture this will be called sequentially.
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
+RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
if ((n_dims < 1)||(n_dims > 2))
roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims);
- UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
- if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
+ RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer));
+ if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer");
buffer->max_datasets = max_datasets;
+ buffer->n_streams = cfg->n_streams;
buffer->ring_size = cfg->buffer_size;
buffer->drop_buffers = cfg->drop_buffers;
buffer->latency_buffers = cfg->latency_buffers;
@@ -36,26 +37,27 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d
buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int));
buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
- buffer->first_id = malloc(buffer->ring_size * sizeof(guint64));
+#ifdef ROOF_INDEPENDENT_STREAMS
+ buffer->first_id = malloc(buffer->n_streams * sizeof(guint64));
if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
- for (guint i = 0; i < buffer->ring_size; i++)
+ for (guint i = 0; i < buffer->n_streams; i++)
buffer->first_id[i] = (guint64)-1;
#else
buffer->first_id = (guint64)-1;
#endif
+ buffer->last_id = malloc(buffer->n_streams * sizeof(guint64));
if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
- ufo_roof_buffer_free(buffer);
+ roof_buffer_free(buffer);
roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
}
return buffer;
}
-void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
+void roof_buffer_free(RoofBuffer *buffer) {
if (buffer) {
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
if (buffer->first_id)
free(buffer->first_id);
#endif
@@ -71,7 +73,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
}
// fragment_id is numbered from 1 (0 - means auto)
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
+gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
gboolean ready = FALSE;
guint buffer_id;
guint64 first_id;
@@ -85,7 +87,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
if (buffer->first_id[stream_id] == (guint64)-1)
buffer->first_id[stream_id] = dataset_id;
first_id = buffer->first_id[stream_id];
@@ -102,7 +104,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
if (dataset_id < buffer->current_id) {
- roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
return FALSE;
}
@@ -110,24 +112,39 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
return FALSE;
}
+
+ buffer->last_id[stream_id] = dataset_id;
+
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed",
- dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ guint64 min_id = (guint64)-1;
+ guint64 max_id = 0;
+ for (guint i = 0; i < buffer->n_streams; i++) {
+ if (min_id > buffer->last_id[stream_id]) min_id = buffer->last_id[stream_id];
+ if (max_id < buffer->last_id[stream_id]) max_id = buffer->last_id[stream_id];
+ }
+ printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id);
+ printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]);
+
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed, already reported %lu",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready);
// FIXME: Send semi-complete buffers further?
// FIXME: Or shall we drop more if larger buffers are allocated?
if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ // FIXME: Can finally received old buffer hit the counter here? Locking?
buffer->current_id = dataset_id;
} else {
for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
buffer->n_fragments[i%buffer->ring_size] = 0;
+ // FIXME: Can finally received old buffers hit the counters here? Locking?
buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
}
+ // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more
if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
ready = TRUE;
@@ -145,6 +162,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
);
*/
+ // FIXME: What if buffer obsolete meanwhile by other threads. We are not single threaded
uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
if (buffer->n_dims == 2) {
uint8_t *fragment_buffer = dataset_buffer +
@@ -161,20 +179,25 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
}
// FIXME: Sanity checks: verify is not a dublicate fragment?
- atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+ // This stuff is anyway is single-threaded which is a problem.
+ //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+ buffer->n_fragments[buffer_id]++;
if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
if (dataset_id == buffer->current_id)
ready = TRUE;
- else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers)))
- ready = ufo_roof_buffer_skip_to_ready(buffer);
+ else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) {
+ ready = roof_buffer_skip_to_ready(buffer);
+ }
+
+ if (ready) buffer->n_ready++;
}
-
+
return ready;
}
-gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
+gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) {
for (guint i = 0; i < buffer->ring_size; i++) {
guint64 id = buffer->current_id + i;
guint buffer_id = id % buffer->ring_size;
@@ -184,7 +207,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
return TRUE;
}
-// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
buffer->n_fragments[buffer_id] = 0;
}
@@ -192,7 +215,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
}
-gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
+gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
guint buffer_id = buffer->current_id % buffer->ring_size;
void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
@@ -207,3 +230,23 @@ gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buff
return TRUE;
}
+
+gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) {
+ struct timespec start_time, current_time;
+ clock_gettime(CLOCK_REALTIME, &start_time);
+
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) {
+ // FIXME: get some sleep? How we can interrupt it?
+ // just small nanosleep?
+ if (timeout) {
+ clock_gettime(CLOCK_REALTIME, &current_time);
+ gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 + (current_time.tv_nsec - start_time.tv_nsec) / 1000;
+ if (wait > timeout) return FALSE;
+ }
+ }
+
+ return roof_buffer_get_dataset(buffer, output_buffer, seqid, error);
+}
diff --git a/src/ufo-roof-buffer.h b/src/roof-buffer.h
index 8e9c00b..a95dbaa 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/roof-buffer.h
@@ -1,20 +1,24 @@
-#ifndef __UFO_ROOF_BUFFER_H
-#define __UFO_ROOF_BUUFER_H
+#ifndef __ROOF_BUFFER_H
+#define __ROOF_BUFFER_H
// This IS harmful! Just for testing
-//#define UFO_ROOF_INDEPENDENT_STREAMS
+#define ROOF_INDEPENDENT_STREAMS
#include <stdatomic.h>
+#include <stdint.h>
+#include "ufo-roof-config.h"
-
-struct _UfoRoofBuffer {
+struct _RoofBuffer {
guint64 current_id; // The ID of the first (active) dataset in the buffer
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
#else
guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
#endif
+ guint64 *last_id;
+ guint64 n_ready;
+ guint n_streams;
guint ring_size; // Number of datasets to buffer
guint drop_buffers; // If we need to catch up
guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready
@@ -35,13 +39,14 @@ struct _UfoRoofBuffer {
guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready)
};
-typedef struct _UfoRoofBuffer UfoRoofBuffer;
+typedef struct _RoofBuffer RoofBuffer;
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error);
-void ufo_roof_buffer_free(UfoRoofBuffer *buf);
+RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error);
+void roof_buffer_free(RoofBuffer *buf);
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error);
-gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer);
-gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error);
+gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error);
+gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer);
+gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error);
+gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error);
#endif
diff --git a/src/ufo-roof-config.c b/src/roof-config.c
index 944ee31..189c08f 100644
--- a/src/ufo-roof-config.c
+++ b/src/roof-config.c
@@ -34,14 +34,14 @@
typedef struct {
- UfoRoofConfig cfg;
+ RoofConfig cfg;
JsonParser *parser;
-} UfoRoofConfigPrivate;
+} RoofConfigPrivate;
-void ufo_roof_config_free(UfoRoofConfig *cfg) {
+void roof_config_free(RoofConfig *cfg) {
if (cfg) {
- UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg;
+ RoofConfigPrivate *priv = (RoofConfigPrivate*)cfg;
if (priv->parser)
g_object_unref (priv->parser);
@@ -50,9 +50,9 @@ void ufo_roof_config_free(UfoRoofConfig *cfg) {
}
}
-UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error) {
- UfoRoofConfigPrivate *priv;
- UfoRoofConfig *cfg;
+RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error) {
+ RoofConfigPrivate *priv;
+ RoofConfig *cfg;
// JsonNode *node;
JsonObject *root = NULL;
@@ -67,10 +67,10 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
GError *gerr = NULL;
- priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate));
- if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig");
+ priv = (RoofConfigPrivate*)malloc(sizeof(RoofConfigPrivate));
+ if (!priv) roof_new_error(error, "Can't allocate RoofConfig");
- memset(priv, 0, sizeof(UfoRoofConfigPrivate));
+ memset(priv, 0, sizeof(RoofConfigPrivate));
// Set defaults
cfg = &priv->cfg;
@@ -87,8 +87,8 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->port = 52067;
cfg->n_streams = 1;
cfg->protocol = "udp";
- cfg->network_timeout = 10000000;
- cfg->header_size = sizeof(UfoRoofPacketHeader);
+ cfg->network_timeout = 100000000; // FIXME: remove 0
+ cfg->header_size = sizeof(RoofPacketHeader);
cfg->payload_size = 0;
cfg->max_packet_size = 0;
cfg->max_packets = 100;
@@ -96,6 +96,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->buffer_size = 2;
cfg->latency_buffers = 0;
cfg->drop_buffers = 0;
+ cfg->sockets_per_thread = 4;
// Read configuration
@@ -104,7 +105,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if (gerr != NULL) {
g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config);
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
return NULL;
}
@@ -119,7 +120,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(performance, root, object, "performance");
roof_config_node_get(data, root, object, "data");
- if (flags&UFO_ROOF_CONFIG_SIMULATION)
+ if (flags&ROOF_CONFIG_SIMULATION)
roof_config_node_get(simulation, root, object, "simulation");
}
@@ -135,12 +136,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if ((cfg->sample_rate)||(cfg->imaging_rate)) {
if ((!cfg->sample_rate)||(!cfg->imaging_rate)||(cfg->sample_rate%cfg->imaging_rate)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Invalid sample (%u) and imaging (%u) rates are specified", cfg->sample_rate, cfg->imaging_rate);
}
if ((json_object_get_member(hardware, "samples_per_rotation"))&&(cfg->samples_per_rotation != (cfg->sample_rate / cfg->imaging_rate))) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "The specified samples-per-rotation (%u) doesn't match sample/imaging rates (%u / %u)", cfg->samples_per_rotation, cfg->sample_rate, cfg->imaging_rate);
}
@@ -148,7 +149,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
}
if ((cfg->bit_depth%8)||(cfg->bit_depth > 32)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Invalid bit-depth (%u) is configured, only 8, 16, 24, 32 is currently supported", cfg->bit_depth);
}
@@ -170,13 +171,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(cfg->dataset_size, network, int, "dataset_size");
if (!cfg->payload_size) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Packet payload and header size must be set");
}
- if ((cfg->header_size < sizeof(UfoRoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) {
- ufo_roof_config_free(cfg);
- roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(UfoRoofPacketHeader));
+ if ((cfg->header_size < sizeof(RoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) {
+ roof_config_free(cfg);
+ roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(RoofPacketHeader));
}
if (!cfg->dataset_size)
@@ -195,6 +196,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers");
roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers");
+ roof_config_node_get(cfg->sockets_per_thread, performance, int, "sockets_per_thread");
}
@@ -204,13 +206,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
// Dataset should be split in an integer number of network packets (we don't expect data from different datasets in one packet at the moment)
if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams);
}
// Packet should contain an integer number of complete projections (their parts provided by a single module)
if ((cfg->roof_mode)&&(cfg->payload_size % (cfg->channels_per_module * (cfg->bit_depth / 8)))) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Inconsistent ROOF configuration: packet_size=%u, projection_size=%u (%u channels x %u bits)", cfg->payload_size, cfg->channels_per_module * (cfg->bit_depth / 8), cfg->channels_per_module, cfg->bit_depth);
}
@@ -219,12 +221,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if (hardware) {
if (cfg->n_modules != cfg->n_streams) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Currently, number of ROOF modules (%u) is exepcted to be equal to number of independent data streams (%u)", cfg->n_modules, cfg->n_streams);
}
if (cfg->dataset_size != (cfg->fan_projections * cfg->fan_bins * cfg->bit_depth / 8)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Specified dataset size (%u) does not match ROOF configuration (modules: %u, channels-per-module: %u, bit-depth: %u, samples-per-rotation: %u)", cfg->dataset_size, cfg->n_modules, cfg->channels_per_module, cfg->bit_depth, cfg->samples_per_rotation);
}
}
diff --git a/src/ufo-roof-config.h b/src/roof-config.h
index 8718381..f28cb60 100644
--- a/src/ufo-roof-config.h
+++ b/src/roof-config.h
@@ -1,5 +1,5 @@
-#ifndef __UFO_ROOF_CONFIG_H
-#define __UFO_ROOF_CONFIG_H
+#ifndef __ROOF_CONFIG_H
+#define __ROOF_CONFIG_H
#include <glib.h>
@@ -39,7 +39,7 @@ typedef struct {
guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up.
guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip
guint network_timeout; // Maximum time (us) to wait for data on the socket
-
+ guint sockets_per_thread; // Number of sockets per thread. Optimally number of therads should not exceed number of CPU cores
@@ -50,15 +50,15 @@ typedef struct {
-} UfoRoofConfig;
+} RoofConfig;
typedef enum {
- UFO_ROOF_CONFIG_DEFAULT = 0,
- UFO_ROOF_CONFIG_SIMULATION = 1
-} UfoRoofConfigFlags;
+ ROOF_CONFIG_DEFAULT = 0,
+ ROOF_CONFIG_SIMULATION = 1
+} RoofConfigFlags;
-UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error);
-void ufo_roof_config_free(UfoRoofConfig *cfg);
+RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error);
+void roof_config_free(RoofConfig *cfg);
-#endif /* __UFO_ROOF_CONFIG_H */
+#endif /* __ROOF_CONFIG_H */
diff --git a/src/ufo-roof-error.h b/src/roof-error.h
index 5491f31..418645c 100644
--- a/src/ufo-roof-error.h
+++ b/src/roof-error.h
@@ -1,5 +1,5 @@
-#ifndef __UFO_ROOF_ERROR_H
-#define __UFO_ROOF_ERROR_H
+#ifndef __ROOF_ERROR_H
+#define __ROOF_ERROR_H
#include <ufo/ufo.h>
@@ -38,6 +38,9 @@
#define roof_setup_error(error, ...) \
roof_error(error, SETUP, __VA_ARGS__)
+#define roof_setup_error_with_retval(error, ...) \
+ roof_error_with_retval(error, retval, SETUP, __VA_ARGS__)
+
#define roof_new_error(error, ...) \
roof_error_with_retval(error, NULL, SETUP, __VA_ARGS__)
@@ -55,4 +58,4 @@
roof_error(error, SETUP, __VA_ARGS__)
-#endif /* __UFO_ROOF_ERROR_H */
+#endif /* __ROOF_ERROR_H */
diff --git a/src/ufo-roof-read-file.c b/src/roof-read-file.c
index 4ee11c6..da8d51c 100644
--- a/src/ufo-roof-read-file.c
+++ b/src/roof-read-file.c
@@ -1,5 +1,6 @@
#include <stdio.h>
#include <errno.h>
+#include <assert.h>
#include <stdint.h>
#include "glib.h"
@@ -8,64 +9,71 @@
#include "ufo-roof-read-file.h"
typedef struct {
- UfoRoofReadInterface iface;
+ RoofReadInterface iface;
+
+ RoofConfig *cfg;
- UfoRoofConfig *cfg;
-
gchar *fname;
FILE *fd;
-} UfoRoofReadFile;
+ uint8_t *buf;
+} RoofReadFile;
+
+static void roof_read_file_free(RoofReadInterface *iface) {
+ RoofReadFile *reader = (RoofReadFile*)iface;
-static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
-
if (reader) {
if (reader->fname)
g_free(reader->fname);
if (reader->fd)
fclose(reader->fd);
-
+
+ if (reader->buf)
+ free(reader->buf);
free(reader);
}
}
-static guint ufo_roof_read_file(UfoRoofReadInterface *iface, uint8_t *buffers, GError **error) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
- UfoRoofConfig *cfg = reader->cfg;
+static guint roof_read_file(RoofReadInterface *iface, uint8_t **buffers, GError **error) {
+ RoofReadFile *reader = (RoofReadFile*)iface;
+ RoofConfig *cfg = reader->cfg;
+
+ assert(iface);
+ assert(buffers);
size_t bytes = 0;
size_t packet_size = cfg->header_size + cfg->payload_size;
size_t expected = cfg->max_packets * packet_size;
while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) {
- size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd);
+ size_t ret = fread(reader->buf + bytes, 1, expected - bytes, reader->fd);
bytes += ret;
}
guint packets = bytes / packet_size;
-
+
if (ferror(reader->fd)) {
roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd));
} else if ((feof(reader->fd))&&(bytes % packet_size)) {
roof_network_error_with_retval(error, packets, "extra data in the end of input");
}
-
+
+ *buffers = reader->buf;
return packets;
}
-UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile));
- if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile");
+RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error) {
+ RoofReadFile *reader = (RoofReadFile*)calloc(1, sizeof(RoofReadFile));
+ if (!reader) roof_new_error(error, "Can't allocate RoofReadFile");
// FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now.
if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size)
- roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
+ roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if RoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
reader->cfg = cfg;
- reader->iface.close = ufo_roof_read_file_free;
- reader->iface.read =ufo_roof_read_file;
+ reader->iface.close = roof_read_file_free;
+ reader->iface.read =roof_read_file;
reader->fname = g_strdup_printf(path, file_id);
if (!reader->fname) {
@@ -80,5 +88,11 @@ UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *pat
roof_new_error(error, "Can't open file %i at path %s", file_id, path);
}
- return (UfoRoofReadInterface*)reader;
+ reader->buf = (uint8_t*)malloc(cfg->max_packets * (cfg->header_size + cfg->payload_size));
+ if (!reader->buf) {
+ roof_read_file_free((RoofReadInterface*)reader);
+ roof_new_error(error, "Can't allocate file buffer");
+ }
+
+ return (RoofReadInterface*)reader;
}
diff --git a/src/roof-read-file.h b/src/roof-read-file.h
new file mode 100644
index 0000000..60f60ba
--- /dev/null
+++ b/src/roof-read-file.h
@@ -0,0 +1,8 @@
+#ifndef __ROOF_READ_FILE_H
+#define __ROOF_READ_FILE_H
+
+#include "ufo-roof-read.h"
+
+RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-socket.c b/src/roof-read-socket.c
index e8f7ce4..1b98b44 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/roof-read-socket.c
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <unistd.h>
+#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -13,48 +14,51 @@
#include "ufo-roof-read-socket.h"
typedef struct {
- UfoRoofReadInterface iface;
+ RoofReadInterface iface;
+
+ RoofConfig *cfg;
- UfoRoofConfig *cfg;
int socket;
-} UfoRoofReadSocket;
+ struct mmsghdr *msg;
+ struct iovec *msgvec;
+ uint8_t *buf;
+
+ int libvma;
+} RoofReadSocket;
+
+static void roof_read_socket_free(RoofReadInterface *iface) {
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
-static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
-
if (reader) {
if (reader->socket >= 0)
close(reader->socket);
+ if (reader->msgvec)
+ free(reader->msgvec);
+ if (reader->msg)
+ free(reader->msg);
+ if (reader->buf)
+ free(reader->buf);
free(reader);
}
}
-static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
+static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) {
int packets;
struct timespec timeout_ts;
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
- UfoRoofConfig *cfg = reader->cfg;
+ assert(iface);
+ assert(buf);
- struct mmsghdr msg[cfg->max_packets];
- struct iovec msgvec[cfg->max_packets];
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
+ RoofConfig *cfg = reader->cfg;
timeout_ts.tv_sec = cfg->network_timeout / 1000000;
timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);
- // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
- memset(msg, 0, sizeof(msg));
- memset(msgvec, 0, sizeof(msgvec));
- for (guint i = 0; i < cfg->max_packets; i++) {
- msgvec[i].iov_base = buf + i * cfg->max_packet_size;
- msgvec[i].iov_len = cfg->max_packet_size;
- msg[i].msg_hdr.msg_iov = &msgvec[i];
- msg[i].msg_hdr.msg_iovlen = 1;
- }
//retry:
// Timeout seems broken, see BUGS in 'recvmmsg' bugs page
- packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
/*
@@ -64,25 +68,25 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
}
*/
+ *buf = reader->buf;
return (guint)packets;
}
-
-UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
+RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) {
int err;
int port = cfg->port + id;
char port_str[16];
- const char *addr_str = "0.0.0.0";
+ const char *addr_str = "192.168.100.8";
struct addrinfo sockaddr_hints;
struct addrinfo *sockaddr_info;
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket));
- if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket");
+ RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket));
+ if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket");
reader->cfg = cfg;
- reader->iface.close = ufo_roof_read_socket_free;
- reader->iface.read =ufo_roof_read_socket;
-
+ reader->iface.close = roof_read_socket_free;
+ reader->iface.read =roof_read_socket;
+
snprintf(port_str, sizeof(port_str), "%d", port);
port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';
@@ -121,14 +125,35 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr
}
/*
- // Send ping request to force initialization
+ // Check that FPGA module is ready
char msg[4];
addr_str = "192.168.34.83";
getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
*/
-
freeaddrinfo(sockaddr_info);
- return (UfoRoofReadInterface*)reader;
+ if (reader->libvma) {
+ } else {
+ reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size);
+ reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr));
+ reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec));
+
+ if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) {
+ roof_read_socket_free((RoofReadInterface*)reader);
+ roof_new_error(error, "Can't allocate socket buffer");
+ }
+
+ memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr));
+ memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec));
+ for (guint i = 0; i < cfg->max_packets; i++) {
+ reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size;
+ reader->msgvec[i].iov_len = cfg->max_packet_size;
+ reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i];
+ reader->msg[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+
+ return (RoofReadInterface*)reader;
}
diff --git a/src/roof-read-socket.h b/src/roof-read-socket.h
new file mode 100644
index 0000000..34a98e8
--- /dev/null
+++ b/src/roof-read-socket.h
@@ -0,0 +1,8 @@
+#ifndef __ROOF_READ_SOCKET_H
+#define __ROOF_READ_SOCKET_H
+
+#include "ufo-roof-read.h"
+
+RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/roof-read.c b/src/roof-read.c
new file mode 100644
index 0000000..18a7508
--- /dev/null
+++ b/src/roof-read.c
@@ -0,0 +1,107 @@
+#include <stdio.h>
+
+#include "roof.h"
+#include "roof-read.h"
+
+RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ RoofReadContext *rdc = (RoofReadContext*)calloc(1, sizeof(RoofReadContext));
+ if (!rdc) roof_new_error(error, "Can't allocate RoofReadContext");
+
+ rdc->cfg = cfg;
+ rdc->rdi = rdi;
+
+ return rdc;
+}
+
+void roof_read_context_free(RoofReadContext *rdc) {
+ if (rdc) {
+ free(rdc);
+ }
+}
+
+void roof_read(RoofReadContext *rdc, RoofBufferInterface *bfi, GError *error) {
+ uint8_t *rdbuf = rdc->rdbuf;
+ guint n_packets = rdc->n_packets;
+ guint packet_id = rdc->packet_id;
+ guint fragment_id = rdc->fragment_id;
+ GError *gerr = NULL;
+
+ ReadRoofInterface *rdi = rdc->rdi;
+
+ if (rdc->rdbuf) {
+ rdbuf = rdc->rdbuf;
+ packets
+ else {
+ guint packets = rdi->read(rdi[stream_id], &rdbuf, &gerr);
+ if (gerr) roof_propagate_error(error, gerr, "roof_read: ");
+ packet_id = 0;
+ fragment_id = 0;
+ }
+
+ packet = rdbuf + packet_id * (size + padding
+
+ while (packet) {
+
+
+ while (rdbuf) {
+ //
+
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(RoofPacketHeader)) {
+ RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ } else {
+ // FIXME: consider consecutive
+ //fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // FIXME: packet may contain fragments for multiple datasets
+ fragment_id = packet_id * fragments_per_packet;
+ guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
+ guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
+
+ // FIXME: verify that packet is consecutive
+ // if
+
+ // Drop packets of already skipped datasets
+ if (dataset_id < priv->current_id)
+ continue;
+
+ // FIXME: stop processing and return....
+ if (dataset_id < priv->current_id)
+
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * fragment_dims[0] + // x-coordinate
+ (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+
+
+// buffer->last_id[stream_id] = dataset_id;
+
+ ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+
+
+ }
+
+
+ }
+
+
+
+
+ void *buf = roof_buffer_get_dataset(bfi, ?);
+
+ // Computing offsets each time, etc.
+ roof_buffer_set_fragment
+} \ No newline at end of file
diff --git a/src/roof-read.h b/src/roof-read.h
new file mode 100644
index 0000000..1ab846a
--- /dev/null
+++ b/src/roof-read.h
@@ -0,0 +1,45 @@
+#ifndef __ROOF_READ_H
+#define __ROOF_READ_H
+
+typedef struct _RoofRead RoofRead;
+
+#include "roof-config.h"
+
+G_BEGIN_DECLS
+
+typedef struct _RoofReadContext RoofReadContext;
+typedef struct _RoofReadInterface RoofReadInterface;
+typedef struct _RoofReadInterfaceSettings RoofReadInterfaceSettings;
+
+typedef guint (*RoofReaderRead)(RoofReadInterface *reader, uint8_t **buf, GError **error);
+typedef void (*RoofReaderClose)(RoofReadInterface *reader);
+
+struct _RoofReadInterfaceSettings {
+ guint padding; // Packet size + padding
+};
+
+struct _RoofReadInterface {
+ RoofReaderRead read;
+ RoofReaderClose close;
+
+ RoofReadInterfaceSettings settings;
+};
+
+struct _RoofReadContext {
+ RoofConfig *cfg;
+ RoofReadInterface *rdi;
+
+ void *rdbuf; // The buffer we are currently processing
+ guint n_packets; // Number of packets in the buffer
+ guint packet_id; // Last processed packet in the buffer
+ guint fragment_id; // Last processed fragment in the packet
+};
+
+RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error);
+void roof_read_context_free(RoofReadContext *ctx);
+const RoofReadInterfaceSettings *roof_read_get_settings(UFORoofRead *ctx, GError **error);
+
+
+G_END_DECLS
+
+#endif /* __ROOF_READ_H */
diff --git a/src/roof-thread.c b/src/roof-thread.c
new file mode 100644
index 0000000..570300f
--- /dev/null
+++ b/src/roof-thread.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+
+#include "roof.h"
+#include "roof-thread.h"
+
+
+RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) {
+ GError *gerr = NULL;
+
+ RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext));
+ if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext");
+
+ rdt->cfg = cfg;
+ rdt->rdi = rdi;
+
+ return rdt;
+
+}
+
+void roof_thread_context_free(RoofThreadContext *rdt) {
+ if (rdt) {
+ free(rdt);
+ }
+}
+
+
+int roof_thread_read_socket(Roof *
+
+int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) {
+ Roof *ctx = (Roof*)data;
+
+ RoofConfig *cfg = ctx->cfg;
+ RoofThreadContext *rdt = ctx->rdt[id];
+
+ guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections };
+ guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] };
+
+ packet_id
+ fragment_id =
+
+ for (guint stream_id = from; stream_id < to; stream_id++) {
+
+ uint8_t *rdbuf;
+ guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(RoofPacketHeader)) {
+ RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ } else {
+ // FIXME: consider consecutive
+ //fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // FIXME: packet may contain fragments for multiple datasets
+ fragment_id = packet_id * fragments_per_packet;
+ guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
+ guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
+
+ // FIXME: verify that packet is consecutive
+ // if
+
+ // Drop packets of already skipped datasets
+ if (dataset_id < priv->current_id)
+ continue;
+
+ // FIXME: stop processing and return....
+ if (dataset_id < priv->current_id)
+
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * fragment_dims[0] + // x-coordinate
+ (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+
+
+// buffer->last_id[stream_id] = dataset_id;
+
+ ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+
+
+ }
+
+ }
+
+}
+
diff --git a/src/roof-thread.h b/src/roof-thread.h
new file mode 100644
index 0000000..a180b29
--- /dev/null
+++ b/src/roof-thread.h
@@ -0,0 +1,30 @@
+#ifndef __ROOF_THREAD_H
+#define __ROOF_THREAD_H
+
+typedef struct _RoofThreadContext RoofThreadContext;
+
+#include "ufo-roof-read.h"
+
+struct _RoofThreadContext {
+ RoofConfig *cfg;
+ RoofBuffer *buf;
+ RoofReadInterface *rdi;
+ guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive)
+};
+
+/*
+RoofReadThread *guint roof_read_thread_new(RoofRead *rd, guint from, guint to, GError **error);
+void roof_read_thread_free(UFORoofReadThread *thr, GError **error);
+gboolean roof_read_thread_start(UFORoofReadThread *thr, GError **error);
+gboolean roof_read_thread_stop(UFORoofReadThread *thr, GError **error);
+*/
+
+int roof_thread_read(HWThread thr, void *hwctx, int id, void *data);
+
+RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, RoofReadContext *rdc, guint from, guint to, GError **error);
+void roof_thread_context_free(RoofThreadContext *ctx);
+
+
+
+#endif /* __ROOF_THREAD_H */
+
diff --git a/src/roof.c b/src/roof.c
new file mode 100644
index 0000000..505e3aa
--- /dev/null
+++ b/src/roof.c
@@ -0,0 +1,133 @@
+void roof_init() {
+ hw_sched_init();
+}
+
+Roof *roof_new(UfoRoofConfig *cfg, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ Roof *ctx = (Roof*)calloc(1, sizeof(Roof));
+ if (!ctx) roof_new_error(error, "Can't allocate Roof context");
+
+ ctx->cfg = cfg;
+
+ ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread;
+ if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++;
+
+ ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*));
+ ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*));
+ ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*));
+ ctx->sched = hw_sched_create(cfg->n_threads);
+
+ if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) {
+ roof_free(ctx);
+ roof_setup_error(error, "Failed to allocate memory for various Roof contexts");
+ }
+
+ return ctx;
+}
+
+void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) {
+ assert(ctx);
+
+ ctx->simulate = 1;
+ ctx->path = path;
+ ctx->first_file_number = first_file_number;
+}
+
+void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) {
+ assert(ctx);
+
+ ctx->max_datasets = max;
+}
+
+
+void roof_setup(Roof *ctx, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ assert(ctx);
+
+ RoofConfig *cfg = ctx->cfg;
+
+/*
+ ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr);
+ if ((gerr)||(!ctx->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+*/
+
+ for (i = 0; i < cfg->n_streams; i++) {
+ if (ctx->simulate) {
+ if (!ctx->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr);
+ } else {
+ ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr);
+ }
+
+ if (!ctx->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+
+ ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr);
+ if (!ctx->rdc[i])
+ roof_propagate_error(error, gerr, "roof_read_context_new: ");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads;
+ if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0; i < ctx->n_threads; i++) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i);
+ }
+}
+
+
+void roof_free(Roof *ctx) {
+ guint i;
+
+ if (ctx) {
+ RoofConfig *cfg = ctx->cfg;
+ if (ctx->sched) hw_sched_destroy(priv->sched);
+
+ if (ctx->rdt) {
+ for (i = 0; i < ctx->n_threads; i++)
+ if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]);
+ free(ctx->rdt);
+ }
+
+ if (ctx->rdc) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]);
+ free(ctx->rdc);
+ }
+
+ if (ctx->rdi) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]);
+ free(ctx->rdi);
+ }
+
+ if (ctx->buf) roof_buffer_free(ctx->buf);
+ free(ctx);
+ }
+}
+
+
+void roof_read_dataset(Roof *ctx, void *buffer, GError **error) {
+ priv->current_dataset;
+ priv->current_buffer = buffer;
+
+ err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read);
+ if (!err) err = hw_sched_wait_task(sched);
+ if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); }
+}
+
+
+}
diff --git a/src/roof.h b/src/roof.h
new file mode 100644
index 0000000..316e8ed
--- /dev/null
+++ b/src/roof.h
@@ -0,0 +1,46 @@
+#ifndef __ROOF_H
+#define __ROOF_H
+
+typedef struct _Roof Roof;
+
+#include "roof-config.h"
+#include "roof-buffer.h"
+#include "roof-read.h"
+#include "roof-thread.h"
+
+struct _Roof {
+ RoofConfig *cfg; // Parsed ROOF parameters
+ RoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofReadInterface **rdi; // Reader interface abstraction, one per socket (no threading)
+ RoofReadContext **rdc; // Reader context: common structures, one per socket (no threading)
+ RoofThread **rdt; // Threading context: multiple reader contexts per thread
+
+ guint n_threads; // Number of schedulled threads
+ HWSched sched; // OpenMP-style thread scheduler
+
+ gboolean simulate; // Indicates if we are running in network or simulation modes
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
+
+ guint max_datasets; // Number of datasets to read
+
+// guint64 announced; // For debugging
+// guint64 generated; // Total number for control
+
+// struct timespec last_fragment_timestamp;
+
+};
+
+
+Roof *roof_new(RoofConfig *cfg, GError **error);
+void roof_free(Roof *ctx);
+
+void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error);
+void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error);
+//void roof_configure_writer(Roof *ctx, ...);
+//void roof_configure_filter(Roof *ctx, ...);
+void roof_setup(Roof *ctx, GError **error);
+
+void roof_read(Roof *ctx, void *buffer, GError **error);
+
+#endif \ No newline at end of file
diff --git a/src/save/memcpy.c b/src/save/memcpy.c
new file mode 100644
index 0000000..5c29d01
--- /dev/null
+++ b/src/save/memcpy.c
@@ -0,0 +1,344 @@
+/********************************************************************
+ ** File: memcpy.c
+ **
+ ** Copyright (C) 1999-2010 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ ** The following configuration options can be set:
+ **
+ ** LITTLE_ENDIAN - Uses processor with little endian
+ ** addressing. Default is big endian.
+ **
+ ** PRE_INC_PTRS - Use pre increment of pointers.
+ ** Default is post increment of
+ ** pointers.
+ **
+ ** INDEXED_COPY - Copying data using array indexing.
+ ** Using this option, disables the
+ ** PRE_INC_PTRS option.
+ **
+ ** MEMCPY_64BIT - Compiles memcpy for 64 bit
+ ** architectures
+ **
+ **
+ ** Best Settings:
+ **
+ ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY
+ **
+ *******************************************************************/
+
+
+
+/********************************************************************
+ ** Configuration definitions.
+ *******************************************************************/
+
+#define LITTLE_ENDIAN
+#define INDEXED_COPY
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ ** Typedefs
+ *******************************************************************/
+
+typedef unsigned char UInt8;
+typedef unsigned short UInt16;
+typedef unsigned int UInt32;
+#ifdef _WIN32
+typedef unsigned __int64 UInt64;
+#else
+typedef unsigned long long UInt64;
+#endif
+
+#ifdef MEMCPY_64BIT
+typedef UInt64 UIntN;
+#define TYPE_WIDTH 8L
+#else
+typedef UInt32 UIntN;
+#define TYPE_WIDTH 4L
+#endif
+
+
+/********************************************************************
+ ** Remove definitions when INDEXED_COPY is defined.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+#if defined (PRE_INC_PTRS)
+#undef PRE_INC_PTRS
+#endif /*PRE_INC_PTRS*/
+#endif /*INDEXED_COPY*/
+
+
+
+/********************************************************************
+ ** Definitions for pre and post increment of pointers.
+ *******************************************************************/
+
+#if defined (PRE_INC_PTRS)
+
+#define START_VAL(x) (x)--
+#define INC_VAL(x) *++(x)
+#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH)
+#define WHILE_DEST_BREAK (TYPE_WIDTH - 1)
+#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1)
+#define PRE_SWITCH_ADJUST + 1
+
+#else /*PRE_INC_PTRS*/
+
+#define START_VAL(x)
+#define INC_VAL(x) *(x)++
+#define CAST_TO_U8(p, o) ((UInt8*)p + o)
+#define WHILE_DEST_BREAK 0
+#define PRE_LOOP_ADJUST
+#define PRE_SWITCH_ADJUST
+
+#endif /*PRE_INC_PTRS*/
+
+
+
+/********************************************************************
+ ** Definitions for endians
+ *******************************************************************/
+
+#if defined (LITTLE_ENDIAN)
+
+#define SHL >>
+#define SHR <<
+
+#else /* LITTLE_ENDIAN */
+
+#define SHL <<
+#define SHR >>
+
+#endif /* LITTLE_ENDIAN */
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers.
+ *******************************************************************/
+
+#define CP_INCR() { \
+ INC_VAL(dstN) = INC_VAL(srcN); \
+}
+
+#define CP_INCR_SH(shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = INC_VAL(srcN); \
+ dstWord |= srcWord SHR shr; \
+ INC_VAL(dstN) = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses array indexes.
+ *******************************************************************/
+
+#define CP_INDEX(idx) { \
+ dstN[idx] = srcN[idx]; \
+}
+
+#define CP_INDEX_SH(x, shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = srcN[x]; \
+ dstWord |= srcWord SHR shr; \
+ dstN[x] = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers or array indexes depending on
+ ** configuration.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+
+#define CP(idx) CP_INDEX(idx)
+#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr)
+
+#define INC_INDEX(p, o) ((p) += (o))
+
+#else /* INDEXED_COPY */
+
+#define CP(idx) CP_INCR()
+#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr)
+
+#define INC_INDEX(p, o)
+
+#endif /* INDEXED_COPY */
+
+
+#define COPY_REMAINING(count) { \
+ START_VAL(dst8); \
+ START_VAL(src8); \
+ \
+ switch (count) { \
+ case 7: INC_VAL(dst8) = INC_VAL(src8); \
+ case 6: INC_VAL(dst8) = INC_VAL(src8); \
+ case 5: INC_VAL(dst8) = INC_VAL(src8); \
+ case 4: INC_VAL(dst8) = INC_VAL(src8); \
+ case 3: INC_VAL(dst8) = INC_VAL(src8); \
+ case 2: INC_VAL(dst8) = INC_VAL(src8); \
+ case 1: INC_VAL(dst8) = INC_VAL(src8); \
+ case 0: \
+ default: break; \
+ } \
+}
+
+#define COPY_NO_SHIFT() { \
+ UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \
+ UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \
+ size_t length = count / TYPE_WIDTH; \
+ \
+ while (length & 7) { \
+ CP_INCR(); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP(0); \
+ CP(1); \
+ CP(2); \
+ CP(3); \
+ CP(4); \
+ CP(5); \
+ CP(6); \
+ CP(7); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, 0); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+
+#define COPY_SHIFT(shift) { \
+ UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ size_t length = count / TYPE_WIDTH; \
+ UIntN srcWord = INC_VAL(srcN); \
+ UIntN dstWord; \
+ \
+ while (length & 7) { \
+ CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest.
+ ** No overlap check is performed.
+ **
+ *******************************************************************/
+
+void *fast_memcpy(void *dest, const void *src, size_t count)
+{
+ UInt8* dst8 = (UInt8*)dest;
+ UInt8* src8 = (UInt8*)src;
+
+ if (count < 8) {
+ COPY_REMAINING(count);
+ return dest;
+ }
+
+ START_VAL(dst8);
+ START_VAL(src8);
+
+ while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) {
+ INC_VAL(dst8) = INC_VAL(src8);
+ count--;
+ }
+
+ switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) {
+ case 0: COPY_NO_SHIFT(); break;
+ case 1: COPY_SHIFT(1); break;
+ case 2: COPY_SHIFT(2); break;
+ case 3: COPY_SHIFT(3); break;
+#if TYPE_WIDTH > 4
+ case 4: COPY_SHIFT(4); break;
+ case 5: COPY_SHIFT(5); break;
+ case 6: COPY_SHIFT(6); break;
+ case 7: COPY_SHIFT(7); break;
+#endif
+ }
+}
diff --git a/src/save/memcpy.h b/src/save/memcpy.h
new file mode 100644
index 0000000..0714823
--- /dev/null
+++ b/src/save/memcpy.h
@@ -0,0 +1,63 @@
+/********************************************************************
+ ** File: memcpy.h
+ **
+ ** Copyright (C) 2005 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ *******************************************************************/
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest. No overlap check
+ ** is performed.
+ **
+ *******************************************************************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void *fast_memcpy(void *dest, const void *src, size_t count);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c
new file mode 100644
index 0000000..bdb7a7d
--- /dev/null
+++ b/src/save/ufo-roof-buffer-build-task.c
@@ -0,0 +1,474 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <endian.h>
+#include <threads.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-build-task.h"
+
+typedef enum {
+ BUILD_AUTO = 0,
+ BUILD_RAW,
+ BUILD_SINO,
+ BUILD_UFO
+} BuildType;
+
+struct _UfoRoofBuildTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+ UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading)
+ UfoRoofRead *rd; // Threading interface
+
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
+
+ BuildType build; // What dataset do we build: ROOF sinogram or raw network data
+ guint number; // Number of datasets to read
+ gboolean stop; // Stop flag
+ gboolean simulate; // Indicates if we are running in network or simulation modes
+
+ guint64 announced; // For debugging
+ guint64 generated; // Total number for control
+
+ struct timespec last_fragment_timestamp;
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+
+
+
+static GEnumValue build_values[] = {
+ { BUILD_AUTO, "BUILD_AUTO", "auto" },
+ { BUILD_RAW, "BUILD_RAW", "raw" },
+ { BUILD_SINO, "BUILD_SINO", "sino" },
+ { BUILD_UFO, "BUILD_UFO", "ufo" },
+ { 0, NULL, NULL}
+};
+
+enum {
+ PROP_0,
+ PROP_STOP,
+ PROP_SIMULATE,
+ PROP_PATH,
+ PROP_FIRST,
+ PROP_NUMBER,
+ PROP_BUILD,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_build_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL));
+}
+
+static void
+ufo_roof_build_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr);
+ if (!priv->cfg)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+
+ if (!priv->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+ }
+
+ if (priv->build == BUILD_AUTO) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]);
+ }
+
+ priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr);
+ if ((gerr)||(!priv->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+
+ priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr);
+ if (gerr)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+}
+
+static void
+ufo_roof_build_task_finalize (GObject *object)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->rd) {
+ ufo_roof_read_free(priv->rd);
+ priv->rd = NULL;
+ }
+
+ if (priv->buf) {
+ ufo_roof_buffer_free(priv->buf);
+ priv->buf = NULL;
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+ if (priv->path) {
+ g_free(priv->path);
+ priv->path = NULL;
+ }
+
+ G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
+}
+
+
+
+static void
+ufo_roof_build_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ // FIXME: Can we handle data types more elegant?
+ if (priv->build == BUILD_RAW) {
+ guint bytes = priv->cfg->dataset_size;
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ } else if (priv->build == BUILD_SINO) {
+ guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8;
+ requisition->n_dims = 2;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ requisition->dims[1] = priv->cfg->fan_projections;
+ } else if (priv->build == BUILD_UFO) {
+ requisition->n_dims = 2;
+ requisition->dims[0] = priv->cfg->fan_bins;
+ requisition->dims[1] = priv->cfg->fan_projections;
+ }
+}
+
+static guint
+ufo_roof_build_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_roof_build_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 1;
+}
+
+static UfoTaskMode
+ufo_roof_build_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_roof_build_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ gboolean ready = FALSE;
+ gulong seqid;
+ GError *gerr = NULL;
+ GValue ival = G_VALUE_INIT;
+ GValue lval = G_VALUE_INIT;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+
+ if (priv->stop)
+ return FALSE;
+
+ // FIXME: Wait or break. Not both.
+ do {
+ ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if (!ready) {
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (!ready) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ return FALSE;
+ }
+ }
+ } while (!ready);
+
+ // FIXME: integrate fastwriter somewhere here?
+
+ if (priv->build == BUILD_UFO) {
+ switch (cfg->bit_depth) {
+ case 8:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U);
+ break;
+ case 16:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U);
+ break;
+ case 32:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U);
+ break;
+ default:
+ printf("Usupported bit-depth %u\n", cfg->bit_depth);
+ }
+ }
+
+ // Metadata: plane and sequential number within the plane
+ g_value_init (&ival, G_TYPE_UINT);
+ g_value_init (&lval, G_TYPE_ULONG);
+ if (priv->build != BUILD_UFO) {
+ g_value_set_uint (&ival, cfg->bit_depth);
+ ufo_buffer_set_metadata (output, "bpp", &ival);
+ }
+ g_value_set_uint (&ival, 1 + seqid % cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane", &ival);
+ g_value_set_ulong (&lval, seqid / cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane_id", &lval);
+ g_value_set_ulong (&lval, seqid);
+ ufo_buffer_set_metadata (output, "seqid", &lval);
+ g_value_unset(&lval);
+ g_value_unset(&ival);
+
+ // FIXME: Or shall we start from counting from the ID of the first registerd dataset
+ if ((priv->number)&&(buf->current_id >= priv->number)) {
+// printf("%lu datasets processed, stopping\n", buf->current_id);
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+
+ if (ready) priv->generated++;
+
+ if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
+ if (ready)
+ printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ else
+ printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ priv->announced = buf->current_id;
+ }
+
+ return ready;
+}
+
+static void
+ufo_roof_build_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ case PROP_SIMULATE:
+ priv->simulate = g_value_get_boolean (value);
+ break;
+ case PROP_PATH:
+ if (priv->path) g_free(priv->path);
+ priv->path = g_value_dup_string(value);
+ break;
+ case PROP_FIRST:
+ priv->first_file_number = g_value_get_uint (value);
+ break;
+ case PROP_NUMBER:
+ priv->number = g_value_get_uint (value);
+ break;
+ case PROP_BUILD:
+ priv->build = g_value_get_enum (value);
+ if ((priv->build == BUILD_AUTO)&&(priv->cfg)) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ }
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_build_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ case PROP_SIMULATE:
+ g_value_set_boolean (value, priv->simulate);
+ break;
+ case PROP_PATH:
+ g_value_set_string(value, priv->path?priv->path:"");
+ break;
+ case PROP_FIRST:
+ g_value_set_uint (value, priv->first_file_number);
+ break;
+ case PROP_NUMBER:
+ g_value_set_uint (value, priv->number);
+ break;
+ case PROP_BUILD:
+ g_value_set_enum (value, priv->build);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_build_task_setup;
+ iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_build_task_get_mode;
+ iface->get_requisition = ufo_roof_build_task_get_requisition;
+ iface->generate = ufo_roof_build_task_generate;
+}
+
+static void
+ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_build_task_set_property;
+ oclass->get_property = ufo_roof_build_task_get_property;
+ oclass->finalize = ufo_roof_build_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_SIMULATE] =
+ g_param_spec_boolean ("simulate",
+ "Simulation mode",
+ "Read data from the specified files instead of network",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_PATH] =
+ g_param_spec_string ("path",
+ "Input files for simulation mode",
+ "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_FIRST] =
+ g_param_spec_uint ("first_file_number",
+ "Offset to the first read file",
+ "Offset to the first read file",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_NUMBER] =
+ g_param_spec_uint("number",
+ "Number of datasets to receive",
+ "Number of datasets to receive",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_BUILD] =
+ g_param_spec_enum ("build",
+ "Build type (\"raw\", \"sino\", \"ufo\")",
+ "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)",
+ g_enum_register_static ("build", build_values),
+ 0, G_PARAM_READWRITE);
+
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+}
+
+static void
+ufo_roof_build_task_init(UfoRoofBuildTask *self)
+{
+ self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
+}
diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c
new file mode 100644
index 0000000..60868d2
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.c
@@ -0,0 +1,155 @@
+#include <stdio.h>
+#include <threads.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) {
+ int i;
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context");
+
+ thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size);
+ if (!thr->rdbuf) {
+ ufo_roof_read_thread_free(thr);
+ roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer");
+ }
+
+ thr->rd = rd;
+ thr->from = from;
+ thr->to = to;
+
+
+ return thr;
+
+}
+
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) {
+ if (!thr) return;
+ if (thr->rdbuf) free(thr->rdbuf);
+
+ ufo_roof_thread_stop(thr, error);
+ free(thr);
+}
+
+static int ufo_roof_read_thread_run(void *arg) {
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)arg;
+
+ UfoRoofConfig *cfg = thr->rd->cfg;
+ UfoRoofBuffer *buf = thr->rd->buf;
+ UfoRoofReadInterface *rdi = thr->rd->rdi;
+
+ guint from = thr->from;
+ guint to = thr->to;
+
+ void *rdbuf = thr->rdbuf;
+
+ uint64_t current_id[to - from] = {0};
+ uint64_t grabbed[to - from] = {0};
+
+ static uint64_t errors = 0;
+
+ while (thr->op != UFO_ROOF_OP_STOP) {
+ for (guint sid = from; sid < to; sid++) {
+ // FIXME break waiting on stop? If no packets are send
+ guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ guint ready = false;
+ const uint8_t *fragment = (uint8_t*)rdbuf;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+#ifdef UFO_ROOF_DEBUG
+ if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+
+ current_id[sid - from] = packet_id;
+ grabbed[sid - from]++;
+ if ((grabbed[sid - from]%1000000)==0)
+ printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000);
+#endif
+
+ ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+ } // fragment-loop
+
+ // send notification? Broadcast blocks, we don't want it.
+ if (ready) {
+ }
+
+ } // socket-loop
+ } // operation-loop
+
+
+#ifdef UFO_ROOF_DEBUG
+ // Store first received packet on each channel...
+ static int debug = 1;
+ if (debug) {
+ char fname[256];
+ sprintf(fname, "channel%i_packet0.raw", priv->id);
+ FILE *f = fopen(fname, "w");
+ if (f) {
+ fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f);
+ fclose(f);
+ }
+ debug = 0;
+ }
+#endif /* UFO_ROOF_DEBUG */
+
+ // FIXME: End of data (shall we restart in the network case?)
+// if (!packets)
+// return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+// header->n_packets = packets;
+
+ return TRUE;
+}
+
+
+}
+
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) {
+ int err;
+ if (!thr) return FALSE;
+
+ err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err);
+
+ ctx->launched = TRUE;
+ return TRUE;
+}
+
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) {
+ int err, ret;
+ if (!thr) return FALSE;
+ if (!thr->launched) return TRUE;
+
+ // Signal thread termination
+
+ err = thrd_join(&thr->thread, &ret);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err);
+
+ return TRUE;
+}
+
diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h
new file mode 100644
index 0000000..ebe8989
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.h
@@ -0,0 +1,23 @@
+#ifndef __UFO_ROOF_READ_THREAD_H
+#define __UFO_ROOF_READ_THREAD_H
+
+typedef struct _UfoRoofReadThread UfoRoofReadThread;
+
+#include "ufo-roof-read.h"
+
+struct _UfoRoofReadThread {
+ UfoRoofRead *rd; // ROOF Reader Cotext
+ guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive)
+
+ gboolean launched; // Flag indicating if thread is launched
+ thrd_t thread; // Thread ID
+};
+
+/*
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error);
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error);
+*/
+
+#endif /* __UFO_ROOF_READ_THREAD_H */ \ No newline at end of file
diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c
new file mode 100644
index 0000000..f3d790d
--- /dev/null
+++ b/src/save/ufo-roof-read.c
@@ -0,0 +1,123 @@
+#include <stdio.h>
+#include <assert.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
+ // FIXME: request real-time permissions?
+ // FIXME: do we need this?
+/*
+ uint32_t lat = 0;
+ int fd = open("/dev/cpu_dma_latency", O_RDWR);
+ if (fd == -1) {
+ fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
+ } else {
+ write(fd, &lat, sizeof(lat));
+ close(fd);
+ }
+*/
+}
+
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
+ assert(ctx);
+ return ctx->settings;
+}
+
+
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");
+
+ ufo_roof_read_optimize(ctx);
+
+ ctx->n_threads = cfg->n_read_threads;
+ ctx->cfg = cfg;
+ ctx->buf = buf;
+ ctx->rdi = rdi;
+
+ ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread));
+ if (!ctx->thr) {
+ ufo_roof_read_free(ctx);
+ roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++;
+ ctx->n_threads = n_threads;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads;
+ if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0, from = 0; i < n_threads; i++, from = to) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
+ return ctx;
+}
+
+void ufo_roof_read_free(UfoRoofRead *ctx) {
+ if (!ctx) return;
+
+ if (ctx->thr) {
+ int i;
+ ufo_roof_read_stop(ctx);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (ctx->thr[i])
+ ufo_roof_read_thread_free(ctx->thr[i]);
+ }
+ free(ctx->thr);
+ }
+ free(ctx);
+}
+
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
+ gboolean ret;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_start(ctx, &gerr);
+ if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
+ }
+ return TRUE;
+}
+
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
+ gboolean ret, res = FALSE;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_stop(ctx, &gerr);
+ if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
+ res |= !ret;
+ }
+ return !res;
+}
+
+
diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h
new file mode 100644
index 0000000..16e910b
--- /dev/null
+++ b/src/save/ufo-roof-read.h
@@ -0,0 +1,61 @@
+#ifndef __UFO_ROOF_READ_H
+#define __UFO_ROOF_READ_H
+
+typedef struct _UfoRoofRead UfoRoofRead;
+
+#include "ufo-roof-config.h"
+#include "ufo-roof-buffer.h"
+//#include "ufo-roof-read-thread.h"
+
+G_BEGIN_DECLS
+
+typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
+typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings;
+
+//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error);
+
+typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error);
+typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
+
+struct _UfoRoofReadInterfaceSettings {
+ guint padding; // Packet size + padding
+};
+
+struct _UfoRoofReadInterface {
+ UfoRoofReaderRead read;
+ UfoRoofReaderClose close;
+
+ UfoRoofReadInterfaceSettings settings;
+};
+
+typedef enum {
+ UFO_ROOF_OP_STOP = 0,
+ UFO_ROOF_OP_READ
+} UfoRoofOp;
+
+struct _UfoRoofReadContext {
+ UfoRoofConfig *cfg;
+ UfoRoofBuffer *buf;
+ UfoRoofReadInterface *rdi;
+
+ gulong packet
+// UfoRoofReadThread *thr;
+
+ guint n_threads;
+ UfoRoofOp op; // Current operation (reading by default)
+};
+
+
+
+/*
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error);
+void ufo_roof_read_free(UfoRoofRead *ctx);
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error);
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error);
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error);
+*/
+
+G_END_DECLS
+
+#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index a39ed11..ee1cec5 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <endian.h>
+#include <threads.h>
#ifdef __APPLE__
#include <OpenCL/cl.h>
@@ -26,10 +27,16 @@
#include <CL/cl.h>
#endif
+#include "hw_sched.h"
+
#include "ufo-roof.h"
+#include "ufo-roof-read.h"
#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-socket.h"
+#include "ufo-roof-read-file.h"
#include "ufo-roof-build-task.h"
+
typedef enum {
BUILD_AUTO = 0,
BUILD_RAW,
@@ -37,10 +44,16 @@ typedef enum {
BUILD_UFO
} BuildType;
-struct _UfoRoofBuildTaskPrivate {
+struct _RoofBuildTaskPrivate {
gchar *config; // ROOF configuration file name
- UfoRoofConfig *cfg; // Parsed ROOF parameters
- UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofConfig *cfg; // Parsed ROOF parameters
+// RoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofReadInterface **rdi; // Reader interfaces, one per socket (no threading)
+// RoofRead *rd; // Threading interface
+ HWSched sched;
+
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
BuildType build; // What dataset do we build: ROOF sinogram or raw network data
guint number; // Number of datasets to read
@@ -48,17 +61,21 @@ struct _UfoRoofBuildTaskPrivate {
gboolean simulate; // Indicates if we are running in network or simulation modes
guint64 announced; // For debugging
+ guint64 generated; // Total number for control
+
+ guint n_threads; // Number of schedulled threads
+
struct timespec last_fragment_timestamp;
};
static void ufo_task_interface_init (UfoTaskIface *iface);
-G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+G_DEFINE_TYPE_WITH_CODE (RoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
ufo_task_interface_init))
-#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, RoofBuildTaskPrivate))
@@ -74,6 +91,8 @@ enum {
PROP_0,
PROP_STOP,
PROP_SIMULATE,
+ PROP_PATH,
+ PROP_FIRST,
PROP_NUMBER,
PROP_BUILD,
PROP_CONFIG,
@@ -93,9 +112,10 @@ ufo_roof_build_task_setup (UfoTask *task,
UfoResources *resources,
GError **error)
{
+ guint i;
GError *gerr = NULL;
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
if (!priv->config)
roof_setup_error(error, "ROOF configuration is not specified");
@@ -104,15 +124,65 @@ ufo_roof_build_task_setup (UfoTask *task,
if (!priv->cfg)
roof_propagate_error(error, gerr, "roof-build-setup: ");
+ priv->rdi = (RoofReadInterface**)calloc(priv->cfg->n_streams, sizeof(RoofReadInterface*));
+ if (!priv->rdi)
+ roof_setup_error(error, "Failed to allocate memory for RoofReadInterface array");
+
+
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->first_file_number + i, &gerr);
+ } else
+ priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, i, &gerr);
+
+ if (!priv->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+
+ priv->rdc[i] = ufo_roof_read_context_new(priv->cfg, priv->rdi[i], &gerr);
+ if (!priv->rdc[i])
+ roof_propagate_error(error, gerr, "roof_read_context_new: ");
+ }
+
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ priv->n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) priv->n_threads++;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / priv->n_threads;
+ if (priv->cfg->n_streams % priv->n_threads) extra = priv->cfg->n_streams - priv->n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0; i < priv->n_threads; i++) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(priv->cfg, priv->rdc, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
if (priv->build == BUILD_AUTO) {
if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
else priv->build = BUILD_RAW;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]);
}
+/*
priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr);
- if (!priv->buf)
- roof_propagate_error(error, gerr, "roof-build-setup: ");
+ if ((gerr)||(!priv->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+
+ priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr);
+ if (gerr)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+*/
+
+ priv->sched = hw_sched_create(priv->cfg->n_read_threads);
+ if (!priv->sched)
+ roof_setup_error(error, "Failed to schedule builder threads");
+
clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
}
@@ -120,12 +190,34 @@ ufo_roof_build_task_setup (UfoTask *task,
static void
ufo_roof_build_task_finalize (GObject *object)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
-
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->sched) {
+ hw_sched_destroy(priv->sched);
+ priv->sched = NULL;
+ }
+
+/*
+ if (priv->rd) {
+ ufo_roof_read_free(priv->rd);
+ priv->rd = NULL;
+ }
+
if (priv->buf) {
ufo_roof_buffer_free(priv->buf);
priv->buf = NULL;
}
+*/
+
+ if (priv->rdi) {
+ guint i;
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->rdi[i])
+ priv->rdi[i]->close(priv->rdi[i]);
+ }
+ free(priv->rdi);
+ priv->rdi = NULL;
+ }
if (priv->cfg) {
ufo_roof_config_free(priv->cfg);
@@ -137,6 +229,10 @@ ufo_roof_build_task_finalize (GObject *object)
priv->config = NULL;
}
+ if (priv->path) {
+ g_free(priv->path);
+ priv->path = NULL;
+ }
G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
}
@@ -149,9 +245,10 @@ ufo_roof_build_task_get_requisition (UfoTask *task,
UfoRequisition *requisition,
GError **error)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
// FIXME: Can we handle data types more elegant?
+ // FIXME: Kill BUILD_RAW ?
if (priv->build == BUILD_RAW) {
guint bytes = priv->cfg->dataset_size;
requisition->n_dims = 1;
@@ -184,100 +281,56 @@ ufo_roof_build_task_get_num_dimensions (UfoTask *task,
static UfoTaskMode
ufo_roof_build_task_get_mode (UfoTask *task)
{
- return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR;
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
}
+
static gboolean
-ufo_roof_build_task_process (UfoTask *task,
- UfoBuffer **inputs,
+ufo_roof_build_task_generate (UfoTask *task,
UfoBuffer *output,
UfoRequisition *requisition)
{
- GError *gerr = NULL;
-
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
- UfoRoofConfig *cfg = priv->cfg;
- UfoRoofBuffer *buf = priv->buf;
gboolean ready = FALSE;
+ gulong seqid;
+ GError *gerr = NULL;
+ GValue ival = G_VALUE_INIT;
+ GValue lval = G_VALUE_INIT;
-// UfoRequisition in_req;
-// ufo_buffer_get_requisition (inputs[0], &in_req);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofConfig *cfg = priv->cfg;
+ RoofBuffer *buf = priv->buf;
- const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
- UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
if (priv->stop)
return FALSE;
- const uint8_t *fragment = data;
- for (guint i = 0; i < header->n_packets; i++) {
- guint64 packet_id = 0;
- // Otherwise considered consecutive and handled by the buffer
- if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
- UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
- packet_id = be64toh(pheader->packet_id) + 1;
- }
+ priv->current_dataset;
+ priv->current_buffer = output_buffer;
- // FIXME: Can we kill here the dataset finished during the previous step of iteration
- ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr);
- if (gerr) roof_print_error(gerr);
+ err = hw_sched_schedule_thread_task(sched, (void*)&tnv_ctx, ufo_roof_build_task_read);
+ if (!err) err = hw_sched_wait_task(sched);
+ if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); }
- fragment += cfg->max_packet_size;
- }
-
- // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
-
- if (ready) {
- clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
- } else {
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, &current_time);
-
- // No new accepted events within timeout
- if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) {
- ready = ufo_roof_buffer_skip_to_ready(buf);
- if (ready) {
- // FIXME: shall we really reset timer here?
- clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
- } else {
+/*
+ // FIXME: Wait or break. Not both.
+ do {
+ ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if (!ready) {
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (!ready) {
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
- }
+ return FALSE;
+ }
}
- }
-
-
-/*
- printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets,
- (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
+ } while (!ready);
*/
- return !ready;
-}
-
-static gboolean
-ufo_roof_build_task_generate (UfoTask *task,
- UfoBuffer *output,
- UfoRequisition *requisition)
-{
- gboolean ready = FALSE;
- gulong seqid;
- GError *gerr = NULL;
- GValue ival = G_VALUE_INIT;
- GValue lval = G_VALUE_INIT;
-
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
- UfoRoofConfig *cfg = priv->cfg;
- UfoRoofBuffer *buf = priv->buf;
-
- void *output_buffer = ufo_buffer_get_host_array(output, NULL);
-
- if (priv->stop)
- return FALSE;
-
- ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &seqid, &gerr);
- if (gerr) roof_print_error(gerr);
+ // FIXME: integrate fastwriter somewhere here?
if (priv->build == BUILD_UFO) {
switch (cfg->bit_depth) {
@@ -318,8 +371,13 @@ ufo_roof_build_task_generate (UfoTask *task,
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
+ if (ready) priv->generated++;
+
if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
- printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ if (ready)
+ printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ else
+ printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
priv->announced = buf->current_id;
}
@@ -332,7 +390,7 @@ ufo_roof_build_task_set_property (GObject *object,
const GValue *value,
GParamSpec *pspec)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
switch (property_id) {
case PROP_CONFIG:
@@ -345,6 +403,13 @@ ufo_roof_build_task_set_property (GObject *object,
case PROP_SIMULATE:
priv->simulate = g_value_get_boolean (value);
break;
+ case PROP_PATH:
+ if (priv->path) g_free(priv->path);
+ priv->path = g_value_dup_string(value);
+ break;
+ case PROP_FIRST:
+ priv->first_file_number = g_value_get_uint (value);
+ break;
case PROP_NUMBER:
priv->number = g_value_get_uint (value);
break;
@@ -367,11 +432,11 @@ ufo_roof_build_task_get_property (GObject *object,
GValue *value,
GParamSpec *pspec)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
switch (property_id) {
case PROP_CONFIG:
- g_value_set_string(value, priv->config);
+ g_value_set_string(value, priv->config);
break;
case PROP_STOP:
g_value_set_boolean (value, priv->stop);
@@ -379,6 +444,12 @@ ufo_roof_build_task_get_property (GObject *object,
case PROP_SIMULATE:
g_value_set_boolean (value, priv->simulate);
break;
+ case PROP_PATH:
+ g_value_set_string(value, priv->path?priv->path:"");
+ break;
+ case PROP_FIRST:
+ g_value_set_uint (value, priv->first_file_number);
+ break;
case PROP_NUMBER:
g_value_set_uint (value, priv->number);
break;
@@ -394,17 +465,18 @@ ufo_roof_build_task_get_property (GObject *object,
static void
ufo_task_interface_init (UfoTaskIface *iface)
{
+ roof_init();
+
iface->setup = ufo_roof_build_task_setup;
iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
iface->get_mode = ufo_roof_build_task_get_mode;
iface->get_requisition = ufo_roof_build_task_get_requisition;
- iface->process = ufo_roof_build_task_process;
iface->generate = ufo_roof_build_task_generate;
}
static void
-ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+ufo_roof_build_task_class_init (RoofBuildTaskClass *klass)
{
GObjectClass *oclass = G_OBJECT_CLASS (klass);
@@ -426,7 +498,6 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
FALSE,
G_PARAM_READWRITE);
-
properties[PROP_SIMULATE] =
g_param_spec_boolean ("simulate",
"Simulation mode",
@@ -434,6 +505,20 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
FALSE,
G_PARAM_READWRITE);
+ properties[PROP_PATH] =
+ g_param_spec_string ("path",
+ "Input files for simulation mode",
+ "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_FIRST] =
+ g_param_spec_uint ("first_file_number",
+ "Offset to the first read file",
+ "Offset to the first read file",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
properties[PROP_NUMBER] =
g_param_spec_uint("number",
"Number of datasets to receive",
@@ -452,11 +537,11 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
g_object_class_install_property (oclass, i, properties[i]);
- g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+ g_type_class_add_private (oclass, sizeof(RoofBuildTaskPrivate));
}
static void
-ufo_roof_build_task_init(UfoRoofBuildTask *self)
+ufo_roof_build_task_init(RoofBuildTask *self)
{
self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
}
diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h
deleted file mode 100644
index 787b441..0000000
--- a/src/ufo-roof-read-file.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef __UFO_ROOF_READ_FILE_H
-#define __UFO_ROOF_READ_FILE_H
-
-#include "ufo-roof-read.h"
-
-UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error);
-
-#endif
diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h
deleted file mode 100644
index 74b0742..0000000
--- a/src/ufo-roof-read-socket.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef __UFO_ROOF_READ_SOCKET_H
-#define __UFO_ROOF_READ_SOCKET_H
-
-#include "ufo-roof-read.h"
-
-UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error);
-
-#endif
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
index 7d55b79..83b9627 100644
--- a/src/ufo-roof-read-task.c
+++ b/src/ufo-roof-read-task.c
@@ -35,7 +35,7 @@
struct _UfoRoofReadTaskPrivate {
gchar *config; // ROOF configuration file name
UfoRoofConfig *cfg; // Parsed ROOF parameters
- UfoRoofReadInterface *reader;
+ UfoRoofReadInterface *reader[16];
guint id; // Reader ID (defince sequential port number)
gboolean stop; // Flag requiring termination
@@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task,
UfoResources *resources,
GError **error)
{
+ guint i;
GError *gerr = NULL;
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
@@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task,
roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams);
// Start actual reader
- if (priv->simulate) {
- if (!priv->path)
- roof_setup_error(error, "Path to simulated data should be specified");
-
- priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr);
- } else
- priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr);
- if (!priv->reader)
- roof_propagate_error(error, gerr, "roof_read_new: ");
+ for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+ if (!priv->reader[i])
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+ }
}
@@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object)
{
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
- if (priv->reader) {
- priv->reader->close(priv->reader);
+ for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) {
+ if (priv->reader[i]) {
+ priv->reader[i]->close(priv->reader[i]);
+ }
}
if (priv->cfg) {
@@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task,
void *output_buffer = ufo_buffer_get_host_array(output, NULL);
UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg);
+ uint64_t current_id[16] = {0};
+ uint64_t grabbed[16] = {0};
+
+ static uint64_t errors = 0;
+retry:
if (priv->stop)
return FALSE;
- guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
+ for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) {
+
+ guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr);
if (gerr) {
g_warning("Error reciving data: %s", gerr->message);
g_error_free(gerr);
return FALSE;
}
+ const uint8_t *fragment = output_buffer;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+ if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+ current_id[sid] = packet_id;
+ grabbed[sid]++;
+ if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000);
+
+ fragment += cfg->max_packet_size;
+ }
+ }
+
+ goto retry;
+
#ifdef UFO_ROOF_DEBUG
// Store first received packet on each channel...
static int debug = 1;
@@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task,
#endif /* UFO_ROOF_DEBUG */
// FIXME: End of data (shall we restart in the network case?)
- if (!packets)
- return FALSE;
+// if (!packets)
+// return FALSE;
// Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
header->channel_id = priv->id;
- header->n_packets = packets;
+// header->n_packets = packets;
return TRUE;
}
diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h
deleted file mode 100644
index 5f0853c..0000000
--- a/src/ufo-roof-read.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef __UFO_ROOF_READ_H
-#define __UFO_ROOF_READ_H
-
-#include "ufo-roof-config.h"
-
-typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
-
-typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error);
-typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
-
-struct _UfoRoofReadInterface {
- UfoRoofReaderRead read;
- UfoRoofReaderClose close;
-};
-
-
-#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof.h b/src/ufo-roof.h
index 23f8429..ea422e2 100644
--- a/src/ufo-roof.h
+++ b/src/ufo-roof.h
@@ -1,21 +1,21 @@
-#ifndef __UFO_ROOF_H
-#define __UFO_ROOF_H
+#ifndef __ROOF_H
+#define __ROOF_H
#include "ufo-roof-config.h"
#include "ufo-roof-error.h"
-//#define UFO_ROOF_DEBUG
-#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf))
-#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size))
+//#define ROOF_DEBUG
+#define ROOF_PACKET_HEADER(buf) ((RoofPacketHeader*)(buf))
+#define ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((RoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size))
typedef struct {
uint64_t packet_id; // Sequential Packet ID (numbered from 0)
-} UfoRoofPacketHeader;
+} RoofPacketHeader;
typedef struct {
uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0)
uint32_t n_packets; // Number of packets
-} UfoRoofPacketBlockHeader;
+} RoofPacketBlockHeader;
-#endif /* __UFO_ROOF_H */
+#endif /* __ROOF_H */
diff --git a/tests/checks.sh b/tests/checks.sh
new file mode 100644
index 0000000..536bb42
--- /dev/null
+++ b/tests/checks.sh
@@ -0,0 +1,12 @@
+sudo tcpdump -i ens4d1 -e -c 1000 | cut -d ' ' -f 2- | sort | uniq
+#nc -l -u -p 52080
+
+#vma_stats $(ps xa | grep python3 | grep roof | awk '{print $1}') -c 1 -v 3
+# PSR - CPU core running process/thread
+#ps -aeLF
+
+
+
+#echo 0 > /sys/kernel/mm/transparent_hugepage/khugepaged/defrag
+#echo never > /sys/kernel/mm/transparent_hugepage/defrag
+#echo never > /sys/kernel/mm/transparent_hugepage/enabled
diff --git a/tests/config.sh b/tests/config.sh
index 7b9d7d8..e0ee335 100644
--- a/tests/config.sh
+++ b/tests/config.sh
@@ -5,17 +5,13 @@ arch=""
[ $el7 -ne 0 ] && arch="64"
[ -f /etc/gentoo-release ] && arch="64"
+ands_path=/mnt/ands
+vma_lib_path=/usr/local/lib/
+vma_lib_ext=""
-ods_path=/mnt/ands/ods/bin-fedora/
-vma_path=/mnt/ands/
-vma_lib_path=/mnt/ands/lib64-fedora/
+[ $el7 -eq 1 ] && vma_lib_path="${ands_path/fedora/centos}"
-[ $el7 -eq 1 ] && ods_path="${ods_path/fedora/centos}"
-[ $el7 -eq 1 ] && vma_lib_path="${vma_lib_path/fedora/centos}"
-
-# Standard library
-#vma_lib=${vma_lib_path}/libvma.so.8.6.10
-vma_lib=${vma_lib_path}/libvma.so.8.9.5
+vma_lib=${vma_lib_path}/libvma.so${vma_lib_ext}
# With Mellanox OFED extensions (./configure --enable-socketxtreme)
#vma_lib=${vma_lib_path}/mlx/libvma.so.8.6.10
diff --git a/tests/control.py b/tests/control.py
new file mode 100644
index 0000000..0d27f14
--- /dev/null
+++ b/tests/control.py
@@ -0,0 +1,28 @@
+import socket
+import time
+
+#Communication with the boards (for configuration) happens via UDP messages to 192.168.100.xxx at port 51966. IP range is 192.168.100.100 to 192.168.100.119.
+#If you need to toggle the streaming on/off, send a udp broadcast to 192.168.100.255 at port 51966 with udp payload x011100 (disable) or x011101 (enable).
+#To reset the internal packet counter to 1, send x011500
+#To change packet size, send x0112XXXX with XXXX being the payload size. Received packets will always be 8 byte larger than this value because of the counter in the beginning of the payload. You should change the size only when streaming is turned off.
+#Currently, all 20 devices are streaming data packets of 800 byte size at a rate ~500 MBit/s per device. They send to ports 52067 through 52086.
+#I'm pretty sure it was configured correctly when I left. Just in case, you can use command hex 0106XXXXXXXXXXX with the mellanox MAC in hex as XX to set the target Mac address for the devices. Broadcast it to port 51966 at 192.168.100.255.
+
+
+server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+#server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+server.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
+#message = bytes.fromhex("011101")
+#server.sendto(message, ("192.168.100.255", 51966))
+
+message = bytes.fromhex("011100")
+server.sendto(message, ("192.168.100.108", 51966))
+#server.sendto(message, ("192.168.100.119", 51966))
+server.sendto(message, ("192.168.100.106", 51966))
+
+
+
+#sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+#sock.bind(("0.0.0.0", 52068))
+#data, addr = sock.recvfrom(4096)
+#print ("received message from %s of %i bytes " % (addr, len(data)))
diff --git a/tests/roof-vma.sh b/tests/roof-vma.sh
index 2faff84..3e97738 120000..100755
--- a/tests/roof-vma.sh
+++ b/tests/roof-vma.sh
@@ -1 +1,48 @@
-roof.sh \ No newline at end of file
+#! /bin/bash
+
+. config.sh
+
+function pyroof {
+# numactl --cpunodebind=1
+ LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib$arch" GI_TYPELIB_PATH="/usr/local/lib$arch/girepository-1.0/" \
+ python3 roof.py "$@"
+# numactl --cpunodebind=1 python3 roof.py "$@"
+}
+
+if [[ "$0" =~ roof-vma ]]; then
+ function roof {
+ bufs=800000
+ bufs=$((bufs * 4))
+
+ ulimit -l unlimited
+ echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399
+ echo 8000 > /proc/sys/vm/nr_hugepages # 0
+
+
+ tuned-adm profile latency-performance
+ cpupower frequency-set --governor performance # powersave
+ echo 100000000 > /proc/sys/kernel/sched_min_granularity_ns # 3000000
+ echo 50000000 > /proc/sys/kernel/sched_migration_cost_ns # 500000
+ echo 0 > /proc/sys/kernel/numa_balancing # 1
+
+ echo 0 > /proc/sys/vm/swappiness
+ sysctl -w vm.swappiness=0
+ sysctl -w vm.zone_reclaim_mode=0
+ echo never > /sys/kernel/mm/transparent_hugepage/enabled
+
+ #VMA_SPEC=latency VMA_INTERNAL_THREAD_AFFINITY=1
+ VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=10 VMA_SELECT_POLL=10 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_MEM_ALLOC_TYPE=2 VMA_RX_BUFS=$bufs VMA_CQ_AIM_INTERRUPTS_RATE_PER_SEC=1000 LD_PRELOAD=$vma_lib \
+ pyroof "$@"
+ }
+else
+ function roof {
+ pyroof "$@"
+ }
+fi
+
+#cat roof.yaml | sed '/simulation/,$d' | yq . > roof.json
+#cat roof.yaml | yq r - -j | jq '' | sed -r '/\[$/ {:a;N;s/\]/&/;Ta;s/\n +//g;s/,(.)/, \1/}' > roof.json
+#This is real
+#cat roof.yaml | python3 yaml2json.py | sed -r '/\[$/ {:a;N;s/\]/&/;Ta;s/\n +//g;s/,(.)/, \1/}' > roof.json
+
+roof "$@"
diff --git a/tests/roof.json b/tests/roof.json
index 1848bc6..07c86f6 100644
--- a/tests/roof.json
+++ b/tests/roof.json
@@ -1,15 +1,15 @@
{
"hardware": {
"planes": 2,
- "modules": 16,
+ "modules": 20,
"bit_depth": 16,
"channels_per_module": 16,
"samples_per_rotation": 500
},
"geometry": {
- "detector_diameter": [216, 216],
- "source_diameter": [360, 365],
- "source_angle": [270, 275],
+ "detector_diameter": 216,
+ "source_diameter": 360,
+ "source_angle": 270,
"source_angle_offset": 3.2,
"delta_x": 500,
"delta_z": 1200
@@ -17,16 +17,19 @@
"network": {
"protocol": "udp",
"port": 52067,
- "streams": 16,
+ "streams": 20,
"header_size": 8,
"payload_size": 800
},
"performance": {
- "buffer_size": 10,
- "packets_at_once": 100
+ "buffer_size": 10000,
+ "packets_at_once": 500,
+ "latency_buffers": 1000,
+ "drop_buffers": 5000,
+ "sockets_per_thread": 1
},
"data": {
- "base_path": "/home/csa/roof2_data/test_data"
+ "base_path": "/home/ufxray/roof/test_data"
},
"simulation": {
"first_file_number": 1,
diff --git a/tests/roof.sh b/tests/roof.sh
index a8fa89e..3e97738 100755
--- a/tests/roof.sh
+++ b/tests/roof.sh
@@ -3,8 +3,10 @@
. config.sh
function pyroof {
+# numactl --cpunodebind=1
LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib$arch" GI_TYPELIB_PATH="/usr/local/lib$arch/girepository-1.0/" \
- python3 roof.py "$@"
+ python3 roof.py "$@"
+# numactl --cpunodebind=1 python3 roof.py "$@"
}
if [[ "$0" =~ roof-vma ]]; then
@@ -16,7 +18,20 @@ if [[ "$0" =~ roof-vma ]]; then
echo 1000000000 > /proc/sys/kernel/shmmax # 18446744073692774399
echo 8000 > /proc/sys/vm/nr_hugepages # 0
- VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=0 VMA_SELECT_POLL=0 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_RX_BUFS=$bufs LD_PRELOAD=$vma_lib \
+
+ tuned-adm profile latency-performance
+ cpupower frequency-set --governor performance # powersave
+ echo 100000000 > /proc/sys/kernel/sched_min_granularity_ns # 3000000
+ echo 50000000 > /proc/sys/kernel/sched_migration_cost_ns # 500000
+ echo 0 > /proc/sys/kernel/numa_balancing # 1
+
+ echo 0 > /proc/sys/vm/swappiness
+ sysctl -w vm.swappiness=0
+ sysctl -w vm.zone_reclaim_mode=0
+ echo never > /sys/kernel/mm/transparent_hugepage/enabled
+
+ #VMA_SPEC=latency VMA_INTERNAL_THREAD_AFFINITY=1
+ VMA_THREAD_MODE=3 VMA_MTU=0 VMA_RX_POLL=10 VMA_SELECT_POLL=10 VMA_RING_ALLOCATION_LOGIC_RX=20 VMA_MEM_ALLOC_TYPE=2 VMA_RX_BUFS=$bufs VMA_CQ_AIM_INTERRUPTS_RATE_PER_SEC=1000 LD_PRELOAD=$vma_lib \
pyroof "$@"
}
else
@@ -27,6 +42,7 @@ fi
#cat roof.yaml | sed '/simulation/,$d' | yq . > roof.json
#cat roof.yaml | yq r - -j | jq '' | sed -r '/\[$/ {:a;N;s/\]/&/;Ta;s/\n +//g;s/,(.)/, \1/}' > roof.json
-cat roof.yaml | python3 yaml2json.py | sed -r '/\[$/ {:a;N;s/\]/&/;Ta;s/\n +//g;s/,(.)/, \1/}' > roof.json
+#This is real
+#cat roof.yaml | python3 yaml2json.py | sed -r '/\[$/ {:a;N;s/\]/&/;Ta;s/\n +//g;s/,(.)/, \1/}' > roof.json
roof "$@"
diff --git a/tests/roof/config.py b/tests/roof/config.py
index 5ea8d41..ace895a 100644
--- a/tests/roof/config.py
+++ b/tests/roof/config.py
@@ -22,6 +22,8 @@ class RoofConfig:
self.fan_bins = self.modules * self.get_opt('hardware', 'channels_per_module', 16)
self.fan_projections = self.get_opt('hardware', 'samples_per_rotation', (self.sample_rate / self.imaging_rate) if (self.imaging_rate and self.sample_rate) else 1000)
+ self.sockets_per_thread = self.get_opt('performance', 'sockets_per_stream', 1)
+
if self.args.number is None: self.args.number = 0 if self.args.benchmark else self.planes
# Consistency and default mode
diff --git a/tests/roof/graph.py b/tests/roof/graph.py
index 62aa44e..0f516bc 100644
--- a/tests/roof/graph.py
+++ b/tests/roof/graph.py
@@ -1,3 +1,4 @@
+import math
import re
import gi
@@ -57,7 +58,7 @@ class RoofGraph(RoofConfig):
params = { 'path': path, 'first': first, 'step': step }
if self.args.number:
params['number'] = self.args.number
-
+
print ("Reading {} data from {}".format(self.args.read,path))
# FIXME: handle raw data parameters
return self.get_task('read', **params)
@@ -72,12 +73,7 @@ class RoofGraph(RoofConfig):
# Reconstruction from network or simulated data (also generation of flat/dark-fields)
build_type = "raw" if self.args.noroof else "sino" if self.check_writer_type_is_raw() else "ufo"
- build = self.get_roof_task('roof-build', simulate = self.args.simulate, number = self.args.number, build = build_type)
- for id in range(self.streams):
- read = self.get_roof_task('roof-read', id = id, simulate = self.args.simulate, path = path, first_file_number = first)
- self.graph.connect_nodes(read, build)
- build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT)
-
+ build = self.get_roof_task('roof-build', simulate = self.args.simulate, path = path, first_file_number = first, number = self.args.number, build = build_type)
return build
def get_writer(self):
diff --git a/tests/vma-analyze.sh b/tests/vma-analyze.sh
index 7000922..77c4eb1 100755
--- a/tests/vma-analyze.sh
+++ b/tests/vma-analyze.sh
@@ -7,9 +7,9 @@ sleep=1
path=/mnt/ands/bin/vma_stats_mlx
#-z seems ignored
#$path -p $(pidof onlineDetectorSimulatorServer) -c 1 -z &> /dev/null
-stats1=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }'))
+stats1=($($path -p $(ps xa | grep python3 | grep roof | awk '{print $1}') -c 1 | grep Rx | awk '{ print $3, $4 }'))
sleep $sleep
-stats2=($($path -p $(pidof python) -c 1 | grep Rx | awk '{ print $3, $4 }'))
+stats2=($($path -p $(ps xa | grep pyth | grep roof | awk '{print $1}') -c 1 | grep Rx | awk '{ print $3, $4 }'))
pksum=0
bwsum=0