summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2020-09-03 03:00:30 +0200
committerSuren A. Chilingaryan <csa@suren.me>2020-09-03 03:00:30 +0200
commit5172421d248250b4ab3b69eb57fd83656e23a4da (patch)
treea499d9f1dd0b74b754816884a59927b3171656fc /src
parent7b2e6168b049be9e7852b2d364d897592eff69fc (diff)
downloadufo-roof-temp-5172421d248250b4ab3b69eb57fd83656e23a4da.tar.gz
ufo-roof-temp-5172421d248250b4ab3b69eb57fd83656e23a4da.tar.bz2
ufo-roof-temp-5172421d248250b4ab3b69eb57fd83656e23a4da.tar.xz
ufo-roof-temp-5172421d248250b4ab3b69eb57fd83656e23a4da.zip
This is unfinished work implemeting out-of-UFO network serversHEADmaster
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt10
-rw-r--r--src/hw_config.h8
-rw-r--r--src/hw_sched.c398
-rw-r--r--src/hw_sched.h146
-rw-r--r--src/hw_thread.c143
-rw-r--r--src/hw_thread.h76
-rw-r--r--src/meson.build7
-rw-r--r--src/roof-buffer.c (renamed from src/ufo-roof-buffer.c)85
-rw-r--r--src/roof-buffer.h (renamed from src/ufo-roof-buffer.h)29
-rw-r--r--src/roof-config.c (renamed from src/ufo-roof-config.c)52
-rw-r--r--src/roof-config.h (renamed from src/ufo-roof-config.h)20
-rw-r--r--src/roof-error.h (renamed from src/ufo-roof-error.h)9
-rw-r--r--src/roof-read-file.c (renamed from src/ufo-roof-read-file.c)56
-rw-r--r--src/roof-read-file.h8
-rw-r--r--src/roof-read-socket.c (renamed from src/ufo-roof-read-socket.c)89
-rw-r--r--src/roof-read-socket.h8
-rw-r--r--src/roof-read.c107
-rw-r--r--src/roof-read.h45
-rw-r--r--src/roof-thread.c95
-rw-r--r--src/roof-thread.h30
-rw-r--r--src/roof.c133
-rw-r--r--src/roof.h46
-rw-r--r--src/save/memcpy.c344
-rw-r--r--src/save/memcpy.h63
-rw-r--r--src/save/ufo-roof-buffer-build-task.c474
-rw-r--r--src/save/ufo-roof-read-thread.c155
-rw-r--r--src/save/ufo-roof-read-thread.h23
-rw-r--r--src/save/ufo-roof-read.c123
-rw-r--r--src/save/ufo-roof-read.h61
-rw-r--r--src/ufo-roof-build-task.c271
-rw-r--r--src/ufo-roof-read-file.h8
-rw-r--r--src/ufo-roof-read-socket.h8
-rw-r--r--src/ufo-roof-read-task.c68
-rw-r--r--src/ufo-roof-read.h17
-rw-r--r--src/ufo-roof.h16
35 files changed, 2947 insertions, 284 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 720c18f..837a6fb 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 2.6)
#{{{ Sources
set(ufofilter_SRCS
- ufo-roof-read-task.c
ufo-roof-build-task.c
ufo-roof-filter-task.c
ufo-roof-flat-field-correct-task.c
@@ -12,12 +11,11 @@ set(common_SRCS
ufo-roof-config.c
)
-set(roof_read_aux_SRCS
+set(roof_build_aux_SRCS
+ hw_sched.c
+ hw_thread.c
ufo-roof-read-socket.c
ufo-roof-read-file.c
- )
-
-set(roof_build_aux_SRCS
ufo-roof-buffer.c
)
@@ -30,7 +28,7 @@ set(ufofilter_LIBS
${UFO_LIBRARIES}
${OpenCL_LIBRARIES})
-set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations")
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations -g -gdwarf-2 -g3 -fno-omit-frame-pointer")
add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES)
#}}}
diff --git a/src/hw_config.h b/src/hw_config.h
new file mode 100644
index 0000000..2351dea
--- /dev/null
+++ b/src/hw_config.h
@@ -0,0 +1,8 @@
+#ifndef _HW_CONFIG_H
+#define _HW_CONFIG_H
+
+ // enable threading
+#define HW_HAVE_SCHED_HEADERS
+#define HW_USE_THREADS
+
+#endif
diff --git a/src/hw_sched.c b/src/hw_sched.c
new file mode 100644
index 0000000..ec4d812
--- /dev/null
+++ b/src/hw_sched.c
@@ -0,0 +1,398 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _GNU_SOURCE
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "hw_config.h"
+
+#ifdef HW_HAVE_SCHED_HEADERS
+# include <sys/types.h>
+# include <unistd.h>
+# include <sched.h>
+#endif /* HW_HAVE_SCHED_HEADERS */
+
+#include "hw_sched.h"
+
+
+#ifdef HW_USE_THREADS
+# define MUTEX_INIT(ctx, name) \
+ if (!err) { \
+ ctx->name##_mutex = g_mutex_new(); \
+ if (!ctx->name##_mutex) err = 1; \
+ }
+
+# define MUTEX_FREE(ctx, name) \
+ if (ctx->name##_mutex) g_mutex_free(ctx->name##_mutex);
+
+# define COND_INIT(ctx, name) \
+ MUTEX_INIT(ctx, name##_cond) \
+ if (!err) { \
+ ctx->name##_cond = g_cond_new(); \
+ if (!ctx->name##_cond) { \
+ err = 1; \
+ MUTEX_FREE(ctx, name##_cond) \
+ } \
+ }
+
+# define COND_FREE(ctx, name) \
+ if (ctx->name##_cond) g_cond_free(ctx->name##_cond); \
+ MUTEX_FREE(ctx, name##_cond)
+#else /* HW_USE_THREADS */
+# define MUTEX_INIT(ctx, name)
+# define MUTEX_FREE(ctx, name)
+# define COND_INIT(ctx, name)
+# define COND_FREE(ctx, name)
+#endif /* HW_USE_THREADS */
+
+
+HWRunFunction ppu_run[] = {
+ (HWRunFunction)NULL
+};
+
+static int hw_sched_initialized = 0;
+
+int hw_sched_init(void) {
+ if (!hw_sched_initialized) {
+#ifdef HW_USE_THREADS
+ g_thread_init(NULL);
+#endif /* HW_USE_THREADS */
+ hw_sched_initialized = 1;
+ }
+
+ return 0;
+}
+
+
+int hw_sched_get_cpu_count(void) {
+#ifdef HW_HAVE_SCHED_HEADERS
+ int err;
+
+ int cpu_count;
+ cpu_set_t mask;
+
+ err = sched_getaffinity(getpid(), sizeof(mask), &mask);
+ if (err) return 1;
+
+# ifdef CPU_COUNT
+ cpu_count = CPU_COUNT(&mask);
+# else
+ for (cpu_count = 0; cpu_count < CPU_SETSIZE; cpu_count++) {
+ if (!CPU_ISSET(cpu_count, &mask)) break;
+ }
+# endif
+
+ if (!cpu_count) cpu_count = 1;
+ return cpu_count;
+#else /* HW_HAVE_SCHED_HEADERS */
+ return 1;
+#endif /* HW_HAVE_SCHED_HEADERS */
+}
+
+
+HWSched hw_sched_create(int cpu_count) {
+ int i;
+ int err = 0;
+
+ HWSched ctx;
+
+ //hw_sched_init();
+
+ ctx = (HWSched)malloc(sizeof(HWSchedS));
+ if (!ctx) return NULL;
+
+ memset(ctx, 0, sizeof(HWSchedS));
+
+ ctx->status = 1;
+
+ MUTEX_INIT(ctx, sync);
+ MUTEX_INIT(ctx, data);
+ COND_INIT(ctx, compl);
+ COND_INIT(ctx, job);
+
+ if (err) {
+ hw_sched_destroy(ctx);
+ return NULL;
+ }
+
+ if (!cpu_count) cpu_count = hw_sched_get_cpu_count();
+ if (cpu_count > HW_MAX_THREADS) cpu_count = HW_MAX_THREADS;
+
+ ctx->n_threads = 0;
+ for (i = 0; i < cpu_count; i++) {
+ ctx->thread[ctx->n_threads] = hw_thread_create(ctx, ctx->n_threads, NULL, ppu_run, NULL);
+ if (ctx->thread[ctx->n_threads]) {
+#ifndef HW_USE_THREADS
+ ctx->thread[ctx->n_threads]->status = HW_THREAD_STATUS_STARTING;
+#endif /* HW_USE_THREADS */
+ ++ctx->n_threads;
+ }
+ }
+
+ if (!ctx->n_threads) {
+ hw_sched_destroy(ctx);
+ return NULL;
+ }
+
+ return ctx;
+}
+
+static int hw_sched_wait_threads(HWSched ctx) {
+#ifdef HW_USE_THREADS
+ int i = 0;
+
+ hw_sched_lock(ctx, compl_cond);
+ while (i < ctx->n_threads) {
+ for (; i < ctx->n_threads; i++) {
+ if (ctx->thread[i]->status == HW_THREAD_STATUS_INIT) {
+ hw_sched_wait(ctx, compl);
+ break;
+ }
+ }
+
+ }
+ hw_sched_unlock(ctx, compl_cond);
+#endif /* HW_USE_THREADS */
+
+ ctx->started = 1;
+
+ return 0;
+}
+
+void hw_sched_destroy(HWSched ctx) {
+ int i;
+
+ if (ctx->n_threads > 0) {
+ if (!ctx->started) {
+ hw_sched_wait_threads(ctx);
+ }
+
+ ctx->status = 0;
+ hw_sched_lock(ctx, job_cond);
+ hw_sched_broadcast(ctx, job);
+ hw_sched_unlock(ctx, job_cond);
+
+ for (i = 0; i < ctx->n_threads; i++) {
+ hw_thread_destroy(ctx->thread[i]);
+ }
+ }
+
+ COND_FREE(ctx, job);
+ COND_FREE(ctx, compl);
+ MUTEX_FREE(ctx, data);
+ MUTEX_FREE(ctx, sync);
+
+ free(ctx);
+}
+
+int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags) {
+ ctx->mode = HW_SCHED_MODE_SEQUENTIAL;
+ ctx->n_blocks = n_blocks;
+ ctx->cur_block = cur_block;
+ ctx->flags = flags;
+
+ return 0;
+}
+
+int hw_sched_get_chunk(HWSched ctx, int thread_id) {
+ int block;
+
+ switch (ctx->mode) {
+ case HW_SCHED_MODE_PREALLOCATED:
+ if (ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING) {
+#ifndef HW_USE_THREADS
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_DONE;
+#endif /* HW_USE_THREADS */
+ return thread_id;
+ } else {
+ return HW_SCHED_CHUNK_INVALID;
+ }
+ case HW_SCHED_MODE_SEQUENTIAL:
+ if ((ctx->flags&HW_SCHED_FLAG_INIT_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING)) {
+ return HW_SCHED_CHUNK_INIT;
+ }
+ hw_sched_lock(ctx, data);
+ block = *ctx->cur_block;
+ if (block < *ctx->n_blocks) {
+ *ctx->cur_block = *ctx->cur_block + 1;
+ } else {
+ block = HW_SCHED_CHUNK_INVALID;
+ }
+ hw_sched_unlock(ctx, data);
+ if (block == HW_SCHED_CHUNK_INVALID) {
+ if (((ctx->flags&HW_SCHED_FLAG_FREE_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING))) {
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING;
+ return HW_SCHED_CHUNK_FREE;
+ }
+ if ((ctx->flags&HW_SCHED_FLAG_TERMINATOR_CALL)&&((ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING)||(ctx->thread[thread_id]->status == HW_THREAD_STATUS_FINISHING))) {
+ int i;
+ hw_sched_lock(ctx, data);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (thread_id == i) continue;
+ if ((ctx->thread[i]->status != HW_THREAD_STATUS_DONE)&&(ctx->thread[i]->status != HW_THREAD_STATUS_FINISHING2)&&(ctx->thread[i]->status != HW_THREAD_STATUS_IDLE)) {
+ break;
+ }
+ }
+ ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING2;
+ hw_sched_unlock(ctx, data);
+ if (i == ctx->n_threads) {
+ return HW_SCHED_CHUNK_TERMINATOR;
+ }
+ }
+ }
+ return block;
+ default:
+ return HW_SCHED_CHUNK_INVALID;
+ }
+
+ return -1;
+}
+
+
+int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry) {
+#ifdef HW_USE_THREADS
+ if (!ctx->started) {
+ hw_sched_wait_threads(ctx);
+ }
+#else /* HW_USE_THREADS */
+ int err;
+ int i, chunk_id, n_threads;
+ HWRunFunction run;
+ HWThread thrctx;
+#endif /* HW_USE_THREADS */
+
+ ctx->ctx = appctx;
+ ctx->entry = entry;
+
+ switch (ctx->mode) {
+ case HW_SCHED_MODE_SEQUENTIAL:
+ *ctx->cur_block = 0;
+ break;
+ default:
+ ;
+ }
+
+#ifdef HW_USE_THREADS
+ hw_sched_lock(ctx, compl_cond);
+
+ hw_sched_lock(ctx, job_cond);
+ hw_sched_broadcast(ctx, job);
+ hw_sched_unlock(ctx, job_cond);
+#else /* HW_USE_THREADS */
+ n_threads = ctx->n_threads;
+
+ for (i = 0; i < n_threads; i++) {
+ thrctx = ctx->thread[i];
+ thrctx->err = 0;
+ }
+
+ i = 0;
+ thrctx = ctx->thread[i];
+ chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
+
+ while (chunk_id >= 0) {
+ run = hw_run_entry(thrctx->runs, entry);
+ err = run(thrctx, thrctx->hwctx, chunk_id, appctx);
+ if (err) {
+ thrctx->err = err;
+ break;
+ }
+
+ if ((++i) == n_threads) i = 0;
+ thrctx = ctx->thread[i];
+ chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id);
+ }
+#endif /* HW_USE_THREADS */
+
+ return 0;
+}
+
+int hw_sched_wait_task(HWSched ctx) {
+ int err = 0;
+ int i = 0, n_threads = ctx->n_threads;
+
+#ifdef HW_USE_THREADS
+ while (i < ctx->n_threads) {
+ for (; i < ctx->n_threads; i++) {
+ if (ctx->thread[i]->status == HW_THREAD_STATUS_DONE) {
+ ctx->thread[i]->status = HW_THREAD_STATUS_IDLE;
+ } else {
+ hw_sched_wait(ctx, compl);
+ break;
+ }
+ }
+
+ }
+
+ hw_sched_unlock(ctx, compl_cond);
+#endif /* HW_USE_THREADS */
+
+ for (i = 0; i < n_threads; i++) {
+ HWThread thrctx = ctx->thread[i];
+ if (thrctx->err) return err = thrctx->err;
+
+#ifndef HW_USE_THREADS
+ thrctx->status = HW_THREAD_STATUS_IDLE;
+#endif /* HW_USE_THREADS */
+ }
+
+ return err;
+}
+
+int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+
+ err = hw_sched_schedule_task(ctx, appctx, entry);
+ if (err) return err;
+
+ return hw_sched_wait_task(ctx);
+}
+
+int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+
+ ctx->saved_mode = ctx->mode;
+ ctx->mode = HW_SCHED_MODE_PREALLOCATED;
+ err = hw_sched_schedule_task(ctx, appctx, entry);
+
+ return err;
+}
+
+
+int hw_sched_wait_thread_task(HWSched ctx) {
+ int err;
+
+ err = hw_sched_wait_task(ctx);
+ ctx->mode = ctx->saved_mode;
+
+ return err;
+}
+
+int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry) {
+ int err;
+ int saved_mode = ctx->mode;
+
+ ctx->mode = HW_SCHED_MODE_PREALLOCATED;
+ err = hw_sched_execute_task(ctx, appctx, entry);
+ ctx->mode = saved_mode;
+
+ return err;
+}
diff --git a/src/hw_sched.h b/src/hw_sched.h
new file mode 100644
index 0000000..af9e363
--- /dev/null
+++ b/src/hw_sched.h
@@ -0,0 +1,146 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _HW_SCHED_H
+#define _HW_SCHED_H
+
+#include <glib.h>
+
+typedef struct HWSchedT *HWSched;
+#ifdef HW_USE_THREADS
+typedef GMutex *HWMutex;
+#else /* HW_USE_THREADS */
+typedef void *HWMutex;
+#endif /* HW_USE_THREADS */
+
+
+#include "hw_thread.h"
+
+enum HWSchedModeT {
+ HW_SCHED_MODE_PREALLOCATED = 0,
+ HW_SCHED_MODE_SEQUENTIAL
+};
+typedef enum HWSchedModeT HWSchedMode;
+
+enum HWSchedChunkT {
+ HW_SCHED_CHUNK_INVALID = -1,
+ HW_SCHED_CHUNK_INIT = -2,
+ HW_SCHED_CHUNK_FREE = -3,
+ HW_SCHED_CHUNK_TERMINATOR = -4
+};
+typedef enum HWSchedChunkT HWSchedChunk;
+
+enum HWSchedFlagsT {
+ HW_SCHED_FLAG_INIT_CALL = 1, //! Executed in each thread before real chunks
+ HW_SCHED_FLAG_FREE_CALL = 2, //! Executed in each thread after real chunks
+ HW_SCHED_FLAG_TERMINATOR_CALL = 4 //! Executed in one of the threads after all threads are done
+};
+typedef enum HWSchedFlagsT HWSchedFlags;
+
+
+#define HW_SINGLE_MODE
+//#define HW_DETECT_CPU_CORES
+#define HW_MAX_THREADS 128
+
+#ifdef HW_SINGLE_MODE
+ typedef HWRunFunction HWEntry;
+# define hw_run_entry(runs, entry) entry
+#else /* HW_SINGLE_MODE */
+ typedef int HWEntry;
+# define hw_run_entry(runs, entry) runs[entry]
+#endif /* HW_SINGLE_MODE */
+
+#ifndef HW_HIDE_DETAILS
+struct HWSchedT {
+ int status;
+ int started;
+
+ int n_threads;
+ HWThread thread[HW_MAX_THREADS];
+
+#ifdef HW_USE_THREADS
+ GCond *job_cond, *compl_cond;
+ GMutex *job_cond_mutex, *compl_cond_mutex, *data_mutex;
+ GMutex *sync_mutex;
+#endif /* HW_USE_THREADS */
+
+ HWSchedMode mode;
+ HWSchedMode saved_mode;
+ HWSchedFlags flags;
+ int *n_blocks;
+ int *cur_block;
+
+ HWEntry entry;
+ void *ctx;
+};
+typedef struct HWSchedT HWSchedS;
+#endif /* HW_HIDE_DETAILS */
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+HWSched hw_sched_create(int ppu_count);
+int hw_sched_init(void);
+void hw_sched_destroy(HWSched ctx);
+int hw_sched_get_cpu_count(void);
+
+int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags);
+int hw_sched_get_chunk(HWSched ctx, int thread_id);
+int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry);
+int hw_sched_wait_task(HWSched ctx);
+int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry);
+
+int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry);
+int hw_sched_wait_thread_task(HWSched ctx);
+int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry);
+
+HWMutex hw_sched_create_mutex(void);
+void hw_sched_destroy_mutex(HWMutex ctx);
+
+#ifdef HW_USE_THREADS
+# define hw_sched_lock(ctx, type) g_mutex_lock(ctx->type##_mutex)
+# define hw_sched_unlock(ctx, type) g_mutex_unlock(ctx->type##_mutex)
+# define hw_sched_broadcast(ctx, type) g_cond_broadcast(ctx->type##_cond)
+# define hw_sched_signal(ctx, type) g_cond_signal(ctx->type##_cond)
+# define hw_sched_wait(ctx, type) g_cond_wait(ctx->type##_cond, ctx->type##_cond_mutex)
+
+#define hw_sched_create_mutex(void) g_mutex_new()
+#define hw_sched_destroy_mutex(ctx) g_mutex_free(ctx)
+#define hw_sched_lock_mutex(ctx) g_mutex_lock(ctx)
+#define hw_sched_unlock_mutex(ctx) g_mutex_unlock(ctx)
+#else /* HW_USE_THREADS */
+# define hw_sched_lock(ctx, type)
+# define hw_sched_unlock(ctx, type)
+# define hw_sched_broadcast(ctx, type)
+# define hw_sched_signal(ctx, type)
+# define hw_sched_wait(ctx, type)
+
+#define hw_sched_create_mutex(void) NULL
+#define hw_sched_destroy_mutex(ctx)
+#define hw_sched_lock_mutex(ctx)
+#define hw_sched_unlock_mutex(ctx)
+#endif /* HW_USE_THREADS */
+
+# ifdef __cplusplus
+}
+# endif
+
+#endif /* _HW_SCHED_H */
+
diff --git a/src/hw_thread.c b/src/hw_thread.c
new file mode 100644
index 0000000..0374630
--- /dev/null
+++ b/src/hw_thread.c
@@ -0,0 +1,143 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "hw_config.h"
+
+#include "hw_sched.h"
+#include "hw_thread.h"
+
+#ifdef HW_USE_THREADS
+static void *hw_thread_function(HWThread ctx) {
+ int err;
+ int chunk_id;
+
+ HWRunFunction *runs;
+ HWRunFunction run;
+ HWSched sched;
+ void *hwctx;
+
+ sched = ctx->sched;
+ runs = ctx->run;
+ hwctx = ctx->hwctx;
+
+ hw_sched_lock(sched, job_cond);
+
+ hw_sched_lock(sched, compl_cond);
+ ctx->status = HW_THREAD_STATUS_IDLE;
+ hw_sched_broadcast(sched, compl);
+ hw_sched_unlock(sched, compl_cond);
+
+ while (sched->status) {
+ hw_sched_wait(sched, job);
+ if (!sched->status) break;
+
+ ctx->err = 0;
+ ctx->status = HW_THREAD_STATUS_STARTING;
+ hw_sched_unlock(sched, job_cond);
+
+ run = hw_run_entry(runs, sched->entry);
+#if 0
+ // Offset to interleave transfers if the GPUBox is used
+ // Just check with CUDA_LAUNCH_BLOCKED the togpu time and put it here
+ // It should be still significantly less than BP time
+ // We can do callibration during initilization in future
+
+ usleep(12000 * ctx->thread_id);
+#endif
+ chunk_id = hw_sched_get_chunk(sched, ctx->thread_id);
+
+ /* Should be after get_chunk, since we can check if it's first time */
+ ctx->status = HW_THREAD_STATUS_RUNNING;
+ while (chunk_id != HW_SCHED_CHUNK_INVALID) {
+ //printf("Thread %i processing slice %i\n", ctx->thread_id, chunk_id);
+ err = run(ctx, hwctx, chunk_id, sched->ctx);
+ if (err) {
+ ctx->err = err;
+ break;
+ }
+ chunk_id = hw_sched_get_chunk(sched, ctx->thread_id);
+ }
+
+ hw_sched_lock(sched, job_cond);
+
+ hw_sched_lock(sched, compl_cond);
+ ctx->status = HW_THREAD_STATUS_DONE;
+ hw_sched_broadcast(sched, compl);
+ hw_sched_unlock(sched, compl_cond);
+ }
+
+ hw_sched_unlock(sched, job_cond);
+
+ g_thread_exit(NULL);
+ return NULL; /* TODO: check this */
+}
+#endif /* HW_USE_THREADS */
+
+
+HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func) {
+ GError *err;
+
+ HWThread ctx;
+
+ ctx = (HWThread)malloc(sizeof(HWThreadS));
+ if (!ctx) return ctx;
+
+ memset(ctx, 0, sizeof(HWThreadS));
+
+ ctx->sched = sched;
+ ctx->hwctx = hwctx;
+ ctx->run = run_func;
+ ctx->free = free_func;
+ ctx->thread_id = thread_id;
+ ctx->status = HW_THREAD_STATUS_INIT;
+
+#ifdef HW_USE_THREADS
+ ctx->thread = g_thread_create((GThreadFunc)hw_thread_function, ctx, 1, &err);
+ if (!ctx->thread) {
+ g_error_free(err);
+
+ hw_thread_destroy(ctx);
+ return NULL;
+ }
+#endif /* HW_USE_THREADS */
+
+ return ctx;
+}
+
+void hw_thread_destroy(HWThread ctx) {
+#ifdef HW_USE_THREADS
+ if (ctx->thread) {
+ g_thread_join(ctx->thread);
+ }
+#endif /* HW_USE_THREADS */
+
+ if (ctx->data) {
+ free(ctx->data);
+ }
+
+ if (ctx->free) {
+ ctx->free(ctx->hwctx);
+ }
+
+ free(ctx);
+}
diff --git a/src/hw_thread.h b/src/hw_thread.h
new file mode 100644
index 0000000..de7f60f
--- /dev/null
+++ b/src/hw_thread.h
@@ -0,0 +1,76 @@
+/*
+ * The PyHST program is Copyright (C) 2002-2011 of the
+ * European Synchrotron Radiation Facility (ESRF) and
+ * Karlsruhe Institute of Technology (KIT).
+ *
+ * PyHST is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * hst is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _HW_THREAD_H
+#define _HW_THREAD_H
+
+#include <glib.h>
+
+typedef struct HWThreadT *HWThread;
+typedef int (*HWRunFunction)(HWThread thread, void *ctx, int block, void *attr);
+typedef int (*HWFreeFunction)(void *ctx);
+
+#include "hw_sched.h"
+
+enum HWThreadStatusT {
+ HW_THREAD_STATUS_IDLE = 0,
+ HW_THREAD_STATUS_STARTING = 1,
+ HW_THREAD_STATUS_RUNNING = 2,
+ HW_THREAD_STATUS_FINISHING = 3,
+ HW_THREAD_STATUS_FINISHING2 = 4,
+ HW_THREAD_STATUS_DONE = 5,
+ HW_THREAD_STATUS_INIT = 6
+};
+typedef enum HWThreadStatusT HWThreadStatus;
+
+
+#ifndef HW_HIDE_DETAILS
+struct HWThreadT {
+ int thread_id;
+ HWSched sched;
+
+#ifdef HW_USE_THREADS
+ GThread *thread;
+#endif /* HW_USE_THREADS */
+
+ void *hwctx;
+ HWRunFunction *run;
+ HWFreeFunction free;
+
+ int err;
+ HWThreadStatus status;
+
+ void *data; /**< Per-thread data storage, will be free'd if set */
+};
+typedef struct HWThreadT HWThreadS;
+#endif /* HW_HIDE_DETAILS */
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func);
+void hw_thread_destroy(HWThread ctx);
+
+# ifdef __cplusplus
+}
+# endif
+
+
+#endif /* _HW_THREAD_H */
diff --git a/src/meson.build b/src/meson.build
index 324c744..fd41361 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -1,5 +1,4 @@
plugins = [
- 'roof-read',
'roof-build',
'roof-filter',
'roof-flat-field-correct'
@@ -10,11 +9,11 @@ roof_common_src = [
]
roof_plugin_src = {
- 'roof-read': [
+ 'roof-build': [
+ 'hw_sched.c',
+ 'hw_thread.c',
'ufo-roof-read-socket.c',
'ufo-roof-read-file.c',
- ],
- 'roof-build': [
'ufo-roof-buffer.c',
],
'roof-filter': [
diff --git a/src/ufo-roof-buffer.c b/src/roof-buffer.c
index 0e0a890..4ca0386 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/roof-buffer.c
@@ -9,14 +9,15 @@
// This is currently not thread safe. With dual-filter architecture this will be called sequentially.
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
+RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
if ((n_dims < 1)||(n_dims > 2))
roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims);
- UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
- if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
+ RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer));
+ if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer");
buffer->max_datasets = max_datasets;
+ buffer->n_streams = cfg->n_streams;
buffer->ring_size = cfg->buffer_size;
buffer->drop_buffers = cfg->drop_buffers;
buffer->latency_buffers = cfg->latency_buffers;
@@ -36,26 +37,27 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d
buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int));
buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
- buffer->first_id = malloc(buffer->ring_size * sizeof(guint64));
+#ifdef ROOF_INDEPENDENT_STREAMS
+ buffer->first_id = malloc(buffer->n_streams * sizeof(guint64));
if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
- for (guint i = 0; i < buffer->ring_size; i++)
+ for (guint i = 0; i < buffer->n_streams; i++)
buffer->first_id[i] = (guint64)-1;
#else
buffer->first_id = (guint64)-1;
#endif
+ buffer->last_id = malloc(buffer->n_streams * sizeof(guint64));
if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
- ufo_roof_buffer_free(buffer);
+ roof_buffer_free(buffer);
roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
}
return buffer;
}
-void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
+void roof_buffer_free(RoofBuffer *buffer) {
if (buffer) {
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
if (buffer->first_id)
free(buffer->first_id);
#endif
@@ -71,7 +73,7 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
}
// fragment_id is numbered from 1 (0 - means auto)
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
+gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
gboolean ready = FALSE;
guint buffer_id;
guint64 first_id;
@@ -85,7 +87,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
if (buffer->first_id[stream_id] == (guint64)-1)
buffer->first_id[stream_id] = dataset_id;
first_id = buffer->first_id[stream_id];
@@ -102,7 +104,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
if (dataset_id < buffer->current_id) {
- roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
+// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
return FALSE;
}
@@ -110,24 +112,39 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
return FALSE;
}
+
+ buffer->last_id[stream_id] = dataset_id;
+
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed",
- dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ guint64 min_id = (guint64)-1;
+ guint64 max_id = 0;
+ for (guint i = 0; i < buffer->n_streams; i++) {
+ if (min_id > buffer->last_id[stream_id]) min_id = buffer->last_id[stream_id];
+ if (max_id < buffer->last_id[stream_id]) max_id = buffer->last_id[stream_id];
+ }
+ printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id);
+ printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]);
+
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed, already reported %lu",
+ dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready);
// FIXME: Send semi-complete buffers further?
// FIXME: Or shall we drop more if larger buffers are allocated?
if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
+ // FIXME: Can finally received old buffer hit the counter here? Locking?
buffer->current_id = dataset_id;
} else {
for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
buffer->n_fragments[i%buffer->ring_size] = 0;
+ // FIXME: Can finally received old buffers hit the counters here? Locking?
buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
}
+ // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more
if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
ready = TRUE;
@@ -145,6 +162,7 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
);
*/
+ // FIXME: What if buffer obsolete meanwhile by other threads. We are not single threaded
uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
if (buffer->n_dims == 2) {
uint8_t *fragment_buffer = dataset_buffer +
@@ -161,20 +179,25 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
}
// FIXME: Sanity checks: verify is not a dublicate fragment?
- atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+ // This stuff is anyway is single-threaded which is a problem.
+ //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+ buffer->n_fragments[buffer_id]++;
if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
if (dataset_id == buffer->current_id)
ready = TRUE;
- else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers)))
- ready = ufo_roof_buffer_skip_to_ready(buffer);
+ else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) {
+ ready = roof_buffer_skip_to_ready(buffer);
+ }
+
+ if (ready) buffer->n_ready++;
}
-
+
return ready;
}
-gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
+gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) {
for (guint i = 0; i < buffer->ring_size; i++) {
guint64 id = buffer->current_id + i;
guint buffer_id = id % buffer->ring_size;
@@ -184,7 +207,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
return TRUE;
}
-// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
buffer->n_fragments[buffer_id] = 0;
}
@@ -192,7 +215,7 @@ gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
}
-gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
+gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
guint buffer_id = buffer->current_id % buffer->ring_size;
void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
@@ -207,3 +230,23 @@ gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buff
return TRUE;
}
+
+gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) {
+ struct timespec start_time, current_time;
+ clock_gettime(CLOCK_REALTIME, &start_time);
+
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) {
+ // FIXME: get some sleep? How we can interrupt it?
+ // just small nanosleep?
+ if (timeout) {
+ clock_gettime(CLOCK_REALTIME, &current_time);
+ gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 + (current_time.tv_nsec - start_time.tv_nsec) / 1000;
+ if (wait > timeout) return FALSE;
+ }
+ }
+
+ return roof_buffer_get_dataset(buffer, output_buffer, seqid, error);
+}
diff --git a/src/ufo-roof-buffer.h b/src/roof-buffer.h
index 8e9c00b..a95dbaa 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/roof-buffer.h
@@ -1,20 +1,24 @@
-#ifndef __UFO_ROOF_BUFFER_H
-#define __UFO_ROOF_BUUFER_H
+#ifndef __ROOF_BUFFER_H
+#define __ROOF_BUFFER_H
// This IS harmful! Just for testing
-//#define UFO_ROOF_INDEPENDENT_STREAMS
+#define ROOF_INDEPENDENT_STREAMS
#include <stdatomic.h>
+#include <stdint.h>
+#include "ufo-roof-config.h"
-
-struct _UfoRoofBuffer {
+struct _RoofBuffer {
guint64 current_id; // The ID of the first (active) dataset in the buffer
-#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+#ifdef ROOF_INDEPENDENT_STREAMS
guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
#else
guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
#endif
+ guint64 *last_id;
+ guint64 n_ready;
+ guint n_streams;
guint ring_size; // Number of datasets to buffer
guint drop_buffers; // If we need to catch up
guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready
@@ -35,13 +39,14 @@ struct _UfoRoofBuffer {
guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready)
};
-typedef struct _UfoRoofBuffer UfoRoofBuffer;
+typedef struct _RoofBuffer RoofBuffer;
-UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error);
-void ufo_roof_buffer_free(UfoRoofBuffer *buf);
+RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error);
+void roof_buffer_free(RoofBuffer *buf);
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error);
-gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer);
-gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error);
+gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error);
+gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer);
+gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error);
+gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error);
#endif
diff --git a/src/ufo-roof-config.c b/src/roof-config.c
index 944ee31..189c08f 100644
--- a/src/ufo-roof-config.c
+++ b/src/roof-config.c
@@ -34,14 +34,14 @@
typedef struct {
- UfoRoofConfig cfg;
+ RoofConfig cfg;
JsonParser *parser;
-} UfoRoofConfigPrivate;
+} RoofConfigPrivate;
-void ufo_roof_config_free(UfoRoofConfig *cfg) {
+void roof_config_free(RoofConfig *cfg) {
if (cfg) {
- UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg;
+ RoofConfigPrivate *priv = (RoofConfigPrivate*)cfg;
if (priv->parser)
g_object_unref (priv->parser);
@@ -50,9 +50,9 @@ void ufo_roof_config_free(UfoRoofConfig *cfg) {
}
}
-UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error) {
- UfoRoofConfigPrivate *priv;
- UfoRoofConfig *cfg;
+RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error) {
+ RoofConfigPrivate *priv;
+ RoofConfig *cfg;
// JsonNode *node;
JsonObject *root = NULL;
@@ -67,10 +67,10 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
GError *gerr = NULL;
- priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate));
- if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig");
+ priv = (RoofConfigPrivate*)malloc(sizeof(RoofConfigPrivate));
+ if (!priv) roof_new_error(error, "Can't allocate RoofConfig");
- memset(priv, 0, sizeof(UfoRoofConfigPrivate));
+ memset(priv, 0, sizeof(RoofConfigPrivate));
// Set defaults
cfg = &priv->cfg;
@@ -87,8 +87,8 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->port = 52067;
cfg->n_streams = 1;
cfg->protocol = "udp";
- cfg->network_timeout = 10000000;
- cfg->header_size = sizeof(UfoRoofPacketHeader);
+ cfg->network_timeout = 100000000; // FIXME: remove 0
+ cfg->header_size = sizeof(RoofPacketHeader);
cfg->payload_size = 0;
cfg->max_packet_size = 0;
cfg->max_packets = 100;
@@ -96,6 +96,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->buffer_size = 2;
cfg->latency_buffers = 0;
cfg->drop_buffers = 0;
+ cfg->sockets_per_thread = 4;
// Read configuration
@@ -104,7 +105,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if (gerr != NULL) {
g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config);
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
return NULL;
}
@@ -119,7 +120,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(performance, root, object, "performance");
roof_config_node_get(data, root, object, "data");
- if (flags&UFO_ROOF_CONFIG_SIMULATION)
+ if (flags&ROOF_CONFIG_SIMULATION)
roof_config_node_get(simulation, root, object, "simulation");
}
@@ -135,12 +136,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if ((cfg->sample_rate)||(cfg->imaging_rate)) {
if ((!cfg->sample_rate)||(!cfg->imaging_rate)||(cfg->sample_rate%cfg->imaging_rate)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Invalid sample (%u) and imaging (%u) rates are specified", cfg->sample_rate, cfg->imaging_rate);
}
if ((json_object_get_member(hardware, "samples_per_rotation"))&&(cfg->samples_per_rotation != (cfg->sample_rate / cfg->imaging_rate))) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "The specified samples-per-rotation (%u) doesn't match sample/imaging rates (%u / %u)", cfg->samples_per_rotation, cfg->sample_rate, cfg->imaging_rate);
}
@@ -148,7 +149,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
}
if ((cfg->bit_depth%8)||(cfg->bit_depth > 32)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Invalid bit-depth (%u) is configured, only 8, 16, 24, 32 is currently supported", cfg->bit_depth);
}
@@ -170,13 +171,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(cfg->dataset_size, network, int, "dataset_size");
if (!cfg->payload_size) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Packet payload and header size must be set");
}
- if ((cfg->header_size < sizeof(UfoRoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) {
- ufo_roof_config_free(cfg);
- roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(UfoRoofPacketHeader));
+ if ((cfg->header_size < sizeof(RoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) {
+ roof_config_free(cfg);
+ roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(RoofPacketHeader));
}
if (!cfg->dataset_size)
@@ -195,6 +196,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers");
roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers");
+ roof_config_node_get(cfg->sockets_per_thread, performance, int, "sockets_per_thread");
}
@@ -204,13 +206,13 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
// Dataset should be split in an integer number of network packets (we don't expect data from different datasets in one packet at the moment)
if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams);
}
// Packet should contain an integer number of complete projections (their parts provided by a single module)
if ((cfg->roof_mode)&&(cfg->payload_size % (cfg->channels_per_module * (cfg->bit_depth / 8)))) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Inconsistent ROOF configuration: packet_size=%u, projection_size=%u (%u channels x %u bits)", cfg->payload_size, cfg->channels_per_module * (cfg->bit_depth / 8), cfg->channels_per_module, cfg->bit_depth);
}
@@ -219,12 +221,12 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
if (hardware) {
if (cfg->n_modules != cfg->n_streams) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Currently, number of ROOF modules (%u) is exepcted to be equal to number of independent data streams (%u)", cfg->n_modules, cfg->n_streams);
}
if (cfg->dataset_size != (cfg->fan_projections * cfg->fan_bins * cfg->bit_depth / 8)) {
- ufo_roof_config_free(cfg);
+ roof_config_free(cfg);
roof_new_error(error, "Specified dataset size (%u) does not match ROOF configuration (modules: %u, channels-per-module: %u, bit-depth: %u, samples-per-rotation: %u)", cfg->dataset_size, cfg->n_modules, cfg->channels_per_module, cfg->bit_depth, cfg->samples_per_rotation);
}
}
diff --git a/src/ufo-roof-config.h b/src/roof-config.h
index 8718381..f28cb60 100644
--- a/src/ufo-roof-config.h
+++ b/src/roof-config.h
@@ -1,5 +1,5 @@
-#ifndef __UFO_ROOF_CONFIG_H
-#define __UFO_ROOF_CONFIG_H
+#ifndef __ROOF_CONFIG_H
+#define __ROOF_CONFIG_H
#include <glib.h>
@@ -39,7 +39,7 @@ typedef struct {
guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up.
guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip
guint network_timeout; // Maximum time (us) to wait for data on the socket
-
+ guint sockets_per_thread; // Number of sockets per thread. Optimally number of therads should not exceed number of CPU cores
@@ -50,15 +50,15 @@ typedef struct {
-} UfoRoofConfig;
+} RoofConfig;
typedef enum {
- UFO_ROOF_CONFIG_DEFAULT = 0,
- UFO_ROOF_CONFIG_SIMULATION = 1
-} UfoRoofConfigFlags;
+ ROOF_CONFIG_DEFAULT = 0,
+ ROOF_CONFIG_SIMULATION = 1
+} RoofConfigFlags;
-UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error);
-void ufo_roof_config_free(UfoRoofConfig *cfg);
+RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error);
+void roof_config_free(RoofConfig *cfg);
-#endif /* __UFO_ROOF_CONFIG_H */
+#endif /* __ROOF_CONFIG_H */
diff --git a/src/ufo-roof-error.h b/src/roof-error.h
index 5491f31..418645c 100644
--- a/src/ufo-roof-error.h
+++ b/src/roof-error.h
@@ -1,5 +1,5 @@
-#ifndef __UFO_ROOF_ERROR_H
-#define __UFO_ROOF_ERROR_H
+#ifndef __ROOF_ERROR_H
+#define __ROOF_ERROR_H
#include <ufo/ufo.h>
@@ -38,6 +38,9 @@
#define roof_setup_error(error, ...) \
roof_error(error, SETUP, __VA_ARGS__)
+#define roof_setup_error_with_retval(error, ...) \
+ roof_error_with_retval(error, retval, SETUP, __VA_ARGS__)
+
#define roof_new_error(error, ...) \
roof_error_with_retval(error, NULL, SETUP, __VA_ARGS__)
@@ -55,4 +58,4 @@
roof_error(error, SETUP, __VA_ARGS__)
-#endif /* __UFO_ROOF_ERROR_H */
+#endif /* __ROOF_ERROR_H */
diff --git a/src/ufo-roof-read-file.c b/src/roof-read-file.c
index 4ee11c6..da8d51c 100644
--- a/src/ufo-roof-read-file.c
+++ b/src/roof-read-file.c
@@ -1,5 +1,6 @@
#include <stdio.h>
#include <errno.h>
+#include <assert.h>
#include <stdint.h>
#include "glib.h"
@@ -8,64 +9,71 @@
#include "ufo-roof-read-file.h"
typedef struct {
- UfoRoofReadInterface iface;
+ RoofReadInterface iface;
+
+ RoofConfig *cfg;
- UfoRoofConfig *cfg;
-
gchar *fname;
FILE *fd;
-} UfoRoofReadFile;
+ uint8_t *buf;
+} RoofReadFile;
+
+static void roof_read_file_free(RoofReadInterface *iface) {
+ RoofReadFile *reader = (RoofReadFile*)iface;
-static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
-
if (reader) {
if (reader->fname)
g_free(reader->fname);
if (reader->fd)
fclose(reader->fd);
-
+
+ if (reader->buf)
+ free(reader->buf);
free(reader);
}
}
-static guint ufo_roof_read_file(UfoRoofReadInterface *iface, uint8_t *buffers, GError **error) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
- UfoRoofConfig *cfg = reader->cfg;
+static guint roof_read_file(RoofReadInterface *iface, uint8_t **buffers, GError **error) {
+ RoofReadFile *reader = (RoofReadFile*)iface;
+ RoofConfig *cfg = reader->cfg;
+
+ assert(iface);
+ assert(buffers);
size_t bytes = 0;
size_t packet_size = cfg->header_size + cfg->payload_size;
size_t expected = cfg->max_packets * packet_size;
while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) {
- size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd);
+ size_t ret = fread(reader->buf + bytes, 1, expected - bytes, reader->fd);
bytes += ret;
}
guint packets = bytes / packet_size;
-
+
if (ferror(reader->fd)) {
roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd));
} else if ((feof(reader->fd))&&(bytes % packet_size)) {
roof_network_error_with_retval(error, packets, "extra data in the end of input");
}
-
+
+ *buffers = reader->buf;
return packets;
}
-UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error) {
- UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile));
- if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile");
+RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error) {
+ RoofReadFile *reader = (RoofReadFile*)calloc(1, sizeof(RoofReadFile));
+ if (!reader) roof_new_error(error, "Can't allocate RoofReadFile");
// FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now.
if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size)
- roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
+ roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if RoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
reader->cfg = cfg;
- reader->iface.close = ufo_roof_read_file_free;
- reader->iface.read =ufo_roof_read_file;
+ reader->iface.close = roof_read_file_free;
+ reader->iface.read =roof_read_file;
reader->fname = g_strdup_printf(path, file_id);
if (!reader->fname) {
@@ -80,5 +88,11 @@ UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *pat
roof_new_error(error, "Can't open file %i at path %s", file_id, path);
}
- return (UfoRoofReadInterface*)reader;
+ reader->buf = (uint8_t*)malloc(cfg->max_packets * (cfg->header_size + cfg->payload_size));
+ if (!reader->buf) {
+ roof_read_file_free((RoofReadInterface*)reader);
+ roof_new_error(error, "Can't allocate file buffer");
+ }
+
+ return (RoofReadInterface*)reader;
}
diff --git a/src/roof-read-file.h b/src/roof-read-file.h
new file mode 100644
index 0000000..60f60ba
--- /dev/null
+++ b/src/roof-read-file.h
@@ -0,0 +1,8 @@
+#ifndef __ROOF_READ_FILE_H
+#define __ROOF_READ_FILE_H
+
+#include "ufo-roof-read.h"
+
+RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-socket.c b/src/roof-read-socket.c
index e8f7ce4..1b98b44 100644
--- a/src/ufo-roof-read-socket.c
+++ b/src/roof-read-socket.c
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <unistd.h>
+#include <assert.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
@@ -13,48 +14,51 @@
#include "ufo-roof-read-socket.h"
typedef struct {
- UfoRoofReadInterface iface;
+ RoofReadInterface iface;
+
+ RoofConfig *cfg;
- UfoRoofConfig *cfg;
int socket;
-} UfoRoofReadSocket;
+ struct mmsghdr *msg;
+ struct iovec *msgvec;
+ uint8_t *buf;
+
+ int libvma;
+} RoofReadSocket;
+
+static void roof_read_socket_free(RoofReadInterface *iface) {
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
-static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
-
if (reader) {
if (reader->socket >= 0)
close(reader->socket);
+ if (reader->msgvec)
+ free(reader->msgvec);
+ if (reader->msg)
+ free(reader->msg);
+ if (reader->buf)
+ free(reader->buf);
free(reader);
}
}
-static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) {
+static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) {
int packets;
struct timespec timeout_ts;
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
- UfoRoofConfig *cfg = reader->cfg;
+ assert(iface);
+ assert(buf);
- struct mmsghdr msg[cfg->max_packets];
- struct iovec msgvec[cfg->max_packets];
+ RoofReadSocket *reader = (RoofReadSocket*)iface;
+ RoofConfig *cfg = reader->cfg;
timeout_ts.tv_sec = cfg->network_timeout / 1000000;
timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);
- // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
- memset(msg, 0, sizeof(msg));
- memset(msgvec, 0, sizeof(msgvec));
- for (guint i = 0; i < cfg->max_packets; i++) {
- msgvec[i].iov_base = buf + i * cfg->max_packet_size;
- msgvec[i].iov_len = cfg->max_packet_size;
- msg[i].msg_hdr.msg_iov = &msgvec[i];
- msg[i].msg_hdr.msg_iovlen = 1;
- }
//retry:
// Timeout seems broken, see BUGS in 'recvmmsg' bugs page
- packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
/*
@@ -64,25 +68,25 @@ static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GEr
}
*/
+ *buf = reader->buf;
return (guint)packets;
}
-
-UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
+RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) {
int err;
int port = cfg->port + id;
char port_str[16];
- const char *addr_str = "0.0.0.0";
+ const char *addr_str = "192.168.100.8";
struct addrinfo sockaddr_hints;
struct addrinfo *sockaddr_info;
- UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket));
- if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket");
+ RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket));
+ if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket");
reader->cfg = cfg;
- reader->iface.close = ufo_roof_read_socket_free;
- reader->iface.read =ufo_roof_read_socket;
-
+ reader->iface.close = roof_read_socket_free;
+ reader->iface.read =roof_read_socket;
+
snprintf(port_str, sizeof(port_str), "%d", port);
port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';
@@ -121,14 +125,35 @@ UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GEr
}
/*
- // Send ping request to force initialization
+ // Check that FPGA module is ready
char msg[4];
addr_str = "192.168.34.83";
getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
*/
-
freeaddrinfo(sockaddr_info);
- return (UfoRoofReadInterface*)reader;
+ if (reader->libvma) {
+ } else {
+ reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size);
+ reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr));
+ reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec));
+
+ if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) {
+ roof_read_socket_free((RoofReadInterface*)reader);
+ roof_new_error(error, "Can't allocate socket buffer");
+ }
+
+ memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr));
+ memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec));
+ for (guint i = 0; i < cfg->max_packets; i++) {
+ reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size;
+ reader->msgvec[i].iov_len = cfg->max_packet_size;
+ reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i];
+ reader->msg[i].msg_hdr.msg_iovlen = 1;
+ }
+ }
+
+
+ return (RoofReadInterface*)reader;
}
diff --git a/src/roof-read-socket.h b/src/roof-read-socket.h
new file mode 100644
index 0000000..34a98e8
--- /dev/null
+++ b/src/roof-read-socket.h
@@ -0,0 +1,8 @@
+#ifndef __ROOF_READ_SOCKET_H
+#define __ROOF_READ_SOCKET_H
+
+#include "ufo-roof-read.h"
+
+RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/roof-read.c b/src/roof-read.c
new file mode 100644
index 0000000..18a7508
--- /dev/null
+++ b/src/roof-read.c
@@ -0,0 +1,107 @@
+#include <stdio.h>
+
+#include "roof.h"
+#include "roof-read.h"
+
+RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ RoofReadContext *rdc = (RoofReadContext*)calloc(1, sizeof(RoofReadContext));
+ if (!rdc) roof_new_error(error, "Can't allocate RoofReadContext");
+
+ rdc->cfg = cfg;
+ rdc->rdi = rdi;
+
+ return rdc;
+}
+
+void roof_read_context_free(RoofReadContext *rdc) {
+ if (rdc) {
+ free(rdc);
+ }
+}
+
+void roof_read(RoofReadContext *rdc, RoofBufferInterface *bfi, GError *error) {
+ uint8_t *rdbuf = rdc->rdbuf;
+ guint n_packets = rdc->n_packets;
+ guint packet_id = rdc->packet_id;
+ guint fragment_id = rdc->fragment_id;
+ GError *gerr = NULL;
+
+ ReadRoofInterface *rdi = rdc->rdi;
+
+ if (rdc->rdbuf) {
+ rdbuf = rdc->rdbuf;
+ packets
+ else {
+ guint packets = rdi->read(rdi[stream_id], &rdbuf, &gerr);
+ if (gerr) roof_propagate_error(error, gerr, "roof_read: ");
+ packet_id = 0;
+ fragment_id = 0;
+ }
+
+ packet = rdbuf + packet_id * (size + padding
+
+ while (packet) {
+
+
+ while (rdbuf) {
+ //
+
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(RoofPacketHeader)) {
+ RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ } else {
+ // FIXME: consider consecutive
+ //fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // FIXME: packet may contain fragments for multiple datasets
+ fragment_id = packet_id * fragments_per_packet;
+ guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
+ guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
+
+ // FIXME: verify that packet is consecutive
+ // if
+
+ // Drop packets of already skipped datasets
+ if (dataset_id < priv->current_id)
+ continue;
+
+ // FIXME: stop processing and return....
+ if (dataset_id < priv->current_id)
+
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * fragment_dims[0] + // x-coordinate
+ (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+
+
+// buffer->last_id[stream_id] = dataset_id;
+
+ ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+
+
+ }
+
+
+ }
+
+
+
+
+ void *buf = roof_buffer_get_dataset(bfi, ?);
+
+ // Computing offsets each time, etc.
+ roof_buffer_set_fragment
+} \ No newline at end of file
diff --git a/src/roof-read.h b/src/roof-read.h
new file mode 100644
index 0000000..1ab846a
--- /dev/null
+++ b/src/roof-read.h
@@ -0,0 +1,45 @@
+#ifndef __ROOF_READ_H
+#define __ROOF_READ_H
+
+typedef struct _RoofRead RoofRead;
+
+#include "roof-config.h"
+
+G_BEGIN_DECLS
+
+typedef struct _RoofReadContext RoofReadContext;
+typedef struct _RoofReadInterface RoofReadInterface;
+typedef struct _RoofReadInterfaceSettings RoofReadInterfaceSettings;
+
+typedef guint (*RoofReaderRead)(RoofReadInterface *reader, uint8_t **buf, GError **error);
+typedef void (*RoofReaderClose)(RoofReadInterface *reader);
+
+struct _RoofReadInterfaceSettings {
+ guint padding; // Packet size + padding
+};
+
+struct _RoofReadInterface {
+ RoofReaderRead read;
+ RoofReaderClose close;
+
+ RoofReadInterfaceSettings settings;
+};
+
+struct _RoofReadContext {
+ RoofConfig *cfg;
+ RoofReadInterface *rdi;
+
+ void *rdbuf; // The buffer we are currently processing
+ guint n_packets; // Number of packets in the buffer
+ guint packet_id; // Last processed packet in the buffer
+ guint fragment_id; // Last processed fragment in the packet
+};
+
+RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error);
+void roof_read_context_free(RoofReadContext *ctx);
+const RoofReadInterfaceSettings *roof_read_get_settings(UFORoofRead *ctx, GError **error);
+
+
+G_END_DECLS
+
+#endif /* __ROOF_READ_H */
diff --git a/src/roof-thread.c b/src/roof-thread.c
new file mode 100644
index 0000000..570300f
--- /dev/null
+++ b/src/roof-thread.c
@@ -0,0 +1,95 @@
+#include <stdio.h>
+
+#include "roof.h"
+#include "roof-thread.h"
+
+
+RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) {
+ GError *gerr = NULL;
+
+ RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext));
+ if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext");
+
+ rdt->cfg = cfg;
+ rdt->rdi = rdi;
+
+ return rdt;
+
+}
+
+void roof_thread_context_free(RoofThreadContext *rdt) {
+ if (rdt) {
+ free(rdt);
+ }
+}
+
+
+int roof_thread_read_socket(Roof *
+
+int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) {
+ Roof *ctx = (Roof*)data;
+
+ RoofConfig *cfg = ctx->cfg;
+ RoofThreadContext *rdt = ctx->rdt[id];
+
+ guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections };
+ guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] };
+
+ packet_id
+ fragment_id =
+
+ for (guint stream_id = from; stream_id < to; stream_id++) {
+
+ uint8_t *rdbuf;
+ guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(RoofPacketHeader)) {
+ RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ } else {
+ // FIXME: consider consecutive
+ //fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // FIXME: packet may contain fragments for multiple datasets
+ fragment_id = packet_id * fragments_per_packet;
+ guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
+ guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
+
+ // FIXME: verify that packet is consecutive
+ // if
+
+ // Drop packets of already skipped datasets
+ if (dataset_id < priv->current_id)
+ continue;
+
+ // FIXME: stop processing and return....
+ if (dataset_id < priv->current_id)
+
+ uint8_t *fragment_buffer = dataset_buffer +
+ stream_id * fragment_dims[0] + // x-coordinate
+ (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
+
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+
+
+// buffer->last_id[stream_id] = dataset_id;
+
+ ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+
+
+ }
+
+ }
+
+}
+
diff --git a/src/roof-thread.h b/src/roof-thread.h
new file mode 100644
index 0000000..a180b29
--- /dev/null
+++ b/src/roof-thread.h
@@ -0,0 +1,30 @@
+#ifndef __ROOF_THREAD_H
+#define __ROOF_THREAD_H
+
+typedef struct _RoofThreadContext RoofThreadContext;
+
+#include "ufo-roof-read.h"
+
+struct _RoofThreadContext {
+ RoofConfig *cfg;
+ RoofBuffer *buf;
+ RoofReadInterface *rdi;
+ guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive)
+};
+
+/*
+RoofReadThread *guint roof_read_thread_new(RoofRead *rd, guint from, guint to, GError **error);
+void roof_read_thread_free(UFORoofReadThread *thr, GError **error);
+gboolean roof_read_thread_start(UFORoofReadThread *thr, GError **error);
+gboolean roof_read_thread_stop(UFORoofReadThread *thr, GError **error);
+*/
+
+int roof_thread_read(HWThread thr, void *hwctx, int id, void *data);
+
+RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, RoofReadContext *rdc, guint from, guint to, GError **error);
+void roof_thread_context_free(RoofThreadContext *ctx);
+
+
+
+#endif /* __ROOF_THREAD_H */
+
diff --git a/src/roof.c b/src/roof.c
new file mode 100644
index 0000000..505e3aa
--- /dev/null
+++ b/src/roof.c
@@ -0,0 +1,133 @@
+void roof_init() {
+ hw_sched_init();
+}
+
+Roof *roof_new(UfoRoofConfig *cfg, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ Roof *ctx = (Roof*)calloc(1, sizeof(Roof));
+ if (!ctx) roof_new_error(error, "Can't allocate Roof context");
+
+ ctx->cfg = cfg;
+
+ ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread;
+ if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++;
+
+ ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*));
+ ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*));
+ ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*));
+ ctx->sched = hw_sched_create(cfg->n_threads);
+
+ if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) {
+ roof_free(ctx);
+ roof_setup_error(error, "Failed to allocate memory for various Roof contexts");
+ }
+
+ return ctx;
+}
+
+void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) {
+ assert(ctx);
+
+ ctx->simulate = 1;
+ ctx->path = path;
+ ctx->first_file_number = first_file_number;
+}
+
+void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) {
+ assert(ctx);
+
+ ctx->max_datasets = max;
+}
+
+
+void roof_setup(Roof *ctx, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ assert(ctx);
+
+ RoofConfig *cfg = ctx->cfg;
+
+/*
+ ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr);
+ if ((gerr)||(!ctx->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+*/
+
+ for (i = 0; i < cfg->n_streams; i++) {
+ if (ctx->simulate) {
+ if (!ctx->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr);
+ } else {
+ ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr);
+ }
+
+ if (!ctx->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+
+ ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr);
+ if (!ctx->rdc[i])
+ roof_propagate_error(error, gerr, "roof_read_context_new: ");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads;
+ if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0; i < ctx->n_threads; i++) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i);
+ }
+}
+
+
+void roof_free(Roof *ctx) {
+ guint i;
+
+ if (ctx) {
+ RoofConfig *cfg = ctx->cfg;
+ if (ctx->sched) hw_sched_destroy(priv->sched);
+
+ if (ctx->rdt) {
+ for (i = 0; i < ctx->n_threads; i++)
+ if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]);
+ free(ctx->rdt);
+ }
+
+ if (ctx->rdc) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]);
+ free(ctx->rdc);
+ }
+
+ if (ctx->rdi) {
+ for (i = 0; i < cfg->n_streams; i++)
+ if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]);
+ free(ctx->rdi);
+ }
+
+ if (ctx->buf) roof_buffer_free(ctx->buf);
+ free(ctx);
+ }
+}
+
+
+void roof_read_dataset(Roof *ctx, void *buffer, GError **error) {
+ priv->current_dataset;
+ priv->current_buffer = buffer;
+
+ err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read);
+ if (!err) err = hw_sched_wait_task(sched);
+ if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); }
+}
+
+
+}
diff --git a/src/roof.h b/src/roof.h
new file mode 100644
index 0000000..316e8ed
--- /dev/null
+++ b/src/roof.h
@@ -0,0 +1,46 @@
+#ifndef __ROOF_H
+#define __ROOF_H
+
+typedef struct _Roof Roof;
+
+#include "roof-config.h"
+#include "roof-buffer.h"
+#include "roof-read.h"
+#include "roof-thread.h"
+
+struct _Roof {
+ RoofConfig *cfg; // Parsed ROOF parameters
+ RoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofReadInterface **rdi; // Reader interface abstraction, one per socket (no threading)
+ RoofReadContext **rdc; // Reader context: common structures, one per socket (no threading)
+ RoofThread **rdt; // Threading context: multiple reader contexts per thread
+
+ guint n_threads; // Number of schedulled threads
+ HWSched sched; // OpenMP-style thread scheduler
+
+ gboolean simulate; // Indicates if we are running in network or simulation modes
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
+
+ guint max_datasets; // Number of datasets to read
+
+// guint64 announced; // For debugging
+// guint64 generated; // Total number for control
+
+// struct timespec last_fragment_timestamp;
+
+};
+
+
+Roof *roof_new(RoofConfig *cfg, GError **error);
+void roof_free(Roof *ctx);
+
+void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error);
+void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error);
+//void roof_configure_writer(Roof *ctx, ...);
+//void roof_configure_filter(Roof *ctx, ...);
+void roof_setup(Roof *ctx, GError **error);
+
+void roof_read(Roof *ctx, void *buffer, GError **error);
+
+#endif \ No newline at end of file
diff --git a/src/save/memcpy.c b/src/save/memcpy.c
new file mode 100644
index 0000000..5c29d01
--- /dev/null
+++ b/src/save/memcpy.c
@@ -0,0 +1,344 @@
+/********************************************************************
+ ** File: memcpy.c
+ **
+ ** Copyright (C) 1999-2010 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ ** The following configuration options can be set:
+ **
+ ** LITTLE_ENDIAN - Uses processor with little endian
+ ** addressing. Default is big endian.
+ **
+ ** PRE_INC_PTRS - Use pre increment of pointers.
+ ** Default is post increment of
+ ** pointers.
+ **
+ ** INDEXED_COPY - Copying data using array indexing.
+ ** Using this option, disables the
+ ** PRE_INC_PTRS option.
+ **
+ ** MEMCPY_64BIT - Compiles memcpy for 64 bit
+ ** architectures
+ **
+ **
+ ** Best Settings:
+ **
+ ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY
+ **
+ *******************************************************************/
+
+
+
+/********************************************************************
+ ** Configuration definitions.
+ *******************************************************************/
+
+#define LITTLE_ENDIAN
+#define INDEXED_COPY
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ ** Typedefs
+ *******************************************************************/
+
+typedef unsigned char UInt8;
+typedef unsigned short UInt16;
+typedef unsigned int UInt32;
+#ifdef _WIN32
+typedef unsigned __int64 UInt64;
+#else
+typedef unsigned long long UInt64;
+#endif
+
+#ifdef MEMCPY_64BIT
+typedef UInt64 UIntN;
+#define TYPE_WIDTH 8L
+#else
+typedef UInt32 UIntN;
+#define TYPE_WIDTH 4L
+#endif
+
+
+/********************************************************************
+ ** Remove definitions when INDEXED_COPY is defined.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+#if defined (PRE_INC_PTRS)
+#undef PRE_INC_PTRS
+#endif /*PRE_INC_PTRS*/
+#endif /*INDEXED_COPY*/
+
+
+
+/********************************************************************
+ ** Definitions for pre and post increment of pointers.
+ *******************************************************************/
+
+#if defined (PRE_INC_PTRS)
+
+#define START_VAL(x) (x)--
+#define INC_VAL(x) *++(x)
+#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH)
+#define WHILE_DEST_BREAK (TYPE_WIDTH - 1)
+#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1)
+#define PRE_SWITCH_ADJUST + 1
+
+#else /*PRE_INC_PTRS*/
+
+#define START_VAL(x)
+#define INC_VAL(x) *(x)++
+#define CAST_TO_U8(p, o) ((UInt8*)p + o)
+#define WHILE_DEST_BREAK 0
+#define PRE_LOOP_ADJUST
+#define PRE_SWITCH_ADJUST
+
+#endif /*PRE_INC_PTRS*/
+
+
+
+/********************************************************************
+ ** Definitions for endians
+ *******************************************************************/
+
+#if defined (LITTLE_ENDIAN)
+
+#define SHL >>
+#define SHR <<
+
+#else /* LITTLE_ENDIAN */
+
+#define SHL <<
+#define SHR >>
+
+#endif /* LITTLE_ENDIAN */
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers.
+ *******************************************************************/
+
+#define CP_INCR() { \
+ INC_VAL(dstN) = INC_VAL(srcN); \
+}
+
+#define CP_INCR_SH(shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = INC_VAL(srcN); \
+ dstWord |= srcWord SHR shr; \
+ INC_VAL(dstN) = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses array indexes.
+ *******************************************************************/
+
+#define CP_INDEX(idx) { \
+ dstN[idx] = srcN[idx]; \
+}
+
+#define CP_INDEX_SH(x, shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = srcN[x]; \
+ dstWord |= srcWord SHR shr; \
+ dstN[x] = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers or array indexes depending on
+ ** configuration.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+
+#define CP(idx) CP_INDEX(idx)
+#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr)
+
+#define INC_INDEX(p, o) ((p) += (o))
+
+#else /* INDEXED_COPY */
+
+#define CP(idx) CP_INCR()
+#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr)
+
+#define INC_INDEX(p, o)
+
+#endif /* INDEXED_COPY */
+
+
+#define COPY_REMAINING(count) { \
+ START_VAL(dst8); \
+ START_VAL(src8); \
+ \
+ switch (count) { \
+ case 7: INC_VAL(dst8) = INC_VAL(src8); \
+ case 6: INC_VAL(dst8) = INC_VAL(src8); \
+ case 5: INC_VAL(dst8) = INC_VAL(src8); \
+ case 4: INC_VAL(dst8) = INC_VAL(src8); \
+ case 3: INC_VAL(dst8) = INC_VAL(src8); \
+ case 2: INC_VAL(dst8) = INC_VAL(src8); \
+ case 1: INC_VAL(dst8) = INC_VAL(src8); \
+ case 0: \
+ default: break; \
+ } \
+}
+
+#define COPY_NO_SHIFT() { \
+ UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \
+ UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \
+ size_t length = count / TYPE_WIDTH; \
+ \
+ while (length & 7) { \
+ CP_INCR(); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP(0); \
+ CP(1); \
+ CP(2); \
+ CP(3); \
+ CP(4); \
+ CP(5); \
+ CP(6); \
+ CP(7); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, 0); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+
+#define COPY_SHIFT(shift) { \
+ UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ size_t length = count / TYPE_WIDTH; \
+ UIntN srcWord = INC_VAL(srcN); \
+ UIntN dstWord; \
+ \
+ while (length & 7) { \
+ CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest.
+ ** No overlap check is performed.
+ **
+ *******************************************************************/
+
+void *fast_memcpy(void *dest, const void *src, size_t count)
+{
+ UInt8* dst8 = (UInt8*)dest;
+ UInt8* src8 = (UInt8*)src;
+
+ if (count < 8) {
+ COPY_REMAINING(count);
+ return dest;
+ }
+
+ START_VAL(dst8);
+ START_VAL(src8);
+
+ while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) {
+ INC_VAL(dst8) = INC_VAL(src8);
+ count--;
+ }
+
+ switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) {
+ case 0: COPY_NO_SHIFT(); break;
+ case 1: COPY_SHIFT(1); break;
+ case 2: COPY_SHIFT(2); break;
+ case 3: COPY_SHIFT(3); break;
+#if TYPE_WIDTH > 4
+ case 4: COPY_SHIFT(4); break;
+ case 5: COPY_SHIFT(5); break;
+ case 6: COPY_SHIFT(6); break;
+ case 7: COPY_SHIFT(7); break;
+#endif
+ }
+}
diff --git a/src/save/memcpy.h b/src/save/memcpy.h
new file mode 100644
index 0000000..0714823
--- /dev/null
+++ b/src/save/memcpy.h
@@ -0,0 +1,63 @@
+/********************************************************************
+ ** File: memcpy.h
+ **
+ ** Copyright (C) 2005 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ *******************************************************************/
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest. No overlap check
+ ** is performed.
+ **
+ *******************************************************************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void *fast_memcpy(void *dest, const void *src, size_t count);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c
new file mode 100644
index 0000000..bdb7a7d
--- /dev/null
+++ b/src/save/ufo-roof-buffer-build-task.c
@@ -0,0 +1,474 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <endian.h>
+#include <threads.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-build-task.h"
+
+typedef enum {
+ BUILD_AUTO = 0,
+ BUILD_RAW,
+ BUILD_SINO,
+ BUILD_UFO
+} BuildType;
+
+struct _UfoRoofBuildTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+ UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading)
+ UfoRoofRead *rd; // Threading interface
+
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
+
+ BuildType build; // What dataset do we build: ROOF sinogram or raw network data
+ guint number; // Number of datasets to read
+ gboolean stop; // Stop flag
+ gboolean simulate; // Indicates if we are running in network or simulation modes
+
+ guint64 announced; // For debugging
+ guint64 generated; // Total number for control
+
+ struct timespec last_fragment_timestamp;
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+
+
+
+static GEnumValue build_values[] = {
+ { BUILD_AUTO, "BUILD_AUTO", "auto" },
+ { BUILD_RAW, "BUILD_RAW", "raw" },
+ { BUILD_SINO, "BUILD_SINO", "sino" },
+ { BUILD_UFO, "BUILD_UFO", "ufo" },
+ { 0, NULL, NULL}
+};
+
+enum {
+ PROP_0,
+ PROP_STOP,
+ PROP_SIMULATE,
+ PROP_PATH,
+ PROP_FIRST,
+ PROP_NUMBER,
+ PROP_BUILD,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_build_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL));
+}
+
+static void
+ufo_roof_build_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr);
+ if (!priv->cfg)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+
+ if (!priv->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+ }
+
+ if (priv->build == BUILD_AUTO) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]);
+ }
+
+ priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr);
+ if ((gerr)||(!priv->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+
+ priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr);
+ if (gerr)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+}
+
+static void
+ufo_roof_build_task_finalize (GObject *object)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->rd) {
+ ufo_roof_read_free(priv->rd);
+ priv->rd = NULL;
+ }
+
+ if (priv->buf) {
+ ufo_roof_buffer_free(priv->buf);
+ priv->buf = NULL;
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+ if (priv->path) {
+ g_free(priv->path);
+ priv->path = NULL;
+ }
+
+ G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
+}
+
+
+
+static void
+ufo_roof_build_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ // FIXME: Can we handle data types more elegant?
+ if (priv->build == BUILD_RAW) {
+ guint bytes = priv->cfg->dataset_size;
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ } else if (priv->build == BUILD_SINO) {
+ guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8;
+ requisition->n_dims = 2;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ requisition->dims[1] = priv->cfg->fan_projections;
+ } else if (priv->build == BUILD_UFO) {
+ requisition->n_dims = 2;
+ requisition->dims[0] = priv->cfg->fan_bins;
+ requisition->dims[1] = priv->cfg->fan_projections;
+ }
+}
+
+static guint
+ufo_roof_build_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_roof_build_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 1;
+}
+
+static UfoTaskMode
+ufo_roof_build_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_roof_build_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ gboolean ready = FALSE;
+ gulong seqid;
+ GError *gerr = NULL;
+ GValue ival = G_VALUE_INIT;
+ GValue lval = G_VALUE_INIT;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+
+ if (priv->stop)
+ return FALSE;
+
+ // FIXME: Wait or break. Not both.
+ do {
+ ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if (!ready) {
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (!ready) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ return FALSE;
+ }
+ }
+ } while (!ready);
+
+ // FIXME: integrate fastwriter somewhere here?
+
+ if (priv->build == BUILD_UFO) {
+ switch (cfg->bit_depth) {
+ case 8:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U);
+ break;
+ case 16:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U);
+ break;
+ case 32:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U);
+ break;
+ default:
+ printf("Usupported bit-depth %u\n", cfg->bit_depth);
+ }
+ }
+
+ // Metadata: plane and sequential number within the plane
+ g_value_init (&ival, G_TYPE_UINT);
+ g_value_init (&lval, G_TYPE_ULONG);
+ if (priv->build != BUILD_UFO) {
+ g_value_set_uint (&ival, cfg->bit_depth);
+ ufo_buffer_set_metadata (output, "bpp", &ival);
+ }
+ g_value_set_uint (&ival, 1 + seqid % cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane", &ival);
+ g_value_set_ulong (&lval, seqid / cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane_id", &lval);
+ g_value_set_ulong (&lval, seqid);
+ ufo_buffer_set_metadata (output, "seqid", &lval);
+ g_value_unset(&lval);
+ g_value_unset(&ival);
+
+ // FIXME: Or shall we start from counting from the ID of the first registerd dataset
+ if ((priv->number)&&(buf->current_id >= priv->number)) {
+// printf("%lu datasets processed, stopping\n", buf->current_id);
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+
+ if (ready) priv->generated++;
+
+ if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
+ if (ready)
+ printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ else
+ printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ priv->announced = buf->current_id;
+ }
+
+ return ready;
+}
+
+static void
+ufo_roof_build_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ case PROP_SIMULATE:
+ priv->simulate = g_value_get_boolean (value);
+ break;
+ case PROP_PATH:
+ if (priv->path) g_free(priv->path);
+ priv->path = g_value_dup_string(value);
+ break;
+ case PROP_FIRST:
+ priv->first_file_number = g_value_get_uint (value);
+ break;
+ case PROP_NUMBER:
+ priv->number = g_value_get_uint (value);
+ break;
+ case PROP_BUILD:
+ priv->build = g_value_get_enum (value);
+ if ((priv->build == BUILD_AUTO)&&(priv->cfg)) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ }
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_build_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ case PROP_SIMULATE:
+ g_value_set_boolean (value, priv->simulate);
+ break;
+ case PROP_PATH:
+ g_value_set_string(value, priv->path?priv->path:"");
+ break;
+ case PROP_FIRST:
+ g_value_set_uint (value, priv->first_file_number);
+ break;
+ case PROP_NUMBER:
+ g_value_set_uint (value, priv->number);
+ break;
+ case PROP_BUILD:
+ g_value_set_enum (value, priv->build);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_build_task_setup;
+ iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_build_task_get_mode;
+ iface->get_requisition = ufo_roof_build_task_get_requisition;
+ iface->generate = ufo_roof_build_task_generate;
+}
+
+static void
+ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_build_task_set_property;
+ oclass->get_property = ufo_roof_build_task_get_property;
+ oclass->finalize = ufo_roof_build_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_SIMULATE] =
+ g_param_spec_boolean ("simulate",
+ "Simulation mode",
+ "Read data from the specified files instead of network",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_PATH] =
+ g_param_spec_string ("path",
+ "Input files for simulation mode",
+ "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_FIRST] =
+ g_param_spec_uint ("first_file_number",
+ "Offset to the first read file",
+ "Offset to the first read file",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_NUMBER] =
+ g_param_spec_uint("number",
+ "Number of datasets to receive",
+ "Number of datasets to receive",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_BUILD] =
+ g_param_spec_enum ("build",
+ "Build type (\"raw\", \"sino\", \"ufo\")",
+ "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)",
+ g_enum_register_static ("build", build_values),
+ 0, G_PARAM_READWRITE);
+
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+}
+
+static void
+ufo_roof_build_task_init(UfoRoofBuildTask *self)
+{
+ self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
+}
diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c
new file mode 100644
index 0000000..60868d2
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.c
@@ -0,0 +1,155 @@
+#include <stdio.h>
+#include <threads.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) {
+ int i;
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context");
+
+ thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size);
+ if (!thr->rdbuf) {
+ ufo_roof_read_thread_free(thr);
+ roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer");
+ }
+
+ thr->rd = rd;
+ thr->from = from;
+ thr->to = to;
+
+
+ return thr;
+
+}
+
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) {
+ if (!thr) return;
+ if (thr->rdbuf) free(thr->rdbuf);
+
+ ufo_roof_thread_stop(thr, error);
+ free(thr);
+}
+
+static int ufo_roof_read_thread_run(void *arg) {
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)arg;
+
+ UfoRoofConfig *cfg = thr->rd->cfg;
+ UfoRoofBuffer *buf = thr->rd->buf;
+ UfoRoofReadInterface *rdi = thr->rd->rdi;
+
+ guint from = thr->from;
+ guint to = thr->to;
+
+ void *rdbuf = thr->rdbuf;
+
+ uint64_t current_id[to - from] = {0};
+ uint64_t grabbed[to - from] = {0};
+
+ static uint64_t errors = 0;
+
+ while (thr->op != UFO_ROOF_OP_STOP) {
+ for (guint sid = from; sid < to; sid++) {
+ // FIXME break waiting on stop? If no packets are send
+ guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ guint ready = false;
+ const uint8_t *fragment = (uint8_t*)rdbuf;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+#ifdef UFO_ROOF_DEBUG
+ if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+
+ current_id[sid - from] = packet_id;
+ grabbed[sid - from]++;
+ if ((grabbed[sid - from]%1000000)==0)
+ printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000);
+#endif
+
+ ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+ } // fragment-loop
+
+ // send notification? Broadcast blocks, we don't want it.
+ if (ready) {
+ }
+
+ } // socket-loop
+ } // operation-loop
+
+
+#ifdef UFO_ROOF_DEBUG
+ // Store first received packet on each channel...
+ static int debug = 1;
+ if (debug) {
+ char fname[256];
+ sprintf(fname, "channel%i_packet0.raw", priv->id);
+ FILE *f = fopen(fname, "w");
+ if (f) {
+ fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f);
+ fclose(f);
+ }
+ debug = 0;
+ }
+#endif /* UFO_ROOF_DEBUG */
+
+ // FIXME: End of data (shall we restart in the network case?)
+// if (!packets)
+// return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+// header->n_packets = packets;
+
+ return TRUE;
+}
+
+
+}
+
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) {
+ int err;
+ if (!thr) return FALSE;
+
+ err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err);
+
+ ctx->launched = TRUE;
+ return TRUE;
+}
+
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) {
+ int err, ret;
+ if (!thr) return FALSE;
+ if (!thr->launched) return TRUE;
+
+ // Signal thread termination
+
+ err = thrd_join(&thr->thread, &ret);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err);
+
+ return TRUE;
+}
+
diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h
new file mode 100644
index 0000000..ebe8989
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.h
@@ -0,0 +1,23 @@
+#ifndef __UFO_ROOF_READ_THREAD_H
+#define __UFO_ROOF_READ_THREAD_H
+
+typedef struct _UfoRoofReadThread UfoRoofReadThread;
+
+#include "ufo-roof-read.h"
+
+struct _UfoRoofReadThread {
+ UfoRoofRead *rd; // ROOF Reader Cotext
+ guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive)
+
+ gboolean launched; // Flag indicating if thread is launched
+ thrd_t thread; // Thread ID
+};
+
+/*
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error);
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error);
+*/
+
+#endif /* __UFO_ROOF_READ_THREAD_H */ \ No newline at end of file
diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c
new file mode 100644
index 0000000..f3d790d
--- /dev/null
+++ b/src/save/ufo-roof-read.c
@@ -0,0 +1,123 @@
+#include <stdio.h>
+#include <assert.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
+ // FIXME: request real-time permissions?
+ // FIXME: do we need this?
+/*
+ uint32_t lat = 0;
+ int fd = open("/dev/cpu_dma_latency", O_RDWR);
+ if (fd == -1) {
+ fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
+ } else {
+ write(fd, &lat, sizeof(lat));
+ close(fd);
+ }
+*/
+}
+
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
+ assert(ctx);
+ return ctx->settings;
+}
+
+
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");
+
+ ufo_roof_read_optimize(ctx);
+
+ ctx->n_threads = cfg->n_read_threads;
+ ctx->cfg = cfg;
+ ctx->buf = buf;
+ ctx->rdi = rdi;
+
+ ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread));
+ if (!ctx->thr) {
+ ufo_roof_read_free(ctx);
+ roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++;
+ ctx->n_threads = n_threads;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads;
+ if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0, from = 0; i < n_threads; i++, from = to) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
+ return ctx;
+}
+
+void ufo_roof_read_free(UfoRoofRead *ctx) {
+ if (!ctx) return;
+
+ if (ctx->thr) {
+ int i;
+ ufo_roof_read_stop(ctx);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (ctx->thr[i])
+ ufo_roof_read_thread_free(ctx->thr[i]);
+ }
+ free(ctx->thr);
+ }
+ free(ctx);
+}
+
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
+ gboolean ret;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_start(ctx, &gerr);
+ if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
+ }
+ return TRUE;
+}
+
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
+ gboolean ret, res = FALSE;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_stop(ctx, &gerr);
+ if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
+ res |= !ret;
+ }
+ return !res;
+}
+
+
diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h
new file mode 100644
index 0000000..16e910b
--- /dev/null
+++ b/src/save/ufo-roof-read.h
@@ -0,0 +1,61 @@
+#ifndef __UFO_ROOF_READ_H
+#define __UFO_ROOF_READ_H
+
+typedef struct _UfoRoofRead UfoRoofRead;
+
+#include "ufo-roof-config.h"
+#include "ufo-roof-buffer.h"
+//#include "ufo-roof-read-thread.h"
+
+G_BEGIN_DECLS
+
+typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
+typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings;
+
+//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error);
+
+typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error);
+typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
+
+struct _UfoRoofReadInterfaceSettings {
+ guint padding; // Packet size + padding
+};
+
+struct _UfoRoofReadInterface {
+ UfoRoofReaderRead read;
+ UfoRoofReaderClose close;
+
+ UfoRoofReadInterfaceSettings settings;
+};
+
+typedef enum {
+ UFO_ROOF_OP_STOP = 0,
+ UFO_ROOF_OP_READ
+} UfoRoofOp;
+
+struct _UfoRoofReadContext {
+ UfoRoofConfig *cfg;
+ UfoRoofBuffer *buf;
+ UfoRoofReadInterface *rdi;
+
+ gulong packet
+// UfoRoofReadThread *thr;
+
+ guint n_threads;
+ UfoRoofOp op; // Current operation (reading by default)
+};
+
+
+
+/*
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error);
+void ufo_roof_read_free(UfoRoofRead *ctx);
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error);
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error);
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error);
+*/
+
+G_END_DECLS
+
+#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index a39ed11..ee1cec5 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -19,6 +19,7 @@
#include <stdio.h>
#include <endian.h>
+#include <threads.h>
#ifdef __APPLE__
#include <OpenCL/cl.h>
@@ -26,10 +27,16 @@
#include <CL/cl.h>
#endif
+#include "hw_sched.h"
+
#include "ufo-roof.h"
+#include "ufo-roof-read.h"
#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-socket.h"
+#include "ufo-roof-read-file.h"
#include "ufo-roof-build-task.h"
+
typedef enum {
BUILD_AUTO = 0,
BUILD_RAW,
@@ -37,10 +44,16 @@ typedef enum {
BUILD_UFO
} BuildType;
-struct _UfoRoofBuildTaskPrivate {
+struct _RoofBuildTaskPrivate {
gchar *config; // ROOF configuration file name
- UfoRoofConfig *cfg; // Parsed ROOF parameters
- UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofConfig *cfg; // Parsed ROOF parameters
+// RoofBuffer *buf; // Ring buffer for incomming UDP packet
+ RoofReadInterface **rdi; // Reader interfaces, one per socket (no threading)
+// RoofRead *rd; // Threading interface
+ HWSched sched;
+
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
BuildType build; // What dataset do we build: ROOF sinogram or raw network data
guint number; // Number of datasets to read
@@ -48,17 +61,21 @@ struct _UfoRoofBuildTaskPrivate {
gboolean simulate; // Indicates if we are running in network or simulation modes
guint64 announced; // For debugging
+ guint64 generated; // Total number for control
+
+ guint n_threads; // Number of schedulled threads
+
struct timespec last_fragment_timestamp;
};
static void ufo_task_interface_init (UfoTaskIface *iface);
-G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+G_DEFINE_TYPE_WITH_CODE (RoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
ufo_task_interface_init))
-#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, RoofBuildTaskPrivate))
@@ -74,6 +91,8 @@ enum {
PROP_0,
PROP_STOP,
PROP_SIMULATE,
+ PROP_PATH,
+ PROP_FIRST,
PROP_NUMBER,
PROP_BUILD,
PROP_CONFIG,
@@ -93,9 +112,10 @@ ufo_roof_build_task_setup (UfoTask *task,
UfoResources *resources,
GError **error)
{
+ guint i;
GError *gerr = NULL;
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
if (!priv->config)
roof_setup_error(error, "ROOF configuration is not specified");
@@ -104,15 +124,65 @@ ufo_roof_build_task_setup (UfoTask *task,
if (!priv->cfg)
roof_propagate_error(error, gerr, "roof-build-setup: ");
+ priv->rdi = (RoofReadInterface**)calloc(priv->cfg->n_streams, sizeof(RoofReadInterface*));
+ if (!priv->rdi)
+ roof_setup_error(error, "Failed to allocate memory for RoofReadInterface array");
+
+
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->first_file_number + i, &gerr);
+ } else
+ priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, i, &gerr);
+
+ if (!priv->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+
+ priv->rdc[i] = ufo_roof_read_context_new(priv->cfg, priv->rdi[i], &gerr);
+ if (!priv->rdc[i])
+ roof_propagate_error(error, gerr, "roof_read_context_new: ");
+ }
+
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ priv->n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) priv->n_threads++;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / priv->n_threads;
+ if (priv->cfg->n_streams % priv->n_threads) extra = priv->cfg->n_streams - priv->n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0; i < priv->n_threads; i++) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(priv->cfg, priv->rdc, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
if (priv->build == BUILD_AUTO) {
if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
else priv->build = BUILD_RAW;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]);
}
+/*
priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr);
- if (!priv->buf)
- roof_propagate_error(error, gerr, "roof-build-setup: ");
+ if ((gerr)||(!priv->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+
+ priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr);
+ if (gerr)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+*/
+
+ priv->sched = hw_sched_create(priv->cfg->n_read_threads);
+ if (!priv->sched)
+ roof_setup_error(error, "Failed to schedule builder threads");
+
clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
}
@@ -120,12 +190,34 @@ ufo_roof_build_task_setup (UfoTask *task,
static void
ufo_roof_build_task_finalize (GObject *object)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
-
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->sched) {
+ hw_sched_destroy(priv->sched);
+ priv->sched = NULL;
+ }
+
+/*
+ if (priv->rd) {
+ ufo_roof_read_free(priv->rd);
+ priv->rd = NULL;
+ }
+
if (priv->buf) {
ufo_roof_buffer_free(priv->buf);
priv->buf = NULL;
}
+*/
+
+ if (priv->rdi) {
+ guint i;
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->rdi[i])
+ priv->rdi[i]->close(priv->rdi[i]);
+ }
+ free(priv->rdi);
+ priv->rdi = NULL;
+ }
if (priv->cfg) {
ufo_roof_config_free(priv->cfg);
@@ -137,6 +229,10 @@ ufo_roof_build_task_finalize (GObject *object)
priv->config = NULL;
}
+ if (priv->path) {
+ g_free(priv->path);
+ priv->path = NULL;
+ }
G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
}
@@ -149,9 +245,10 @@ ufo_roof_build_task_get_requisition (UfoTask *task,
UfoRequisition *requisition,
GError **error)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
// FIXME: Can we handle data types more elegant?
+ // FIXME: Kill BUILD_RAW ?
if (priv->build == BUILD_RAW) {
guint bytes = priv->cfg->dataset_size;
requisition->n_dims = 1;
@@ -184,100 +281,56 @@ ufo_roof_build_task_get_num_dimensions (UfoTask *task,
static UfoTaskMode
ufo_roof_build_task_get_mode (UfoTask *task)
{
- return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR;
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
}
+
static gboolean
-ufo_roof_build_task_process (UfoTask *task,
- UfoBuffer **inputs,
+ufo_roof_build_task_generate (UfoTask *task,
UfoBuffer *output,
UfoRequisition *requisition)
{
- GError *gerr = NULL;
-
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
- UfoRoofConfig *cfg = priv->cfg;
- UfoRoofBuffer *buf = priv->buf;
gboolean ready = FALSE;
+ gulong seqid;
+ GError *gerr = NULL;
+ GValue ival = G_VALUE_INIT;
+ GValue lval = G_VALUE_INIT;
-// UfoRequisition in_req;
-// ufo_buffer_get_requisition (inputs[0], &in_req);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ RoofConfig *cfg = priv->cfg;
+ RoofBuffer *buf = priv->buf;
- const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL);
- UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
if (priv->stop)
return FALSE;
- const uint8_t *fragment = data;
- for (guint i = 0; i < header->n_packets; i++) {
- guint64 packet_id = 0;
- // Otherwise considered consecutive and handled by the buffer
- if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
- UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
- packet_id = be64toh(pheader->packet_id) + 1;
- }
+ priv->current_dataset;
+ priv->current_buffer = output_buffer;
- // FIXME: Can we kill here the dataset finished during the previous step of iteration
- ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr);
- if (gerr) roof_print_error(gerr);
+ err = hw_sched_schedule_thread_task(sched, (void*)&tnv_ctx, ufo_roof_build_task_read);
+ if (!err) err = hw_sched_wait_task(sched);
+ if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); }
- fragment += cfg->max_packet_size;
- }
-
- // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
-
- if (ready) {
- clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
- } else {
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, &current_time);
-
- // No new accepted events within timeout
- if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) {
- ready = ufo_roof_buffer_skip_to_ready(buf);
- if (ready) {
- // FIXME: shall we really reset timer here?
- clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
- } else {
+/*
+ // FIXME: Wait or break. Not both.
+ do {
+ ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if (!ready) {
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (!ready) {
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
- }
+ return FALSE;
+ }
}
- }
-
-
-/*
- printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets,
- (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0);
+ } while (!ready);
*/
- return !ready;
-}
-
-static gboolean
-ufo_roof_build_task_generate (UfoTask *task,
- UfoBuffer *output,
- UfoRequisition *requisition)
-{
- gboolean ready = FALSE;
- gulong seqid;
- GError *gerr = NULL;
- GValue ival = G_VALUE_INIT;
- GValue lval = G_VALUE_INIT;
-
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
- UfoRoofConfig *cfg = priv->cfg;
- UfoRoofBuffer *buf = priv->buf;
-
- void *output_buffer = ufo_buffer_get_host_array(output, NULL);
-
- if (priv->stop)
- return FALSE;
-
- ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &seqid, &gerr);
- if (gerr) roof_print_error(gerr);
+ // FIXME: integrate fastwriter somewhere here?
if (priv->build == BUILD_UFO) {
switch (cfg->bit_depth) {
@@ -318,8 +371,13 @@ ufo_roof_build_task_generate (UfoTask *task,
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
+ if (ready) priv->generated++;
+
if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
- printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ if (ready)
+ printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ else
+ printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
priv->announced = buf->current_id;
}
@@ -332,7 +390,7 @@ ufo_roof_build_task_set_property (GObject *object,
const GValue *value,
GParamSpec *pspec)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
switch (property_id) {
case PROP_CONFIG:
@@ -345,6 +403,13 @@ ufo_roof_build_task_set_property (GObject *object,
case PROP_SIMULATE:
priv->simulate = g_value_get_boolean (value);
break;
+ case PROP_PATH:
+ if (priv->path) g_free(priv->path);
+ priv->path = g_value_dup_string(value);
+ break;
+ case PROP_FIRST:
+ priv->first_file_number = g_value_get_uint (value);
+ break;
case PROP_NUMBER:
priv->number = g_value_get_uint (value);
break;
@@ -367,11 +432,11 @@ ufo_roof_build_task_get_property (GObject *object,
GValue *value,
GParamSpec *pspec)
{
- UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+ RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
switch (property_id) {
case PROP_CONFIG:
- g_value_set_string(value, priv->config);
+ g_value_set_string(value, priv->config);
break;
case PROP_STOP:
g_value_set_boolean (value, priv->stop);
@@ -379,6 +444,12 @@ ufo_roof_build_task_get_property (GObject *object,
case PROP_SIMULATE:
g_value_set_boolean (value, priv->simulate);
break;
+ case PROP_PATH:
+ g_value_set_string(value, priv->path?priv->path:"");
+ break;
+ case PROP_FIRST:
+ g_value_set_uint (value, priv->first_file_number);
+ break;
case PROP_NUMBER:
g_value_set_uint (value, priv->number);
break;
@@ -394,17 +465,18 @@ ufo_roof_build_task_get_property (GObject *object,
static void
ufo_task_interface_init (UfoTaskIface *iface)
{
+ roof_init();
+
iface->setup = ufo_roof_build_task_setup;
iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
iface->get_mode = ufo_roof_build_task_get_mode;
iface->get_requisition = ufo_roof_build_task_get_requisition;
- iface->process = ufo_roof_build_task_process;
iface->generate = ufo_roof_build_task_generate;
}
static void
-ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+ufo_roof_build_task_class_init (RoofBuildTaskClass *klass)
{
GObjectClass *oclass = G_OBJECT_CLASS (klass);
@@ -426,7 +498,6 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
FALSE,
G_PARAM_READWRITE);
-
properties[PROP_SIMULATE] =
g_param_spec_boolean ("simulate",
"Simulation mode",
@@ -434,6 +505,20 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
FALSE,
G_PARAM_READWRITE);
+ properties[PROP_PATH] =
+ g_param_spec_string ("path",
+ "Input files for simulation mode",
+ "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_FIRST] =
+ g_param_spec_uint ("first_file_number",
+ "Offset to the first read file",
+ "Offset to the first read file",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
properties[PROP_NUMBER] =
g_param_spec_uint("number",
"Number of datasets to receive",
@@ -452,11 +537,11 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
g_object_class_install_property (oclass, i, properties[i]);
- g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+ g_type_class_add_private (oclass, sizeof(RoofBuildTaskPrivate));
}
static void
-ufo_roof_build_task_init(UfoRoofBuildTask *self)
+ufo_roof_build_task_init(RoofBuildTask *self)
{
self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
}
diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h
deleted file mode 100644
index 787b441..0000000
--- a/src/ufo-roof-read-file.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef __UFO_ROOF_READ_FILE_H
-#define __UFO_ROOF_READ_FILE_H
-
-#include "ufo-roof-read.h"
-
-UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error);
-
-#endif
diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h
deleted file mode 100644
index 74b0742..0000000
--- a/src/ufo-roof-read-socket.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef __UFO_ROOF_READ_SOCKET_H
-#define __UFO_ROOF_READ_SOCKET_H
-
-#include "ufo-roof-read.h"
-
-UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error);
-
-#endif
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
index 7d55b79..83b9627 100644
--- a/src/ufo-roof-read-task.c
+++ b/src/ufo-roof-read-task.c
@@ -35,7 +35,7 @@
struct _UfoRoofReadTaskPrivate {
gchar *config; // ROOF configuration file name
UfoRoofConfig *cfg; // Parsed ROOF parameters
- UfoRoofReadInterface *reader;
+ UfoRoofReadInterface *reader[16];
guint id; // Reader ID (defince sequential port number)
gboolean stop; // Flag requiring termination
@@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task,
UfoResources *resources,
GError **error)
{
+ guint i;
GError *gerr = NULL;
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
@@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task,
roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams);
// Start actual reader
- if (priv->simulate) {
- if (!priv->path)
- roof_setup_error(error, "Path to simulated data should be specified");
-
- priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr);
- } else
- priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr);
- if (!priv->reader)
- roof_propagate_error(error, gerr, "roof_read_new: ");
+ for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+ if (!priv->reader[i])
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+ }
}
@@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object)
{
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
- if (priv->reader) {
- priv->reader->close(priv->reader);
+ for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) {
+ if (priv->reader[i]) {
+ priv->reader[i]->close(priv->reader[i]);
+ }
}
if (priv->cfg) {
@@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task,
void *output_buffer = ufo_buffer_get_host_array(output, NULL);
UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg);
+ uint64_t current_id[16] = {0};
+ uint64_t grabbed[16] = {0};
+
+ static uint64_t errors = 0;
+retry:
if (priv->stop)
return FALSE;
- guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
+ for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) {
+
+ guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr);
if (gerr) {
g_warning("Error reciving data: %s", gerr->message);
g_error_free(gerr);
return FALSE;
}
+ const uint8_t *fragment = output_buffer;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+ if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+ current_id[sid] = packet_id;
+ grabbed[sid]++;
+ if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000);
+
+ fragment += cfg->max_packet_size;
+ }
+ }
+
+ goto retry;
+
#ifdef UFO_ROOF_DEBUG
// Store first received packet on each channel...
static int debug = 1;
@@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task,
#endif /* UFO_ROOF_DEBUG */
// FIXME: End of data (shall we restart in the network case?)
- if (!packets)
- return FALSE;
+// if (!packets)
+// return FALSE;
// Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
header->channel_id = priv->id;
- header->n_packets = packets;
+// header->n_packets = packets;
return TRUE;
}
diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h
deleted file mode 100644
index 5f0853c..0000000
--- a/src/ufo-roof-read.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#ifndef __UFO_ROOF_READ_H
-#define __UFO_ROOF_READ_H
-
-#include "ufo-roof-config.h"
-
-typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
-
-typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error);
-typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
-
-struct _UfoRoofReadInterface {
- UfoRoofReaderRead read;
- UfoRoofReaderClose close;
-};
-
-
-#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof.h b/src/ufo-roof.h
index 23f8429..ea422e2 100644
--- a/src/ufo-roof.h
+++ b/src/ufo-roof.h
@@ -1,21 +1,21 @@
-#ifndef __UFO_ROOF_H
-#define __UFO_ROOF_H
+#ifndef __ROOF_H
+#define __ROOF_H
#include "ufo-roof-config.h"
#include "ufo-roof-error.h"
-//#define UFO_ROOF_DEBUG
-#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf))
-#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size))
+//#define ROOF_DEBUG
+#define ROOF_PACKET_HEADER(buf) ((RoofPacketHeader*)(buf))
+#define ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((RoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size))
typedef struct {
uint64_t packet_id; // Sequential Packet ID (numbered from 0)
-} UfoRoofPacketHeader;
+} RoofPacketHeader;
typedef struct {
uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0)
uint32_t n_packets; // Number of packets
-} UfoRoofPacketBlockHeader;
+} RoofPacketBlockHeader;
-#endif /* __UFO_ROOF_H */
+#endif /* __ROOF_H */