From 5172421d248250b4ab3b69eb57fd83656e23a4da Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Thu, 3 Sep 2020 03:00:30 +0200 Subject: This is unfinished work implemeting out-of-UFO network servers --- src/CMakeLists.txt | 10 +- src/hw_config.h | 8 + src/hw_sched.c | 398 ++++++++++++++++++++++++++++ src/hw_sched.h | 146 +++++++++++ src/hw_thread.c | 143 ++++++++++ src/hw_thread.h | 76 ++++++ src/meson.build | 7 +- src/roof-buffer.c | 252 ++++++++++++++++++ src/roof-buffer.h | 52 ++++ src/roof-config.c | 245 ++++++++++++++++++ src/roof-config.h | 64 +++++ src/roof-error.h | 61 +++++ src/roof-read-file.c | 98 +++++++ src/roof-read-file.h | 8 + src/roof-read-socket.c | 159 ++++++++++++ src/roof-read-socket.h | 8 + src/roof-read.c | 107 ++++++++ src/roof-read.h | 45 ++++ src/roof-thread.c | 95 +++++++ src/roof-thread.h | 30 +++ src/roof.c | 133 ++++++++++ src/roof.h | 46 ++++ src/save/memcpy.c | 344 ++++++++++++++++++++++++ src/save/memcpy.h | 63 +++++ src/save/ufo-roof-buffer-build-task.c | 474 ++++++++++++++++++++++++++++++++++ src/save/ufo-roof-read-thread.c | 155 +++++++++++ src/save/ufo-roof-read-thread.h | 23 ++ src/save/ufo-roof-read.c | 123 +++++++++ src/save/ufo-roof-read.h | 61 +++++ src/ufo-roof-buffer.c | 209 --------------- src/ufo-roof-buffer.h | 47 ---- src/ufo-roof-build-task.c | 271 ++++++++++++------- src/ufo-roof-config.c | 243 ----------------- src/ufo-roof-config.h | 64 ----- src/ufo-roof-error.h | 58 ----- src/ufo-roof-read-file.c | 84 ------ src/ufo-roof-read-file.h | 8 - src/ufo-roof-read-socket.c | 134 ---------- src/ufo-roof-read-socket.h | 8 - src/ufo-roof-read-task.c | 68 +++-- src/ufo-roof-read.h | 17 -- src/ufo-roof.h | 16 +- 42 files changed, 3662 insertions(+), 999 deletions(-) create mode 100644 src/hw_config.h create mode 100644 src/hw_sched.c create mode 100644 src/hw_sched.h create mode 100644 src/hw_thread.c create mode 100644 src/hw_thread.h create mode 100644 src/roof-buffer.c create mode 100644 src/roof-buffer.h create mode 100644 src/roof-config.c create mode 100644 src/roof-config.h create mode 100644 src/roof-error.h create mode 100644 src/roof-read-file.c create mode 100644 src/roof-read-file.h create mode 100644 src/roof-read-socket.c create mode 100644 src/roof-read-socket.h create mode 100644 src/roof-read.c create mode 100644 src/roof-read.h create mode 100644 src/roof-thread.c create mode 100644 src/roof-thread.h create mode 100644 src/roof.c create mode 100644 src/roof.h create mode 100644 src/save/memcpy.c create mode 100644 src/save/memcpy.h create mode 100644 src/save/ufo-roof-buffer-build-task.c create mode 100644 src/save/ufo-roof-read-thread.c create mode 100644 src/save/ufo-roof-read-thread.h create mode 100644 src/save/ufo-roof-read.c create mode 100644 src/save/ufo-roof-read.h delete mode 100644 src/ufo-roof-buffer.c delete mode 100644 src/ufo-roof-buffer.h delete mode 100644 src/ufo-roof-config.c delete mode 100644 src/ufo-roof-config.h delete mode 100644 src/ufo-roof-error.h delete mode 100644 src/ufo-roof-read-file.c delete mode 100644 src/ufo-roof-read-file.h delete mode 100644 src/ufo-roof-read-socket.c delete mode 100644 src/ufo-roof-read-socket.h delete mode 100644 src/ufo-roof-read.h (limited to 'src') diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 720c18f..837a6fb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -2,7 +2,6 @@ cmake_minimum_required(VERSION 2.6) #{{{ Sources set(ufofilter_SRCS - ufo-roof-read-task.c ufo-roof-build-task.c ufo-roof-filter-task.c ufo-roof-flat-field-correct-task.c @@ -12,12 +11,11 @@ set(common_SRCS ufo-roof-config.c ) -set(roof_read_aux_SRCS +set(roof_build_aux_SRCS + hw_sched.c + hw_thread.c ufo-roof-read-socket.c ufo-roof-read-file.c - ) - -set(roof_build_aux_SRCS ufo-roof-buffer.c ) @@ -30,7 +28,7 @@ set(ufofilter_LIBS ${UFO_LIBRARIES} ${OpenCL_LIBRARIES}) -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations") +set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu18 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations -g -gdwarf-2 -g3 -fno-omit-frame-pointer") add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES) #}}} diff --git a/src/hw_config.h b/src/hw_config.h new file mode 100644 index 0000000..2351dea --- /dev/null +++ b/src/hw_config.h @@ -0,0 +1,8 @@ +#ifndef _HW_CONFIG_H +#define _HW_CONFIG_H + + // enable threading +#define HW_HAVE_SCHED_HEADERS +#define HW_USE_THREADS + +#endif diff --git a/src/hw_sched.c b/src/hw_sched.c new file mode 100644 index 0000000..ec4d812 --- /dev/null +++ b/src/hw_sched.c @@ -0,0 +1,398 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ + +#define _GNU_SOURCE +#include +#include +#include + +#include "hw_config.h" + +#ifdef HW_HAVE_SCHED_HEADERS +# include +# include +# include +#endif /* HW_HAVE_SCHED_HEADERS */ + +#include "hw_sched.h" + + +#ifdef HW_USE_THREADS +# define MUTEX_INIT(ctx, name) \ + if (!err) { \ + ctx->name##_mutex = g_mutex_new(); \ + if (!ctx->name##_mutex) err = 1; \ + } + +# define MUTEX_FREE(ctx, name) \ + if (ctx->name##_mutex) g_mutex_free(ctx->name##_mutex); + +# define COND_INIT(ctx, name) \ + MUTEX_INIT(ctx, name##_cond) \ + if (!err) { \ + ctx->name##_cond = g_cond_new(); \ + if (!ctx->name##_cond) { \ + err = 1; \ + MUTEX_FREE(ctx, name##_cond) \ + } \ + } + +# define COND_FREE(ctx, name) \ + if (ctx->name##_cond) g_cond_free(ctx->name##_cond); \ + MUTEX_FREE(ctx, name##_cond) +#else /* HW_USE_THREADS */ +# define MUTEX_INIT(ctx, name) +# define MUTEX_FREE(ctx, name) +# define COND_INIT(ctx, name) +# define COND_FREE(ctx, name) +#endif /* HW_USE_THREADS */ + + +HWRunFunction ppu_run[] = { + (HWRunFunction)NULL +}; + +static int hw_sched_initialized = 0; + +int hw_sched_init(void) { + if (!hw_sched_initialized) { +#ifdef HW_USE_THREADS + g_thread_init(NULL); +#endif /* HW_USE_THREADS */ + hw_sched_initialized = 1; + } + + return 0; +} + + +int hw_sched_get_cpu_count(void) { +#ifdef HW_HAVE_SCHED_HEADERS + int err; + + int cpu_count; + cpu_set_t mask; + + err = sched_getaffinity(getpid(), sizeof(mask), &mask); + if (err) return 1; + +# ifdef CPU_COUNT + cpu_count = CPU_COUNT(&mask); +# else + for (cpu_count = 0; cpu_count < CPU_SETSIZE; cpu_count++) { + if (!CPU_ISSET(cpu_count, &mask)) break; + } +# endif + + if (!cpu_count) cpu_count = 1; + return cpu_count; +#else /* HW_HAVE_SCHED_HEADERS */ + return 1; +#endif /* HW_HAVE_SCHED_HEADERS */ +} + + +HWSched hw_sched_create(int cpu_count) { + int i; + int err = 0; + + HWSched ctx; + + //hw_sched_init(); + + ctx = (HWSched)malloc(sizeof(HWSchedS)); + if (!ctx) return NULL; + + memset(ctx, 0, sizeof(HWSchedS)); + + ctx->status = 1; + + MUTEX_INIT(ctx, sync); + MUTEX_INIT(ctx, data); + COND_INIT(ctx, compl); + COND_INIT(ctx, job); + + if (err) { + hw_sched_destroy(ctx); + return NULL; + } + + if (!cpu_count) cpu_count = hw_sched_get_cpu_count(); + if (cpu_count > HW_MAX_THREADS) cpu_count = HW_MAX_THREADS; + + ctx->n_threads = 0; + for (i = 0; i < cpu_count; i++) { + ctx->thread[ctx->n_threads] = hw_thread_create(ctx, ctx->n_threads, NULL, ppu_run, NULL); + if (ctx->thread[ctx->n_threads]) { +#ifndef HW_USE_THREADS + ctx->thread[ctx->n_threads]->status = HW_THREAD_STATUS_STARTING; +#endif /* HW_USE_THREADS */ + ++ctx->n_threads; + } + } + + if (!ctx->n_threads) { + hw_sched_destroy(ctx); + return NULL; + } + + return ctx; +} + +static int hw_sched_wait_threads(HWSched ctx) { +#ifdef HW_USE_THREADS + int i = 0; + + hw_sched_lock(ctx, compl_cond); + while (i < ctx->n_threads) { + for (; i < ctx->n_threads; i++) { + if (ctx->thread[i]->status == HW_THREAD_STATUS_INIT) { + hw_sched_wait(ctx, compl); + break; + } + } + + } + hw_sched_unlock(ctx, compl_cond); +#endif /* HW_USE_THREADS */ + + ctx->started = 1; + + return 0; +} + +void hw_sched_destroy(HWSched ctx) { + int i; + + if (ctx->n_threads > 0) { + if (!ctx->started) { + hw_sched_wait_threads(ctx); + } + + ctx->status = 0; + hw_sched_lock(ctx, job_cond); + hw_sched_broadcast(ctx, job); + hw_sched_unlock(ctx, job_cond); + + for (i = 0; i < ctx->n_threads; i++) { + hw_thread_destroy(ctx->thread[i]); + } + } + + COND_FREE(ctx, job); + COND_FREE(ctx, compl); + MUTEX_FREE(ctx, data); + MUTEX_FREE(ctx, sync); + + free(ctx); +} + +int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags) { + ctx->mode = HW_SCHED_MODE_SEQUENTIAL; + ctx->n_blocks = n_blocks; + ctx->cur_block = cur_block; + ctx->flags = flags; + + return 0; +} + +int hw_sched_get_chunk(HWSched ctx, int thread_id) { + int block; + + switch (ctx->mode) { + case HW_SCHED_MODE_PREALLOCATED: + if (ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING) { +#ifndef HW_USE_THREADS + ctx->thread[thread_id]->status = HW_THREAD_STATUS_DONE; +#endif /* HW_USE_THREADS */ + return thread_id; + } else { + return HW_SCHED_CHUNK_INVALID; + } + case HW_SCHED_MODE_SEQUENTIAL: + if ((ctx->flags&HW_SCHED_FLAG_INIT_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_STARTING)) { + return HW_SCHED_CHUNK_INIT; + } + hw_sched_lock(ctx, data); + block = *ctx->cur_block; + if (block < *ctx->n_blocks) { + *ctx->cur_block = *ctx->cur_block + 1; + } else { + block = HW_SCHED_CHUNK_INVALID; + } + hw_sched_unlock(ctx, data); + if (block == HW_SCHED_CHUNK_INVALID) { + if (((ctx->flags&HW_SCHED_FLAG_FREE_CALL)&&(ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING))) { + ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING; + return HW_SCHED_CHUNK_FREE; + } + if ((ctx->flags&HW_SCHED_FLAG_TERMINATOR_CALL)&&((ctx->thread[thread_id]->status == HW_THREAD_STATUS_RUNNING)||(ctx->thread[thread_id]->status == HW_THREAD_STATUS_FINISHING))) { + int i; + hw_sched_lock(ctx, data); + for (i = 0; i < ctx->n_threads; i++) { + if (thread_id == i) continue; + if ((ctx->thread[i]->status != HW_THREAD_STATUS_DONE)&&(ctx->thread[i]->status != HW_THREAD_STATUS_FINISHING2)&&(ctx->thread[i]->status != HW_THREAD_STATUS_IDLE)) { + break; + } + } + ctx->thread[thread_id]->status = HW_THREAD_STATUS_FINISHING2; + hw_sched_unlock(ctx, data); + if (i == ctx->n_threads) { + return HW_SCHED_CHUNK_TERMINATOR; + } + } + } + return block; + default: + return HW_SCHED_CHUNK_INVALID; + } + + return -1; +} + + +int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry) { +#ifdef HW_USE_THREADS + if (!ctx->started) { + hw_sched_wait_threads(ctx); + } +#else /* HW_USE_THREADS */ + int err; + int i, chunk_id, n_threads; + HWRunFunction run; + HWThread thrctx; +#endif /* HW_USE_THREADS */ + + ctx->ctx = appctx; + ctx->entry = entry; + + switch (ctx->mode) { + case HW_SCHED_MODE_SEQUENTIAL: + *ctx->cur_block = 0; + break; + default: + ; + } + +#ifdef HW_USE_THREADS + hw_sched_lock(ctx, compl_cond); + + hw_sched_lock(ctx, job_cond); + hw_sched_broadcast(ctx, job); + hw_sched_unlock(ctx, job_cond); +#else /* HW_USE_THREADS */ + n_threads = ctx->n_threads; + + for (i = 0; i < n_threads; i++) { + thrctx = ctx->thread[i]; + thrctx->err = 0; + } + + i = 0; + thrctx = ctx->thread[i]; + chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id); + + while (chunk_id >= 0) { + run = hw_run_entry(thrctx->runs, entry); + err = run(thrctx, thrctx->hwctx, chunk_id, appctx); + if (err) { + thrctx->err = err; + break; + } + + if ((++i) == n_threads) i = 0; + thrctx = ctx->thread[i]; + chunk_id = hw_sched_get_chunk(ctx, thrctx->thread_id); + } +#endif /* HW_USE_THREADS */ + + return 0; +} + +int hw_sched_wait_task(HWSched ctx) { + int err = 0; + int i = 0, n_threads = ctx->n_threads; + +#ifdef HW_USE_THREADS + while (i < ctx->n_threads) { + for (; i < ctx->n_threads; i++) { + if (ctx->thread[i]->status == HW_THREAD_STATUS_DONE) { + ctx->thread[i]->status = HW_THREAD_STATUS_IDLE; + } else { + hw_sched_wait(ctx, compl); + break; + } + } + + } + + hw_sched_unlock(ctx, compl_cond); +#endif /* HW_USE_THREADS */ + + for (i = 0; i < n_threads; i++) { + HWThread thrctx = ctx->thread[i]; + if (thrctx->err) return err = thrctx->err; + +#ifndef HW_USE_THREADS + thrctx->status = HW_THREAD_STATUS_IDLE; +#endif /* HW_USE_THREADS */ + } + + return err; +} + +int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + + err = hw_sched_schedule_task(ctx, appctx, entry); + if (err) return err; + + return hw_sched_wait_task(ctx); +} + +int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + + ctx->saved_mode = ctx->mode; + ctx->mode = HW_SCHED_MODE_PREALLOCATED; + err = hw_sched_schedule_task(ctx, appctx, entry); + + return err; +} + + +int hw_sched_wait_thread_task(HWSched ctx) { + int err; + + err = hw_sched_wait_task(ctx); + ctx->mode = ctx->saved_mode; + + return err; +} + +int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry) { + int err; + int saved_mode = ctx->mode; + + ctx->mode = HW_SCHED_MODE_PREALLOCATED; + err = hw_sched_execute_task(ctx, appctx, entry); + ctx->mode = saved_mode; + + return err; +} diff --git a/src/hw_sched.h b/src/hw_sched.h new file mode 100644 index 0000000..af9e363 --- /dev/null +++ b/src/hw_sched.h @@ -0,0 +1,146 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ + +#ifndef _HW_SCHED_H +#define _HW_SCHED_H + +#include + +typedef struct HWSchedT *HWSched; +#ifdef HW_USE_THREADS +typedef GMutex *HWMutex; +#else /* HW_USE_THREADS */ +typedef void *HWMutex; +#endif /* HW_USE_THREADS */ + + +#include "hw_thread.h" + +enum HWSchedModeT { + HW_SCHED_MODE_PREALLOCATED = 0, + HW_SCHED_MODE_SEQUENTIAL +}; +typedef enum HWSchedModeT HWSchedMode; + +enum HWSchedChunkT { + HW_SCHED_CHUNK_INVALID = -1, + HW_SCHED_CHUNK_INIT = -2, + HW_SCHED_CHUNK_FREE = -3, + HW_SCHED_CHUNK_TERMINATOR = -4 +}; +typedef enum HWSchedChunkT HWSchedChunk; + +enum HWSchedFlagsT { + HW_SCHED_FLAG_INIT_CALL = 1, //! Executed in each thread before real chunks + HW_SCHED_FLAG_FREE_CALL = 2, //! Executed in each thread after real chunks + HW_SCHED_FLAG_TERMINATOR_CALL = 4 //! Executed in one of the threads after all threads are done +}; +typedef enum HWSchedFlagsT HWSchedFlags; + + +#define HW_SINGLE_MODE +//#define HW_DETECT_CPU_CORES +#define HW_MAX_THREADS 128 + +#ifdef HW_SINGLE_MODE + typedef HWRunFunction HWEntry; +# define hw_run_entry(runs, entry) entry +#else /* HW_SINGLE_MODE */ + typedef int HWEntry; +# define hw_run_entry(runs, entry) runs[entry] +#endif /* HW_SINGLE_MODE */ + +#ifndef HW_HIDE_DETAILS +struct HWSchedT { + int status; + int started; + + int n_threads; + HWThread thread[HW_MAX_THREADS]; + +#ifdef HW_USE_THREADS + GCond *job_cond, *compl_cond; + GMutex *job_cond_mutex, *compl_cond_mutex, *data_mutex; + GMutex *sync_mutex; +#endif /* HW_USE_THREADS */ + + HWSchedMode mode; + HWSchedMode saved_mode; + HWSchedFlags flags; + int *n_blocks; + int *cur_block; + + HWEntry entry; + void *ctx; +}; +typedef struct HWSchedT HWSchedS; +#endif /* HW_HIDE_DETAILS */ + +# ifdef __cplusplus +extern "C" { +# endif + +HWSched hw_sched_create(int ppu_count); +int hw_sched_init(void); +void hw_sched_destroy(HWSched ctx); +int hw_sched_get_cpu_count(void); + +int hw_sched_set_sequential_mode(HWSched ctx, int *n_blocks, int *cur_block, HWSchedFlags flags); +int hw_sched_get_chunk(HWSched ctx, int thread_id); +int hw_sched_schedule_task(HWSched ctx, void *appctx, HWEntry entry); +int hw_sched_wait_task(HWSched ctx); +int hw_sched_execute_task(HWSched ctx, void *appctx, HWEntry entry); + +int hw_sched_schedule_thread_task(HWSched ctx, void *appctx, HWEntry entry); +int hw_sched_wait_thread_task(HWSched ctx); +int hw_sched_execute_thread_task(HWSched ctx, void *appctx, HWEntry entry); + +HWMutex hw_sched_create_mutex(void); +void hw_sched_destroy_mutex(HWMutex ctx); + +#ifdef HW_USE_THREADS +# define hw_sched_lock(ctx, type) g_mutex_lock(ctx->type##_mutex) +# define hw_sched_unlock(ctx, type) g_mutex_unlock(ctx->type##_mutex) +# define hw_sched_broadcast(ctx, type) g_cond_broadcast(ctx->type##_cond) +# define hw_sched_signal(ctx, type) g_cond_signal(ctx->type##_cond) +# define hw_sched_wait(ctx, type) g_cond_wait(ctx->type##_cond, ctx->type##_cond_mutex) + +#define hw_sched_create_mutex(void) g_mutex_new() +#define hw_sched_destroy_mutex(ctx) g_mutex_free(ctx) +#define hw_sched_lock_mutex(ctx) g_mutex_lock(ctx) +#define hw_sched_unlock_mutex(ctx) g_mutex_unlock(ctx) +#else /* HW_USE_THREADS */ +# define hw_sched_lock(ctx, type) +# define hw_sched_unlock(ctx, type) +# define hw_sched_broadcast(ctx, type) +# define hw_sched_signal(ctx, type) +# define hw_sched_wait(ctx, type) + +#define hw_sched_create_mutex(void) NULL +#define hw_sched_destroy_mutex(ctx) +#define hw_sched_lock_mutex(ctx) +#define hw_sched_unlock_mutex(ctx) +#endif /* HW_USE_THREADS */ + +# ifdef __cplusplus +} +# endif + +#endif /* _HW_SCHED_H */ + diff --git a/src/hw_thread.c b/src/hw_thread.c new file mode 100644 index 0000000..0374630 --- /dev/null +++ b/src/hw_thread.c @@ -0,0 +1,143 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ + +#include +#include +#include + +#include "hw_config.h" + +#include "hw_sched.h" +#include "hw_thread.h" + +#ifdef HW_USE_THREADS +static void *hw_thread_function(HWThread ctx) { + int err; + int chunk_id; + + HWRunFunction *runs; + HWRunFunction run; + HWSched sched; + void *hwctx; + + sched = ctx->sched; + runs = ctx->run; + hwctx = ctx->hwctx; + + hw_sched_lock(sched, job_cond); + + hw_sched_lock(sched, compl_cond); + ctx->status = HW_THREAD_STATUS_IDLE; + hw_sched_broadcast(sched, compl); + hw_sched_unlock(sched, compl_cond); + + while (sched->status) { + hw_sched_wait(sched, job); + if (!sched->status) break; + + ctx->err = 0; + ctx->status = HW_THREAD_STATUS_STARTING; + hw_sched_unlock(sched, job_cond); + + run = hw_run_entry(runs, sched->entry); +#if 0 + // Offset to interleave transfers if the GPUBox is used + // Just check with CUDA_LAUNCH_BLOCKED the togpu time and put it here + // It should be still significantly less than BP time + // We can do callibration during initilization in future + + usleep(12000 * ctx->thread_id); +#endif + chunk_id = hw_sched_get_chunk(sched, ctx->thread_id); + + /* Should be after get_chunk, since we can check if it's first time */ + ctx->status = HW_THREAD_STATUS_RUNNING; + while (chunk_id != HW_SCHED_CHUNK_INVALID) { + //printf("Thread %i processing slice %i\n", ctx->thread_id, chunk_id); + err = run(ctx, hwctx, chunk_id, sched->ctx); + if (err) { + ctx->err = err; + break; + } + chunk_id = hw_sched_get_chunk(sched, ctx->thread_id); + } + + hw_sched_lock(sched, job_cond); + + hw_sched_lock(sched, compl_cond); + ctx->status = HW_THREAD_STATUS_DONE; + hw_sched_broadcast(sched, compl); + hw_sched_unlock(sched, compl_cond); + } + + hw_sched_unlock(sched, job_cond); + + g_thread_exit(NULL); + return NULL; /* TODO: check this */ +} +#endif /* HW_USE_THREADS */ + + +HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func) { + GError *err; + + HWThread ctx; + + ctx = (HWThread)malloc(sizeof(HWThreadS)); + if (!ctx) return ctx; + + memset(ctx, 0, sizeof(HWThreadS)); + + ctx->sched = sched; + ctx->hwctx = hwctx; + ctx->run = run_func; + ctx->free = free_func; + ctx->thread_id = thread_id; + ctx->status = HW_THREAD_STATUS_INIT; + +#ifdef HW_USE_THREADS + ctx->thread = g_thread_create((GThreadFunc)hw_thread_function, ctx, 1, &err); + if (!ctx->thread) { + g_error_free(err); + + hw_thread_destroy(ctx); + return NULL; + } +#endif /* HW_USE_THREADS */ + + return ctx; +} + +void hw_thread_destroy(HWThread ctx) { +#ifdef HW_USE_THREADS + if (ctx->thread) { + g_thread_join(ctx->thread); + } +#endif /* HW_USE_THREADS */ + + if (ctx->data) { + free(ctx->data); + } + + if (ctx->free) { + ctx->free(ctx->hwctx); + } + + free(ctx); +} diff --git a/src/hw_thread.h b/src/hw_thread.h new file mode 100644 index 0000000..de7f60f --- /dev/null +++ b/src/hw_thread.h @@ -0,0 +1,76 @@ +/* + * The PyHST program is Copyright (C) 2002-2011 of the + * European Synchrotron Radiation Facility (ESRF) and + * Karlsruhe Institute of Technology (KIT). + * + * PyHST is free software: you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * hst is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see . + */ + +#ifndef _HW_THREAD_H +#define _HW_THREAD_H + +#include + +typedef struct HWThreadT *HWThread; +typedef int (*HWRunFunction)(HWThread thread, void *ctx, int block, void *attr); +typedef int (*HWFreeFunction)(void *ctx); + +#include "hw_sched.h" + +enum HWThreadStatusT { + HW_THREAD_STATUS_IDLE = 0, + HW_THREAD_STATUS_STARTING = 1, + HW_THREAD_STATUS_RUNNING = 2, + HW_THREAD_STATUS_FINISHING = 3, + HW_THREAD_STATUS_FINISHING2 = 4, + HW_THREAD_STATUS_DONE = 5, + HW_THREAD_STATUS_INIT = 6 +}; +typedef enum HWThreadStatusT HWThreadStatus; + + +#ifndef HW_HIDE_DETAILS +struct HWThreadT { + int thread_id; + HWSched sched; + +#ifdef HW_USE_THREADS + GThread *thread; +#endif /* HW_USE_THREADS */ + + void *hwctx; + HWRunFunction *run; + HWFreeFunction free; + + int err; + HWThreadStatus status; + + void *data; /**< Per-thread data storage, will be free'd if set */ +}; +typedef struct HWThreadT HWThreadS; +#endif /* HW_HIDE_DETAILS */ + +# ifdef __cplusplus +extern "C" { +# endif + +HWThread hw_thread_create(HWSched sched, int thread_id, void *hwctx, HWRunFunction *run_func, HWFreeFunction free_func); +void hw_thread_destroy(HWThread ctx); + +# ifdef __cplusplus +} +# endif + + +#endif /* _HW_THREAD_H */ diff --git a/src/meson.build b/src/meson.build index 324c744..fd41361 100644 --- a/src/meson.build +++ b/src/meson.build @@ -1,5 +1,4 @@ plugins = [ - 'roof-read', 'roof-build', 'roof-filter', 'roof-flat-field-correct' @@ -10,11 +9,11 @@ roof_common_src = [ ] roof_plugin_src = { - 'roof-read': [ + 'roof-build': [ + 'hw_sched.c', + 'hw_thread.c', 'ufo-roof-read-socket.c', 'ufo-roof-read-file.c', - ], - 'roof-build': [ 'ufo-roof-buffer.c', ], 'roof-filter': [ diff --git a/src/roof-buffer.c b/src/roof-buffer.c new file mode 100644 index 0000000..4ca0386 --- /dev/null +++ b/src/roof-buffer.c @@ -0,0 +1,252 @@ +#include +#include +#include + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" + +// This is currently not thread safe. With dual-filter architecture this will be called sequentially. + +RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) { + if ((n_dims < 1)||(n_dims > 2)) + roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims); + + RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer)); + if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer"); + + buffer->max_datasets = max_datasets; + buffer->n_streams = cfg->n_streams; + buffer->ring_size = cfg->buffer_size; + buffer->drop_buffers = cfg->drop_buffers; + buffer->latency_buffers = cfg->latency_buffers; + buffer->n_dims = n_dims; + buffer->dataset_size = cfg->dataset_size; + buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8; + buffer->dataset_dims[1] = cfg->fan_projections; + buffer->fragment_size = cfg->payload_size; + buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8; + buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0]; + + buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; + buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams; +// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size); + + buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size); + buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); + buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); + +#ifdef ROOF_INDEPENDENT_STREAMS + buffer->first_id = malloc(buffer->n_streams * sizeof(guint64)); + if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); + for (guint i = 0; i < buffer->n_streams; i++) + buffer->first_id[i] = (guint64)-1; +#else + buffer->first_id = (guint64)-1; +#endif + buffer->last_id = malloc(buffer->n_streams * sizeof(guint64)); + + if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { + roof_buffer_free(buffer); + roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); + } + + return buffer; +} + +void roof_buffer_free(RoofBuffer *buffer) { + if (buffer) { +#ifdef ROOF_INDEPENDENT_STREAMS + if (buffer->first_id) + free(buffer->first_id); +#endif + if (buffer->ring_buffer) + free(buffer->ring_buffer); + if (buffer->n_fragments) + free(buffer->n_fragments); + if (buffer->stream_fragment) + free(buffer->stream_fragment); + + free(buffer); + } +} + + // fragment_id is numbered from 1 (0 - means auto) +gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { + gboolean ready = FALSE; + guint buffer_id; + guint64 first_id; + guint64 dataset_id; + + if (!fragment_id) { + fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) + dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; + fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; + +#ifdef ROOF_INDEPENDENT_STREAMS + if (buffer->first_id[stream_id] == (guint64)-1) + buffer->first_id[stream_id] = dataset_id; + first_id = buffer->first_id[stream_id]; +#else + if (buffer->first_id == (guint64)-1) + buffer->first_id = dataset_id; + first_id = buffer->first_id; +#endif + if (dataset_id < first_id) + return FALSE; + + dataset_id -= first_id; + buffer_id = dataset_id % buffer->ring_size; + + // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? + if (dataset_id < buffer->current_id) { +// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); + return FALSE; + } + + if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) { +// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); + return FALSE; + } + + buffer->last_id[stream_id] = dataset_id; + + // We are not fast enough, new packets are arrvining to fast + if (dataset_id >= (buffer->current_id + buffer->ring_size)) { + // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? + + guint64 min_id = (guint64)-1; + guint64 max_id = 0; + for (guint i = 0; i < buffer->n_streams; i++) { + if (min_id > buffer->last_id[stream_id]) min_id = buffer->last_id[stream_id]; + if (max_id < buffer->last_id[stream_id]) max_id = buffer->last_id[stream_id]; + } + printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id); + printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]); + + root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed, already reported %lu", + dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready); + + // FIXME: Send semi-complete buffers further? + // FIXME: Or shall we drop more if larger buffers are allocated? + if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { + memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); + // FIXME: Can finally received old buffer hit the counter here? Locking? + buffer->current_id = dataset_id; + } else { + for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) + buffer->n_fragments[i%buffer->ring_size] = 0; + // FIXME: Can finally received old buffers hit the counters here? Locking? + buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; + } + + // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more + if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) + ready = TRUE; + + // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. + // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value + // The updates may happen after writting/reading is finished. + } + +/* + printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", + dataset_id, buffer_id, + buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset, + stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, + buffer->fragment_size, ((uint32_t*)fragment)[0] + ); +*/ + + // FIXME: What if buffer obsolete meanwhile by other threads. We are not single threaded + uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + if (buffer->n_dims == 2) { + uint8_t *fragment_buffer = dataset_buffer + + stream_id * buffer->fragment_dims[0] + // x-coordinate + (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + } + } else { + // 1D stracture, simply putting fragment at the appropriate position in the stream + uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size; + memcpy(fragment_buffer, fragment, buffer->fragment_size); + } + + // FIXME: Sanity checks: verify is not a dublicate fragment? + // This stuff is anyway is single-threaded which is a problem. + //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); + buffer->n_fragments[buffer_id]++; + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (dataset_id == buffer->current_id) + ready = TRUE; + else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) { + ready = roof_buffer_skip_to_ready(buffer); + } + + if (ready) buffer->n_ready++; + } + + return ready; +} + +gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) { + for (guint i = 0; i < buffer->ring_size; i++) { + guint64 id = buffer->current_id + i; + guint buffer_id = id % buffer->ring_size; + + if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { + buffer->current_id = id; + return TRUE; + } + + printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); + buffer->n_fragments[buffer_id] = 0; + } + + return FALSE; +} + + +gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { + guint buffer_id = buffer->current_id % buffer->ring_size; + void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; + + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE; + + memcpy(output_buffer, dataset_buffer, buffer->dataset_size); + if (seqid) *seqid = buffer->current_id; + + buffer->n_fragments[buffer_id] = 0; + buffer->current_id += 1; + + return TRUE; +} + +gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) { + struct timespec start_time, current_time; + clock_gettime(CLOCK_REALTIME, &start_time); + + guint buffer_id = buffer->current_id % buffer->ring_size; + + // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? + while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) { + // FIXME: get some sleep? How we can interrupt it? + // just small nanosleep? + if (timeout) { + clock_gettime(CLOCK_REALTIME, ¤t_time); + gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 + (current_time.tv_nsec - start_time.tv_nsec) / 1000; + if (wait > timeout) return FALSE; + } + } + + return roof_buffer_get_dataset(buffer, output_buffer, seqid, error); +} diff --git a/src/roof-buffer.h b/src/roof-buffer.h new file mode 100644 index 0000000..a95dbaa --- /dev/null +++ b/src/roof-buffer.h @@ -0,0 +1,52 @@ +#ifndef __ROOF_BUFFER_H +#define __ROOF_BUFFER_H + + // This IS harmful! Just for testing +#define ROOF_INDEPENDENT_STREAMS + +#include +#include +#include "ufo-roof-config.h" + +struct _RoofBuffer { + guint64 current_id; // The ID of the first (active) dataset in the buffer +#ifdef ROOF_INDEPENDENT_STREAMS + guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known +#else + guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known +#endif + guint64 *last_id; + guint64 n_ready; + + guint n_streams; + guint ring_size; // Number of datasets to buffer + guint drop_buffers; // If we need to catch up + guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready + uint8_t *ring_buffer; // The ring buffer + _Atomic guint *n_fragments; // Number of completed fragments in each buffer + guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) +// int *fragments; // Mark individual completed fragments (if we care for partial data) + + + guint64 max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped + guint n_dims; // Indicates if we just assemble one fragment after another or there is 2D/3D data structure (ROOF) + guint dataset_size; // Size (in bytes) of a full dataset + guint dataset_dims[2]; // x (in bytes), y (in rows) + guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment) + guint fragment_dims[2]; // x (in bytes), y (in rows) + + guint fragments_per_dataset; // Number of packets in dataset (used to compute when dataset is ready) + guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready) +}; + +typedef struct _RoofBuffer RoofBuffer; + +RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error); +void roof_buffer_free(RoofBuffer *buf); + +gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error); +gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer); +gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error); +gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error); + +#endif diff --git a/src/roof-config.c b/src/roof-config.c new file mode 100644 index 0000000..189c08f --- /dev/null +++ b/src/roof-config.c @@ -0,0 +1,245 @@ +#include +#include + +#include + +#include + +#include "ufo-roof.h" +#include "ufo-roof-error.h" +#include "ufo-roof-config.h" + +#define roof_config_node_get_with_default(var, parent, type, name, default) do { \ + JsonNode *node = json_object_get_member(parent, name); \ + if (node) var = json_node_get_##type(node); \ + else var = default; \ + } while(0) + +#define roof_config_node_get_string_with_default(var, parent, name, default) do { \ + const gchar *str; \ + JsonNode *node = json_object_get_member(parent, name); \ + if (node) str = json_node_get_string(node); \ + else str = default; \ + if (var != str) { \ + if (var) g_free(var); \ + var = g_strdup(str); \ + } \ + } while(0) + +#define roof_config_node_get(var, parent, type, name) \ + roof_config_node_get_with_default(var, parent, type, name, var) + +#define roof_config_node_get_string(var, parent, name) \ + roof_config_node_get_string_with_default(var, parent, name, var) + + +typedef struct { + RoofConfig cfg; + + JsonParser *parser; +} RoofConfigPrivate; + +void roof_config_free(RoofConfig *cfg) { + if (cfg) { + RoofConfigPrivate *priv = (RoofConfigPrivate*)cfg; + + if (priv->parser) + g_object_unref (priv->parser); + + free(cfg); + } +} + +RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error) { + RoofConfigPrivate *priv; + RoofConfig *cfg; + +// JsonNode *node; + JsonObject *root = NULL; + JsonObject *hardware = NULL; + JsonObject *geometry = NULL; + JsonObject *optics = NULL; + JsonObject *network = NULL; + JsonObject *performance = NULL; + JsonObject *simulation = NULL; + JsonObject *reconstruction = NULL; + JsonObject *data = NULL; + + GError *gerr = NULL; + + priv = (RoofConfigPrivate*)malloc(sizeof(RoofConfigPrivate)); + if (!priv) roof_new_error(error, "Can't allocate RoofConfig"); + + memset(priv, 0, sizeof(RoofConfigPrivate)); + + // Set defaults + cfg = &priv->cfg; + + cfg->roof_mode = FALSE; + cfg->n_planes = 2; + cfg->n_modules = 16; + cfg->channels_per_module = 16; + cfg->bit_depth = 16; + cfg->samples_per_rotation = 1000; + cfg->sample_rate = 0; + cfg->imaging_rate = 0; + + cfg->port = 52067; + cfg->n_streams = 1; + cfg->protocol = "udp"; + cfg->network_timeout = 100000000; // FIXME: remove 0 + cfg->header_size = sizeof(RoofPacketHeader); + cfg->payload_size = 0; + cfg->max_packet_size = 0; + cfg->max_packets = 100; + cfg->dataset_size = 0; + cfg->buffer_size = 2; + cfg->latency_buffers = 0; + cfg->drop_buffers = 0; + cfg->sockets_per_thread = 4; + + + // Read configuration + priv->parser = json_parser_new_immutable (); + json_parser_load_from_file (priv->parser, config, &gerr); + + if (gerr != NULL) { + g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config); + roof_config_free(cfg); + return NULL; + } + + root = json_node_get_object (json_parser_get_root (priv->parser)); + + if (root) { + roof_config_node_get(hardware, root, object, "hardware"); + roof_config_node_get(geometry, root, object, "geometry"); + roof_config_node_get(optics, root, object, "optics"); + roof_config_node_get(network, root, object, "network"); + roof_config_node_get(reconstruction, root, object, "reconstruction"); + roof_config_node_get(performance, root, object, "performance"); + roof_config_node_get(data, root, object, "data"); + + if (flags&ROOF_CONFIG_SIMULATION) + roof_config_node_get(simulation, root, object, "simulation"); + } + + if (hardware) { + roof_config_node_get(cfg->n_planes, hardware, int, "planes"); + roof_config_node_get(cfg->n_modules, hardware, int, "modules"); + roof_config_node_get(cfg->channels_per_module, hardware, int, "channels_per_module"); + roof_config_node_get(cfg->bit_depth, hardware, int, "bit_depth"); + + roof_config_node_get(cfg->samples_per_rotation, hardware, int, "samples_per_rotation"); + roof_config_node_get(cfg->sample_rate, hardware, int, "sample_rate"); + roof_config_node_get(cfg->imaging_rate, hardware, int, "imaging_rate"); + + if ((cfg->sample_rate)||(cfg->imaging_rate)) { + if ((!cfg->sample_rate)||(!cfg->imaging_rate)||(cfg->sample_rate%cfg->imaging_rate)) { + roof_config_free(cfg); + roof_new_error(error, "Invalid sample (%u) and imaging (%u) rates are specified", cfg->sample_rate, cfg->imaging_rate); + } + + if ((json_object_get_member(hardware, "samples_per_rotation"))&&(cfg->samples_per_rotation != (cfg->sample_rate / cfg->imaging_rate))) { + roof_config_free(cfg); + roof_new_error(error, "The specified samples-per-rotation (%u) doesn't match sample/imaging rates (%u / %u)", cfg->samples_per_rotation, cfg->sample_rate, cfg->imaging_rate); + } + + cfg->samples_per_rotation = cfg->sample_rate / cfg->imaging_rate; + } + + if ((cfg->bit_depth%8)||(cfg->bit_depth > 32)) { + roof_config_free(cfg); + roof_new_error(error, "Invalid bit-depth (%u) is configured, only 8, 16, 24, 32 is currently supported", cfg->bit_depth); + } + + cfg->fan_projections = cfg->samples_per_rotation; + cfg->fan_bins = cfg->n_modules * cfg->channels_per_module; + + cfg->dataset_size = cfg->fan_projections * cfg->fan_bins * (cfg->bit_depth / 8); + cfg->n_streams = cfg->n_modules; + cfg->roof_mode = TRUE; + } + + if (network) { + roof_config_node_get(cfg->port, network, int, "port"); + roof_config_node_get(cfg->n_streams, network, int, "streams"); + + roof_config_node_get(cfg->payload_size, network, int, "payload_size"); + roof_config_node_get(cfg->header_size, network, int, "header_size"); + roof_config_node_get(cfg->max_packet_size, network, int, "max_packet_size"); + roof_config_node_get(cfg->dataset_size, network, int, "dataset_size"); + + if (!cfg->payload_size) { + roof_config_free(cfg); + roof_new_error(error, "Packet payload and header size must be set"); + } + + if ((cfg->header_size < sizeof(RoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) { + roof_config_free(cfg); + roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(RoofPacketHeader)); + } + + if (!cfg->dataset_size) + cfg->dataset_size = cfg->payload_size; + } + + if (simulation) { + roof_config_node_get(cfg->header_size, simulation, int, "header_size"); + + if (!cfg->payload_size) + cfg->payload_size = cfg->dataset_size; + } + + if (performance) { + roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); + roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); + roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); + roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers"); + roof_config_node_get(cfg->sockets_per_thread, performance, int, "sockets_per_thread"); + } + + + // Check configuration consistency + guint fragments_per_dataset = cfg->dataset_size / cfg->payload_size; + guint fragments_per_stream = fragments_per_dataset / cfg->n_streams; + + // Dataset should be split in an integer number of network packets (we don't expect data from different datasets in one packet at the moment) + if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) { + roof_config_free(cfg); + roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams); + } + + // Packet should contain an integer number of complete projections (their parts provided by a single module) + if ((cfg->roof_mode)&&(cfg->payload_size % (cfg->channels_per_module * (cfg->bit_depth / 8)))) { + roof_config_free(cfg); + roof_new_error(error, "Inconsistent ROOF configuration: packet_size=%u, projection_size=%u (%u channels x %u bits)", cfg->payload_size, cfg->channels_per_module * (cfg->bit_depth / 8), cfg->channels_per_module, cfg->bit_depth); + } + + if (!cfg->max_packet_size) + cfg->max_packet_size = cfg->header_size + cfg->payload_size; + + if (hardware) { + if (cfg->n_modules != cfg->n_streams) { + roof_config_free(cfg); + roof_new_error(error, "Currently, number of ROOF modules (%u) is exepcted to be equal to number of independent data streams (%u)", cfg->n_modules, cfg->n_streams); + } + + if (cfg->dataset_size != (cfg->fan_projections * cfg->fan_bins * cfg->bit_depth / 8)) { + roof_config_free(cfg); + roof_new_error(error, "Specified dataset size (%u) does not match ROOF configuration (modules: %u, channels-per-module: %u, bit-depth: %u, samples-per-rotation: %u)", cfg->dataset_size, cfg->n_modules, cfg->channels_per_module, cfg->bit_depth, cfg->samples_per_rotation); + } + } + + if ((cfg->buffer_size * fragments_per_stream) < cfg->max_packets) { + cfg->max_packets = cfg->buffer_size * fragments_per_stream / 2; + } + + if (cfg->buffer_size < 4) { + cfg->drop_buffers = 0; + } else if (cfg->drop_buffers >= cfg->buffer_size) { + cfg->drop_buffers = cfg->buffer_size / 2; + } + + return cfg; +} diff --git a/src/roof-config.h b/src/roof-config.h new file mode 100644 index 0000000..f28cb60 --- /dev/null +++ b/src/roof-config.h @@ -0,0 +1,64 @@ +#ifndef __ROOF_CONFIG_H +#define __ROOF_CONFIG_H + +#include + +typedef struct { + // ROOF Hardware + gboolean roof_mode; // Indicates if ROOF is configured (1), otherwise only networking is implemented + guint n_planes; // Number of detector planes, ROOF module serves a ring segment from all planes in a round-robin fashion + guint n_modules; // Number of ROOF modules + guint channels_per_module; // Number of pixels in each module + guint samples_per_rotation; // Number of samples (projections) in a full fan sinogram; computed from sample_rate & image_rate if given + guint sample_rate; // Number of samples (projections) acquired per second, 0 - if unknown + guint imaging_rate; // Number of complete datasets (images) acquired per second, 0 - if unknown + guint bit_depth; // Number of bits per pixel (we currently support only multiples of 8) + + // Geometry + guint fan_projections; // Number of fan projections = samples_per_rotation + guint fan_bins; // Number of fan detectors = n_modules * channels_per_module + guint parallel_projections; + guint parallel_bins; +// guint detector_diameter; + + // Optics + + + // Network Server / Reader + gchar *protocol; // Protocols: tcp, udp, tcp6, udp6, ... + guint port; // First port + guint n_streams; // Number of independent data streams (expected on sequential ports), by default equal to number of ROOF modules + guint header_size; // Expected size of the packet header, for dgram protocols we need at least 32-bit sequence number. Defaults to uint32_t for udp* and 0 - otherwise + guint payload_size; // Expected size of TCP/UDP packet (without header) + guint dataset_size; // Size of a single dataset (image, sinogram, etc.). This is real size in bytes, excluding all technical headers used in communication protocol. Normally, it is computed based on ROOF hardware parameters. + + // Performance parameters + guint max_packets; // limits maximum number of packets which are read at once + guint max_packet_size; // payload_size + header_size + ... (we don't care if tail is variable length provided that the complete packet does not exceed max_packet_size bytes) + guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected + guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. + guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip + guint network_timeout; // Maximum time (us) to wait for data on the socket + guint sockets_per_thread; // Number of sockets per thread. Optimally number of therads should not exceed number of CPU cores + + + +/* + guint planes_per_module; +*/ + + + + +} RoofConfig; + +typedef enum { + ROOF_CONFIG_DEFAULT = 0, + ROOF_CONFIG_SIMULATION = 1 +} RoofConfigFlags; + + +RoofConfig *roof_config_new(const char *config, RoofConfigFlags flags, GError **error); +void roof_config_free(RoofConfig *cfg); + +#endif /* __ROOF_CONFIG_H */ diff --git a/src/roof-error.h b/src/roof-error.h new file mode 100644 index 0000000..418645c --- /dev/null +++ b/src/roof-error.h @@ -0,0 +1,61 @@ +#ifndef __ROOF_ERROR_H +#define __ROOF_ERROR_H + +#include + + +#define roof_print_error(error) do { \ + g_warning("%s", error->message); \ + g_error_free(error); \ + error = NULL; \ + } while (0) + +#define roof_set_error(error, type, ...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ + } while (0) + +#define roof_error(error, type, ...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ + return; \ + } while (0) + +#define roof_propagate_error(error, err, ...) do { \ + g_propagate_prefixed_error(error, err, __VA_ARGS__); \ + return; \ + } while (0) + +#define roof_error_with_retval(error, retval, type, ...) do { \ + if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ + return retval; \ + } while (0) + +#define roof_propagate_error_with_retval(error, retval, err, ...) do { \ + g_propagate_prefixed_error(error, err, __VA_ARGS__); \ + return retval; \ + } while (0) + + +#define roof_setup_error(error, ...) \ + roof_error(error, SETUP, __VA_ARGS__) + +#define roof_setup_error_with_retval(error, ...) \ + roof_error_with_retval(error, retval, SETUP, __VA_ARGS__) + +#define roof_new_error(error, ...) \ + roof_error_with_retval(error, NULL, SETUP, __VA_ARGS__) + + +#define roof_network_error(error, ...) \ + roof_error(error, SETUP, __VA_ARGS__) + +#define root_set_network_error(error, ...) \ + roof_set_error(error, SETUP, __VA_ARGS__) + +#define roof_network_error_with_retval(error, retval, ...) \ + roof_error_with_retval(error, retval, SETUP, __VA_ARGS__) + +#define roof_memory_error(error, ...) \ + roof_error(error, SETUP, __VA_ARGS__) + + +#endif /* __ROOF_ERROR_H */ diff --git a/src/roof-read-file.c b/src/roof-read-file.c new file mode 100644 index 0000000..da8d51c --- /dev/null +++ b/src/roof-read-file.c @@ -0,0 +1,98 @@ +#include +#include +#include +#include + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-read-file.h" + +typedef struct { + RoofReadInterface iface; + + RoofConfig *cfg; + + gchar *fname; + FILE *fd; + uint8_t *buf; +} RoofReadFile; + +static void roof_read_file_free(RoofReadInterface *iface) { + RoofReadFile *reader = (RoofReadFile*)iface; + + if (reader) { + if (reader->fname) + g_free(reader->fname); + + if (reader->fd) + fclose(reader->fd); + + if (reader->buf) + free(reader->buf); + free(reader); + } +} + +static guint roof_read_file(RoofReadInterface *iface, uint8_t **buffers, GError **error) { + RoofReadFile *reader = (RoofReadFile*)iface; + RoofConfig *cfg = reader->cfg; + + assert(iface); + assert(buffers); + + size_t bytes = 0; + size_t packet_size = cfg->header_size + cfg->payload_size; + size_t expected = cfg->max_packets * packet_size; + + while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) { + size_t ret = fread(reader->buf + bytes, 1, expected - bytes, reader->fd); + bytes += ret; + } + + guint packets = bytes / packet_size; + + if (ferror(reader->fd)) { + roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd)); + } else if ((feof(reader->fd))&&(bytes % packet_size)) { + roof_network_error_with_retval(error, packets, "extra data in the end of input"); + } + + *buffers = reader->buf; + return packets; +} + + +RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error) { + RoofReadFile *reader = (RoofReadFile*)calloc(1, sizeof(RoofReadFile)); + if (!reader) roof_new_error(error, "Can't allocate RoofReadFile"); + + // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now. + if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size) + roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if RoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size); + + reader->cfg = cfg; + reader->iface.close = roof_read_file_free; + reader->iface.read =roof_read_file; + + reader->fname = g_strdup_printf(path, file_id); + if (!reader->fname) { + free(reader); + roof_new_error(error, "Can't build file name"); + } + + reader->fd = fopen(reader->fname, "rb"); + if (!reader->fd) { + g_free(reader->fname); + g_free(reader); + roof_new_error(error, "Can't open file %i at path %s", file_id, path); + } + + reader->buf = (uint8_t*)malloc(cfg->max_packets * (cfg->header_size + cfg->payload_size)); + if (!reader->buf) { + roof_read_file_free((RoofReadInterface*)reader); + roof_new_error(error, "Can't allocate file buffer"); + } + + return (RoofReadInterface*)reader; +} diff --git a/src/roof-read-file.h b/src/roof-read-file.h new file mode 100644 index 0000000..60f60ba --- /dev/null +++ b/src/roof-read-file.h @@ -0,0 +1,8 @@ +#ifndef __ROOF_READ_FILE_H +#define __ROOF_READ_FILE_H + +#include "ufo-roof-read.h" + +RoofReadInterface *roof_read_file_new(RoofConfig *cfg, const char *path, guint file_id, GError **error); + +#endif diff --git a/src/roof-read-socket.c b/src/roof-read-socket.c new file mode 100644 index 0000000..1b98b44 --- /dev/null +++ b/src/roof-read-socket.c @@ -0,0 +1,159 @@ +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include + +#include "glib.h" + +#include "ufo-roof.h" +#include "ufo-roof-read-socket.h" + +typedef struct { + RoofReadInterface iface; + + RoofConfig *cfg; + + int socket; + struct mmsghdr *msg; + struct iovec *msgvec; + uint8_t *buf; + + int libvma; +} RoofReadSocket; + +static void roof_read_socket_free(RoofReadInterface *iface) { + RoofReadSocket *reader = (RoofReadSocket*)iface; + + if (reader) { + if (reader->socket >= 0) + close(reader->socket); + if (reader->msgvec) + free(reader->msgvec); + if (reader->msg) + free(reader->msg); + if (reader->buf) + free(reader->buf); + free(reader); + } +} + +static guint roof_read_socket(RoofReadInterface *iface, uint8_t **buf, GError **error) { + int packets; + struct timespec timeout_ts; + + assert(iface); + assert(buf); + + RoofReadSocket *reader = (RoofReadSocket*)iface; + RoofConfig *cfg = reader->cfg; + + timeout_ts.tv_sec = cfg->network_timeout / 1000000; + timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); + + +//retry: + // Timeout seems broken, see BUGS in 'recvmmsg' bugs page + packets = recvmmsg(reader->socket, reader->msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); + if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); + +/* + // FIXME: Shall we verify packets consistency here? We can check at least the sizes... + if ((packets == 1)&&(msg[0].msg_len < 16)) { + goto retry; + } +*/ + + *buf = reader->buf; + return (guint)packets; +} + +RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error) { + int err; + int port = cfg->port + id; + char port_str[16]; + const char *addr_str = "192.168.100.8"; + struct addrinfo sockaddr_hints; + struct addrinfo *sockaddr_info; + + RoofReadSocket *reader = (RoofReadSocket*)calloc(1, sizeof(RoofReadSocket)); + if (!reader) roof_new_error(error, "Can't allocate RoofReadSocket"); + + reader->cfg = cfg; + reader->iface.close = roof_read_socket_free; + reader->iface.read =roof_read_socket; + + snprintf(port_str, sizeof(port_str), "%d", port); + port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; + + memset(&sockaddr_hints, 0, sizeof(sockaddr_hints)); + if (!strncmp(cfg->protocol, "udp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_DGRAM; + sockaddr_hints.ai_protocol = IPPROTO_UDP; + } else if (!strncmp(cfg->protocol, "tcp", 3)) { + sockaddr_hints.ai_family = AF_UNSPEC; + sockaddr_hints.ai_socktype = SOCK_STREAM; + sockaddr_hints.ai_protocol = IPPROTO_TCP; + } else { + roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol); + } + + err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + if (err || !sockaddr_info) { + free(reader); + roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str); + } + + reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol); + if(reader->socket == -1) { + freeaddrinfo(sockaddr_info); + free(reader); + roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str); + } + + err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); + if(err != 0) { + freeaddrinfo(sockaddr_info); + close(reader->socket); + free(reader); + roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); + } + +/* + // Check that FPGA module is ready + char msg[4]; + addr_str = "192.168.34.83"; + getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); + sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); +*/ + freeaddrinfo(sockaddr_info); + + if (reader->libvma) { + } else { + reader->buf = (uint8_t*)malloc(cfg->max_packets * cfg->max_packet_size); + reader->msg = (struct mmsghdr*)malloc(cfg->max_packets * sizeof(struct mmsghdr)); + reader->msgvec = (struct iovec*)malloc(cfg->max_packets * sizeof(struct iovec)); + + if ((!reader->buf)||(!reader->msg)||(!reader->msgvec)) { + roof_read_socket_free((RoofReadInterface*)reader); + roof_new_error(error, "Can't allocate socket buffer"); + } + + memset(reader->msg, 0, cfg->max_packets * sizeof(struct mmsghdr)); + memset(reader->msgvec, 0, cfg->max_packets * sizeof(struct iovec)); + for (guint i = 0; i < cfg->max_packets; i++) { + reader->msgvec[i].iov_base = reader->buf + i * cfg->max_packet_size; + reader->msgvec[i].iov_len = cfg->max_packet_size; + reader->msg[i].msg_hdr.msg_iov = &reader->msgvec[i]; + reader->msg[i].msg_hdr.msg_iovlen = 1; + } + } + + + return (RoofReadInterface*)reader; +} diff --git a/src/roof-read-socket.h b/src/roof-read-socket.h new file mode 100644 index 0000000..34a98e8 --- /dev/null +++ b/src/roof-read-socket.h @@ -0,0 +1,8 @@ +#ifndef __ROOF_READ_SOCKET_H +#define __ROOF_READ_SOCKET_H + +#include "ufo-roof-read.h" + +RoofReadInterface *roof_read_socket_new(RoofConfig *cfg, guint id, GError **error); + +#endif diff --git a/src/roof-read.c b/src/roof-read.c new file mode 100644 index 0000000..18a7508 --- /dev/null +++ b/src/roof-read.c @@ -0,0 +1,107 @@ +#include + +#include "roof.h" +#include "roof-read.h" + +RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error) { + guint i; + GError *gerr = NULL; + + RoofReadContext *rdc = (RoofReadContext*)calloc(1, sizeof(RoofReadContext)); + if (!rdc) roof_new_error(error, "Can't allocate RoofReadContext"); + + rdc->cfg = cfg; + rdc->rdi = rdi; + + return rdc; +} + +void roof_read_context_free(RoofReadContext *rdc) { + if (rdc) { + free(rdc); + } +} + +void roof_read(RoofReadContext *rdc, RoofBufferInterface *bfi, GError *error) { + uint8_t *rdbuf = rdc->rdbuf; + guint n_packets = rdc->n_packets; + guint packet_id = rdc->packet_id; + guint fragment_id = rdc->fragment_id; + GError *gerr = NULL; + + ReadRoofInterface *rdi = rdc->rdi; + + if (rdc->rdbuf) { + rdbuf = rdc->rdbuf; + packets + else { + guint packets = rdi->read(rdi[stream_id], &rdbuf, &gerr); + if (gerr) roof_propagate_error(error, gerr, "roof_read: "); + packet_id = 0; + fragment_id = 0; + } + + packet = rdbuf + packet_id * (size + padding + + while (packet) { + + + while (rdbuf) { + // + + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(RoofPacketHeader)) { + RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } else { + // FIXME: consider consecutive + //fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // FIXME: packet may contain fragments for multiple datasets + fragment_id = packet_id * fragments_per_packet; + guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; + guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; + + // FIXME: verify that packet is consecutive + // if + + // Drop packets of already skipped datasets + if (dataset_id < priv->current_id) + continue; + + // FIXME: stop processing and return.... + if (dataset_id < priv->current_id) + + uint8_t *fragment_buffer = dataset_buffer + + stream_id * fragment_dims[0] + // x-coordinate + (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + + +// buffer->last_id[stream_id] = dataset_id; + + ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + + + } + + + } + + + + + void *buf = roof_buffer_get_dataset(bfi, ?); + + // Computing offsets each time, etc. + roof_buffer_set_fragment +} \ No newline at end of file diff --git a/src/roof-read.h b/src/roof-read.h new file mode 100644 index 0000000..1ab846a --- /dev/null +++ b/src/roof-read.h @@ -0,0 +1,45 @@ +#ifndef __ROOF_READ_H +#define __ROOF_READ_H + +typedef struct _RoofRead RoofRead; + +#include "roof-config.h" + +G_BEGIN_DECLS + +typedef struct _RoofReadContext RoofReadContext; +typedef struct _RoofReadInterface RoofReadInterface; +typedef struct _RoofReadInterfaceSettings RoofReadInterfaceSettings; + +typedef guint (*RoofReaderRead)(RoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*RoofReaderClose)(RoofReadInterface *reader); + +struct _RoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _RoofReadInterface { + RoofReaderRead read; + RoofReaderClose close; + + RoofReadInterfaceSettings settings; +}; + +struct _RoofReadContext { + RoofConfig *cfg; + RoofReadInterface *rdi; + + void *rdbuf; // The buffer we are currently processing + guint n_packets; // Number of packets in the buffer + guint packet_id; // Last processed packet in the buffer + guint fragment_id; // Last processed fragment in the packet +}; + +RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error); +void roof_read_context_free(RoofReadContext *ctx); +const RoofReadInterfaceSettings *roof_read_get_settings(UFORoofRead *ctx, GError **error); + + +G_END_DECLS + +#endif /* __ROOF_READ_H */ diff --git a/src/roof-thread.c b/src/roof-thread.c new file mode 100644 index 0000000..570300f --- /dev/null +++ b/src/roof-thread.c @@ -0,0 +1,95 @@ +#include + +#include "roof.h" +#include "roof-thread.h" + + +RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) { + GError *gerr = NULL; + + RoofThreadContext *rdt = (RoofThreadContext*)calloc(1, sizeof(RoofThreadContext)); + if (!rdt) roof_new_error(error, "Can't allocate RoofThreadContext"); + + rdt->cfg = cfg; + rdt->rdi = rdi; + + return rdt; + +} + +void roof_thread_context_free(RoofThreadContext *rdt) { + if (rdt) { + free(rdt); + } +} + + +int roof_thread_read_socket(Roof * + +int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) { + Roof *ctx = (Roof*)data; + + RoofConfig *cfg = ctx->cfg; + RoofThreadContext *rdt = ctx->rdt[id]; + + guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections }; + guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] }; + + packet_id + fragment_id = + + for (guint stream_id = from; stream_id < to; stream_id++) { + + uint8_t *rdbuf; + guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(RoofPacketHeader)) { + RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } else { + // FIXME: consider consecutive + //fragment_id = ++buffer->stream_fragment[stream_id]; + } + + // FIXME: packet may contain fragments for multiple datasets + fragment_id = packet_id * fragments_per_packet; + guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream; + guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream; + + // FIXME: verify that packet is consecutive + // if + + // Drop packets of already skipped datasets + if (dataset_id < priv->current_id) + continue; + + // FIXME: stop processing and return.... + if (dataset_id < priv->current_id) + + uint8_t *fragment_buffer = dataset_buffer + + stream_id * fragment_dims[0] + // x-coordinate + (fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate + + for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { + memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); + + +// buffer->last_id[stream_id] = dataset_id; + + ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + + + } + + } + +} + diff --git a/src/roof-thread.h b/src/roof-thread.h new file mode 100644 index 0000000..a180b29 --- /dev/null +++ b/src/roof-thread.h @@ -0,0 +1,30 @@ +#ifndef __ROOF_THREAD_H +#define __ROOF_THREAD_H + +typedef struct _RoofThreadContext RoofThreadContext; + +#include "ufo-roof-read.h" + +struct _RoofThreadContext { + RoofConfig *cfg; + RoofBuffer *buf; + RoofReadInterface *rdi; + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) +}; + +/* +RoofReadThread *guint roof_read_thread_new(RoofRead *rd, guint from, guint to, GError **error); +void roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +int roof_thread_read(HWThread thr, void *hwctx, int id, void *data); + +RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, RoofReadContext *rdc, guint from, guint to, GError **error); +void roof_thread_context_free(RoofThreadContext *ctx); + + + +#endif /* __ROOF_THREAD_H */ + diff --git a/src/roof.c b/src/roof.c new file mode 100644 index 0000000..505e3aa --- /dev/null +++ b/src/roof.c @@ -0,0 +1,133 @@ +void roof_init() { + hw_sched_init(); +} + +Roof *roof_new(UfoRoofConfig *cfg, GError **error) { + guint i; + GError *gerr = NULL; + + Roof *ctx = (Roof*)calloc(1, sizeof(Roof)); + if (!ctx) roof_new_error(error, "Can't allocate Roof context"); + + ctx->cfg = cfg; + + ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread; + if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++; + + ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*)); + ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*)); + ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*)); + ctx->sched = hw_sched_create(cfg->n_threads); + + if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) { + roof_free(ctx); + roof_setup_error(error, "Failed to allocate memory for various Roof contexts"); + } + + return ctx; +} + +void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) { + assert(ctx); + + ctx->simulate = 1; + ctx->path = path; + ctx->first_file_number = first_file_number; +} + +void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) { + assert(ctx); + + ctx->max_datasets = max; +} + + +void roof_setup(Roof *ctx, GError **error) { + guint i; + GError *gerr = NULL; + + assert(ctx); + + RoofConfig *cfg = ctx->cfg; + +/* + ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr); + if ((gerr)||(!ctx->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); +*/ + + for (i = 0; i < cfg->n_streams; i++) { + if (ctx->simulate) { + if (!ctx->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr); + } else { + ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr); + } + + if (!ctx->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr); + if (!ctx->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads; + if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < ctx->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i); + } +} + + +void roof_free(Roof *ctx) { + guint i; + + if (ctx) { + RoofConfig *cfg = ctx->cfg; + if (ctx->sched) hw_sched_destroy(priv->sched); + + if (ctx->rdt) { + for (i = 0; i < ctx->n_threads; i++) + if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]); + free(ctx->rdt); + } + + if (ctx->rdc) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]); + free(ctx->rdc); + } + + if (ctx->rdi) { + for (i = 0; i < cfg->n_streams; i++) + if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]); + free(ctx->rdi); + } + + if (ctx->buf) roof_buffer_free(ctx->buf); + free(ctx); + } +} + + +void roof_read_dataset(Roof *ctx, void *buffer, GError **error) { + priv->current_dataset; + priv->current_buffer = buffer; + + err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } +} + + +} diff --git a/src/roof.h b/src/roof.h new file mode 100644 index 0000000..316e8ed --- /dev/null +++ b/src/roof.h @@ -0,0 +1,46 @@ +#ifndef __ROOF_H +#define __ROOF_H + +typedef struct _Roof Roof; + +#include "roof-config.h" +#include "roof-buffer.h" +#include "roof-read.h" +#include "roof-thread.h" + +struct _Roof { + RoofConfig *cfg; // Parsed ROOF parameters + RoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofReadInterface **rdi; // Reader interface abstraction, one per socket (no threading) + RoofReadContext **rdc; // Reader context: common structures, one per socket (no threading) + RoofThread **rdt; // Threading context: multiple reader contexts per thread + + guint n_threads; // Number of schedulled threads + HWSched sched; // OpenMP-style thread scheduler + + gboolean simulate; // Indicates if we are running in network or simulation modes + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + guint max_datasets; // Number of datasets to read + +// guint64 announced; // For debugging +// guint64 generated; // Total number for control + +// struct timespec last_fragment_timestamp; + +}; + + +Roof *roof_new(RoofConfig *cfg, GError **error); +void roof_free(Roof *ctx); + +void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error); +void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error); +//void roof_configure_writer(Roof *ctx, ...); +//void roof_configure_filter(Roof *ctx, ...); +void roof_setup(Roof *ctx, GError **error); + +void roof_read(Roof *ctx, void *buffer, GError **error); + +#endif \ No newline at end of file diff --git a/src/save/memcpy.c b/src/save/memcpy.c new file mode 100644 index 0000000..5c29d01 --- /dev/null +++ b/src/save/memcpy.c @@ -0,0 +1,344 @@ +/******************************************************************** + ** File: memcpy.c + ** + ** Copyright (C) 1999-2010 Daniel Vik + ** + ** This software is provided 'as-is', without any express or implied + ** warranty. In no event will the authors be held liable for any + ** damages arising from the use of this software. + ** Permission is granted to anyone to use this software for any + ** purpose, including commercial applications, and to alter it and + ** redistribute it freely, subject to the following restrictions: + ** + ** 1. The origin of this software must not be misrepresented; you + ** must not claim that you wrote the original software. If you + ** use this software in a product, an acknowledgment in the + ** use this software in a product, an acknowledgment in the + ** product documentation would be appreciated but is not + ** required. + ** + ** 2. Altered source versions must be plainly marked as such, and + ** must not be misrepresented as being the original software. + ** + ** 3. This notice may not be removed or altered from any source + ** distribution. + ** + ** + ** Description: Implementation of the standard library function memcpy. + ** This implementation of memcpy() is ANSI-C89 compatible. + ** + ** The following configuration options can be set: + ** + ** LITTLE_ENDIAN - Uses processor with little endian + ** addressing. Default is big endian. + ** + ** PRE_INC_PTRS - Use pre increment of pointers. + ** Default is post increment of + ** pointers. + ** + ** INDEXED_COPY - Copying data using array indexing. + ** Using this option, disables the + ** PRE_INC_PTRS option. + ** + ** MEMCPY_64BIT - Compiles memcpy for 64 bit + ** architectures + ** + ** + ** Best Settings: + ** + ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY + ** + *******************************************************************/ + + + +/******************************************************************** + ** Configuration definitions. + *******************************************************************/ + +#define LITTLE_ENDIAN +#define INDEXED_COPY + + +/******************************************************************** + ** Includes for size_t definition + *******************************************************************/ + +#include + + +/******************************************************************** + ** Typedefs + *******************************************************************/ + +typedef unsigned char UInt8; +typedef unsigned short UInt16; +typedef unsigned int UInt32; +#ifdef _WIN32 +typedef unsigned __int64 UInt64; +#else +typedef unsigned long long UInt64; +#endif + +#ifdef MEMCPY_64BIT +typedef UInt64 UIntN; +#define TYPE_WIDTH 8L +#else +typedef UInt32 UIntN; +#define TYPE_WIDTH 4L +#endif + + +/******************************************************************** + ** Remove definitions when INDEXED_COPY is defined. + *******************************************************************/ + +#if defined (INDEXED_COPY) +#if defined (PRE_INC_PTRS) +#undef PRE_INC_PTRS +#endif /*PRE_INC_PTRS*/ +#endif /*INDEXED_COPY*/ + + + +/******************************************************************** + ** Definitions for pre and post increment of pointers. + *******************************************************************/ + +#if defined (PRE_INC_PTRS) + +#define START_VAL(x) (x)-- +#define INC_VAL(x) *++(x) +#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH) +#define WHILE_DEST_BREAK (TYPE_WIDTH - 1) +#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1) +#define PRE_SWITCH_ADJUST + 1 + +#else /*PRE_INC_PTRS*/ + +#define START_VAL(x) +#define INC_VAL(x) *(x)++ +#define CAST_TO_U8(p, o) ((UInt8*)p + o) +#define WHILE_DEST_BREAK 0 +#define PRE_LOOP_ADJUST +#define PRE_SWITCH_ADJUST + +#endif /*PRE_INC_PTRS*/ + + + +/******************************************************************** + ** Definitions for endians + *******************************************************************/ + +#if defined (LITTLE_ENDIAN) + +#define SHL >> +#define SHR << + +#else /* LITTLE_ENDIAN */ + +#define SHL << +#define SHR >> + +#endif /* LITTLE_ENDIAN */ + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses incremening pointers. + *******************************************************************/ + +#define CP_INCR() { \ + INC_VAL(dstN) = INC_VAL(srcN); \ +} + +#define CP_INCR_SH(shl, shr) { \ + dstWord = srcWord SHL shl; \ + srcWord = INC_VAL(srcN); \ + dstWord |= srcWord SHR shr; \ + INC_VAL(dstN) = dstWord; \ +} + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses array indexes. + *******************************************************************/ + +#define CP_INDEX(idx) { \ + dstN[idx] = srcN[idx]; \ +} + +#define CP_INDEX_SH(x, shl, shr) { \ + dstWord = srcWord SHL shl; \ + srcWord = srcN[x]; \ + dstWord |= srcWord SHR shr; \ + dstN[x] = dstWord; \ +} + + + +/******************************************************************** + ** Macros for copying words of different alignment. + ** Uses incremening pointers or array indexes depending on + ** configuration. + *******************************************************************/ + +#if defined (INDEXED_COPY) + +#define CP(idx) CP_INDEX(idx) +#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr) + +#define INC_INDEX(p, o) ((p) += (o)) + +#else /* INDEXED_COPY */ + +#define CP(idx) CP_INCR() +#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr) + +#define INC_INDEX(p, o) + +#endif /* INDEXED_COPY */ + + +#define COPY_REMAINING(count) { \ + START_VAL(dst8); \ + START_VAL(src8); \ + \ + switch (count) { \ + case 7: INC_VAL(dst8) = INC_VAL(src8); \ + case 6: INC_VAL(dst8) = INC_VAL(src8); \ + case 5: INC_VAL(dst8) = INC_VAL(src8); \ + case 4: INC_VAL(dst8) = INC_VAL(src8); \ + case 3: INC_VAL(dst8) = INC_VAL(src8); \ + case 2: INC_VAL(dst8) = INC_VAL(src8); \ + case 1: INC_VAL(dst8) = INC_VAL(src8); \ + case 0: \ + default: break; \ + } \ +} + +#define COPY_NO_SHIFT() { \ + UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \ + UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \ + size_t length = count / TYPE_WIDTH; \ + \ + while (length & 7) { \ + CP_INCR(); \ + length--; \ + } \ + \ + length /= 8; \ + \ + while (length--) { \ + CP(0); \ + CP(1); \ + CP(2); \ + CP(3); \ + CP(4); \ + CP(5); \ + CP(6); \ + CP(7); \ + \ + INC_INDEX(dstN, 8); \ + INC_INDEX(srcN, 8); \ + } \ + \ + src8 = CAST_TO_U8(srcN, 0); \ + dst8 = CAST_TO_U8(dstN, 0); \ + \ + COPY_REMAINING(count & (TYPE_WIDTH - 1)); \ + \ + return dest; \ +} + + + +#define COPY_SHIFT(shift) { \ + UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \ + ~(TYPE_WIDTH - 1)); \ + UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \ + ~(TYPE_WIDTH - 1)); \ + size_t length = count / TYPE_WIDTH; \ + UIntN srcWord = INC_VAL(srcN); \ + UIntN dstWord; \ + \ + while (length & 7) { \ + CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \ + length--; \ + } \ + \ + length /= 8; \ + \ + while (length--) { \ + CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \ + \ + INC_INDEX(dstN, 8); \ + INC_INDEX(srcN, 8); \ + } \ + \ + src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \ + dst8 = CAST_TO_U8(dstN, 0); \ + \ + COPY_REMAINING(count & (TYPE_WIDTH - 1)); \ + \ + return dest; \ +} + + +/******************************************************************** + ** + ** void *memcpy(void *dest, const void *src, size_t count) + ** + ** Args: dest - pointer to destination buffer + ** src - pointer to source buffer + ** count - number of bytes to copy + ** + ** Return: A pointer to destination buffer + ** + ** Purpose: Copies count bytes from src to dest. + ** No overlap check is performed. + ** + *******************************************************************/ + +void *fast_memcpy(void *dest, const void *src, size_t count) +{ + UInt8* dst8 = (UInt8*)dest; + UInt8* src8 = (UInt8*)src; + + if (count < 8) { + COPY_REMAINING(count); + return dest; + } + + START_VAL(dst8); + START_VAL(src8); + + while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) { + INC_VAL(dst8) = INC_VAL(src8); + count--; + } + + switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) { + case 0: COPY_NO_SHIFT(); break; + case 1: COPY_SHIFT(1); break; + case 2: COPY_SHIFT(2); break; + case 3: COPY_SHIFT(3); break; +#if TYPE_WIDTH > 4 + case 4: COPY_SHIFT(4); break; + case 5: COPY_SHIFT(5); break; + case 6: COPY_SHIFT(6); break; + case 7: COPY_SHIFT(7); break; +#endif + } +} diff --git a/src/save/memcpy.h b/src/save/memcpy.h new file mode 100644 index 0000000..0714823 --- /dev/null +++ b/src/save/memcpy.h @@ -0,0 +1,63 @@ +/******************************************************************** + ** File: memcpy.h + ** + ** Copyright (C) 2005 Daniel Vik + ** + ** This software is provided 'as-is', without any express or implied + ** warranty. In no event will the authors be held liable for any + ** damages arising from the use of this software. + ** Permission is granted to anyone to use this software for any + ** purpose, including commercial applications, and to alter it and + ** redistribute it freely, subject to the following restrictions: + ** + ** 1. The origin of this software must not be misrepresented; you + ** must not claim that you wrote the original software. If you + ** use this software in a product, an acknowledgment in the + ** use this software in a product, an acknowledgment in the + ** product documentation would be appreciated but is not + ** required. + ** + ** 2. Altered source versions must be plainly marked as such, and + ** must not be misrepresented as being the original software. + ** + ** 3. This notice may not be removed or altered from any source + ** distribution. + ** + ** + ** Description: Implementation of the standard library function memcpy. + ** This implementation of memcpy() is ANSI-C89 compatible. + ** + *******************************************************************/ + + +/******************************************************************** + ** Includes for size_t definition + *******************************************************************/ + +#include + + +/******************************************************************** + ** + ** void *memcpy(void *dest, const void *src, size_t count) + ** + ** Args: dest - pointer to destination buffer + ** src - pointer to source buffer + ** count - number of bytes to copy + ** + ** Return: A pointer to destination buffer + ** + ** Purpose: Copies count bytes from src to dest. No overlap check + ** is performed. + ** + *******************************************************************/ + +#ifdef __cplusplus +extern "C" { +#endif + +void *fast_memcpy(void *dest, const void *src, size_t count); + +#ifdef __cplusplus +} +#endif diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c new file mode 100644 index 0000000..bdb7a7d --- /dev/null +++ b/src/save/ufo-roof-buffer-build-task.c @@ -0,0 +1,474 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see . + */ + +#include +#include +#include + +#ifdef __APPLE__ +#include +#else +#include +#endif + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" +#include "ufo-roof-build-task.h" + +typedef enum { + BUILD_AUTO = 0, + BUILD_RAW, + BUILD_SINO, + BUILD_UFO +} BuildType; + +struct _UfoRoofBuildTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading) + UfoRoofRead *rd; // Threading interface + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + BuildType build; // What dataset do we build: ROOF sinogram or raw network data + guint number; // Number of datasets to read + gboolean stop; // Stop flag + gboolean simulate; // Indicates if we are running in network or simulation modes + + guint64 announced; // For debugging + guint64 generated; // Total number for control + + struct timespec last_fragment_timestamp; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) + + + +static GEnumValue build_values[] = { + { BUILD_AUTO, "BUILD_AUTO", "auto" }, + { BUILD_RAW, "BUILD_RAW", "raw" }, + { BUILD_SINO, "BUILD_SINO", "sino" }, + { BUILD_UFO, "BUILD_UFO", "ufo" }, + { 0, NULL, NULL} +}; + +enum { + PROP_0, + PROP_STOP, + PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, + PROP_NUMBER, + PROP_BUILD, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_build_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL)); +} + +static void +ufo_roof_build_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + guint i; + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr); + if (!priv->cfg) + roof_propagate_error(error, gerr, "roof-build-setup: "); + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + } + + if (priv->build == BUILD_AUTO) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); + } + + priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); + + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); +} + +static void +ufo_roof_build_task_finalize (GObject *object) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + + if (priv->buf) { + ufo_roof_buffer_free(priv->buf); + priv->buf = NULL; + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } + + G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); +} + + + +static void +ufo_roof_build_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + // FIXME: Can we handle data types more elegant? + if (priv->build == BUILD_RAW) { + guint bytes = priv->cfg->dataset_size; + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + } else if (priv->build == BUILD_SINO) { + guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8; + requisition->n_dims = 2; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + requisition->dims[1] = priv->cfg->fan_projections; + } else if (priv->build == BUILD_UFO) { + requisition->n_dims = 2; + requisition->dims[0] = priv->cfg->fan_bins; + requisition->dims[1] = priv->cfg->fan_projections; + } +} + +static guint +ufo_roof_build_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_roof_build_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 1; +} + +static UfoTaskMode +ufo_roof_build_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_roof_build_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + + if (priv->stop) + return FALSE; + + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + return FALSE; + } + } + } while (!ready); + + // FIXME: integrate fastwriter somewhere here? + + if (priv->build == BUILD_UFO) { + switch (cfg->bit_depth) { + case 8: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U); + break; + case 16: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U); + break; + case 32: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U); + break; + default: + printf("Usupported bit-depth %u\n", cfg->bit_depth); + } + } + + // Metadata: plane and sequential number within the plane + g_value_init (&ival, G_TYPE_UINT); + g_value_init (&lval, G_TYPE_ULONG); + if (priv->build != BUILD_UFO) { + g_value_set_uint (&ival, cfg->bit_depth); + ufo_buffer_set_metadata (output, "bpp", &ival); + } + g_value_set_uint (&ival, 1 + seqid % cfg->n_planes); + ufo_buffer_set_metadata (output, "plane", &ival); + g_value_set_ulong (&lval, seqid / cfg->n_planes); + ufo_buffer_set_metadata (output, "plane_id", &lval); + g_value_set_ulong (&lval, seqid); + ufo_buffer_set_metadata (output, "seqid", &lval); + g_value_unset(&lval); + g_value_unset(&ival); + + // FIXME: Or shall we start from counting from the ID of the first registerd dataset + if ((priv->number)&&(buf->current_id >= priv->number)) { +// printf("%lu datasets processed, stopping\n", buf->current_id); + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + + if (ready) priv->generated++; + + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + priv->announced = buf->current_id; + } + + return ready; +} + +static void +ufo_roof_build_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + case PROP_SIMULATE: + priv->simulate = g_value_get_boolean (value); + break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; + case PROP_NUMBER: + priv->number = g_value_get_uint (value); + break; + case PROP_BUILD: + priv->build = g_value_get_enum (value); + if ((priv->build == BUILD_AUTO)&&(priv->cfg)) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + } + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_build_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + case PROP_SIMULATE: + g_value_set_boolean (value, priv->simulate); + break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; + case PROP_NUMBER: + g_value_set_uint (value, priv->number); + break; + case PROP_BUILD: + g_value_set_enum (value, priv->build); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_build_task_setup; + iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; + iface->get_mode = ufo_roof_build_task_get_mode; + iface->get_requisition = ufo_roof_build_task_get_requisition; + iface->generate = ufo_roof_build_task_generate; +} + +static void +ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_build_task_set_property; + oclass->get_property = ufo_roof_build_task_get_property; + oclass->finalize = ufo_roof_build_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_SIMULATE] = + g_param_spec_boolean ("simulate", + "Simulation mode", + "Read data from the specified files instead of network", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_NUMBER] = + g_param_spec_uint("number", + "Number of datasets to receive", + "Number of datasets to receive", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_BUILD] = + g_param_spec_enum ("build", + "Build type (\"raw\", \"sino\", \"ufo\")", + "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)", + g_enum_register_static ("build", build_values), + 0, G_PARAM_READWRITE); + + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); +} + +static void +ufo_roof_build_task_init(UfoRoofBuildTask *self) +{ + self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); +} diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c new file mode 100644 index 0000000..60868d2 --- /dev/null +++ b/src/save/ufo-roof-read-thread.c @@ -0,0 +1,155 @@ +#include +#include + +#include + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { + int i; + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); + + thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); + if (!thr->rdbuf) { + ufo_roof_read_thread_free(thr); + roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); + } + + thr->rd = rd; + thr->from = from; + thr->to = to; + + + return thr; + +} + +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { + if (!thr) return; + if (thr->rdbuf) free(thr->rdbuf); + + ufo_roof_thread_stop(thr, error); + free(thr); +} + +static int ufo_roof_read_thread_run(void *arg) { + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; + + UfoRoofConfig *cfg = thr->rd->cfg; + UfoRoofBuffer *buf = thr->rd->buf; + UfoRoofReadInterface *rdi = thr->rd->rdi; + + guint from = thr->from; + guint to = thr->to; + + void *rdbuf = thr->rdbuf; + + uint64_t current_id[to - from] = {0}; + uint64_t grabbed[to - from] = {0}; + + static uint64_t errors = 0; + + while (thr->op != UFO_ROOF_OP_STOP) { + for (guint sid = from; sid < to; sid++) { + // FIXME break waiting on stop? If no packets are send + guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + guint ready = false; + const uint8_t *fragment = (uint8_t*)rdbuf; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + +#ifdef UFO_ROOF_DEBUG + if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + + current_id[sid - from] = packet_id; + grabbed[sid - from]++; + if ((grabbed[sid - from]%1000000)==0) + printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); +#endif + + ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + } // fragment-loop + + // send notification? Broadcast blocks, we don't want it. + if (ready) { + } + + } // socket-loop + } // operation-loop + + +#ifdef UFO_ROOF_DEBUG + // Store first received packet on each channel... + static int debug = 1; + if (debug) { + char fname[256]; + sprintf(fname, "channel%i_packet0.raw", priv->id); + FILE *f = fopen(fname, "w"); + if (f) { + fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); + fclose(f); + } + debug = 0; + } +#endif /* UFO_ROOF_DEBUG */ + + // FIXME: End of data (shall we restart in the network case?) +// if (!packets) +// return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; +// header->n_packets = packets; + + return TRUE; +} + + +} + +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { + int err; + if (!thr) return FALSE; + + err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); + + ctx->launched = TRUE; + return TRUE; +} + +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { + int err, ret; + if (!thr) return FALSE; + if (!thr->launched) return TRUE; + + // Signal thread termination + + err = thrd_join(&thr->thread, &ret); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); + + return TRUE; +} + diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h new file mode 100644 index 0000000..ebe8989 --- /dev/null +++ b/src/save/ufo-roof-read-thread.h @@ -0,0 +1,23 @@ +#ifndef __UFO_ROOF_READ_THREAD_H +#define __UFO_ROOF_READ_THREAD_H + +typedef struct _UfoRoofReadThread UfoRoofReadThread; + +#include "ufo-roof-read.h" + +struct _UfoRoofReadThread { + UfoRoofRead *rd; // ROOF Reader Cotext + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) + + gboolean launched; // Flag indicating if thread is launched + thrd_t thread; // Thread ID +}; + +/* +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error); +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +#endif /* __UFO_ROOF_READ_THREAD_H */ \ No newline at end of file diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c new file mode 100644 index 0000000..f3d790d --- /dev/null +++ b/src/save/ufo-roof-read.c @@ -0,0 +1,123 @@ +#include +#include + +#include + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + + +#include +#include +#include +#include + +static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { + // FIXME: request real-time permissions? + // FIXME: do we need this? +/* + uint32_t lat = 0; + int fd = open("/dev/cpu_dma_latency", O_RDWR); + if (fd == -1) { + fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); + } else { + write(fd, &lat, sizeof(lat)); + close(fd); + } +*/ +} + + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { + assert(ctx); + return ctx->settings; +} + + +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { + guint i; + GError *gerr = NULL; + + UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); + + ufo_roof_read_optimize(ctx); + + ctx->n_threads = cfg->n_read_threads; + ctx->cfg = cfg; + ctx->buf = buf; + ctx->rdi = rdi; + + ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); + if (!ctx->thr) { + ufo_roof_read_free(ctx); + roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; + ctx->n_threads = n_threads; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; + if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; + + guint from, to; + for (i = 0, from = 0; i < n_threads; i++, from = to) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + + return ctx; +} + +void ufo_roof_read_free(UfoRoofRead *ctx) { + if (!ctx) return; + + if (ctx->thr) { + int i; + ufo_roof_read_stop(ctx); + for (i = 0; i < ctx->n_threads; i++) { + if (ctx->thr[i]) + ufo_roof_read_thread_free(ctx->thr[i]); + } + free(ctx->thr); + } + free(ctx); +} + +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { + gboolean ret; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_start(ctx, &gerr); + if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); + } + return TRUE; +} + +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { + gboolean ret, res = FALSE; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_stop(ctx, &gerr); + if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); + res |= !ret; + } + return !res; +} + + diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h new file mode 100644 index 0000000..16e910b --- /dev/null +++ b/src/save/ufo-roof-read.h @@ -0,0 +1,61 @@ +#ifndef __UFO_ROOF_READ_H +#define __UFO_ROOF_READ_H + +typedef struct _UfoRoofRead UfoRoofRead; + +#include "ufo-roof-config.h" +#include "ufo-roof-buffer.h" +//#include "ufo-roof-read-thread.h" + +G_BEGIN_DECLS + +typedef struct _UfoRoofReadInterface UfoRoofReadInterface; +typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings; + +//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); + +typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); + +struct _UfoRoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _UfoRoofReadInterface { + UfoRoofReaderRead read; + UfoRoofReaderClose close; + + UfoRoofReadInterfaceSettings settings; +}; + +typedef enum { + UFO_ROOF_OP_STOP = 0, + UFO_ROOF_OP_READ +} UfoRoofOp; + +struct _UfoRoofReadContext { + UfoRoofConfig *cfg; + UfoRoofBuffer *buf; + UfoRoofReadInterface *rdi; + + gulong packet +// UfoRoofReadThread *thr; + + guint n_threads; + UfoRoofOp op; // Current operation (reading by default) +}; + + + +/* +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error); +void ufo_roof_read_free(UfoRoofRead *ctx); +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error); +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error); + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error); +*/ + +G_END_DECLS + +#endif /* __UFO_ROOF_READ_H */ diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c deleted file mode 100644 index 0e0a890..0000000 --- a/src/ufo-roof-buffer.c +++ /dev/null @@ -1,209 +0,0 @@ -#include -#include -#include - -#include "glib.h" - -#include "ufo-roof.h" -#include "ufo-roof-buffer.h" - -// This is currently not thread safe. With dual-filter architecture this will be called sequentially. - -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) { - if ((n_dims < 1)||(n_dims > 2)) - roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims); - - UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer)); - if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer"); - - buffer->max_datasets = max_datasets; - buffer->ring_size = cfg->buffer_size; - buffer->drop_buffers = cfg->drop_buffers; - buffer->latency_buffers = cfg->latency_buffers; - buffer->n_dims = n_dims; - buffer->dataset_size = cfg->dataset_size; - buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8; - buffer->dataset_dims[1] = cfg->fan_projections; - buffer->fragment_size = cfg->payload_size; - buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8; - buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0]; - - buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size; - buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams; -// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size); - - buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size); - buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int)); - buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint)); - -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - buffer->first_id = malloc(buffer->ring_size * sizeof(guint64)); - if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets"); - for (guint i = 0; i < buffer->ring_size; i++) - buffer->first_id[i] = (guint64)-1; -#else - buffer->first_id = (guint64)-1; -#endif - - if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) { - ufo_roof_buffer_free(buffer); - roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size); - } - - return buffer; -} - -void ufo_roof_buffer_free(UfoRoofBuffer *buffer) { - if (buffer) { -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - if (buffer->first_id) - free(buffer->first_id); -#endif - if (buffer->ring_buffer) - free(buffer->ring_buffer); - if (buffer->n_fragments) - free(buffer->n_fragments); - if (buffer->stream_fragment) - free(buffer->stream_fragment); - - free(buffer); - } -} - - // fragment_id is numbered from 1 (0 - means auto) -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) { - gboolean ready = FALSE; - guint buffer_id; - guint64 first_id; - guint64 dataset_id; - - if (!fragment_id) { - fragment_id = ++buffer->stream_fragment[stream_id]; - } - - // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required) - dataset_id = (fragment_id - 1) / buffer->fragments_per_stream; - fragment_id = (fragment_id - 1) % buffer->fragments_per_stream; - -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - if (buffer->first_id[stream_id] == (guint64)-1) - buffer->first_id[stream_id] = dataset_id; - first_id = buffer->first_id[stream_id]; -#else - if (buffer->first_id == (guint64)-1) - buffer->first_id = dataset_id; - first_id = buffer->first_id; -#endif - if (dataset_id < first_id) - return FALSE; - - dataset_id -= first_id; - buffer_id = dataset_id % buffer->ring_size; - - // FIXME: Currently, this produces too much output. Introduce some kind of debugging mode? - if (dataset_id < buffer->current_id) { - roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id); - return FALSE; - } - - if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) { -// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id); - return FALSE; - } - // We are not fast enough, new packets are arrvining to fast - if (dataset_id >= (buffer->current_id + buffer->ring_size)) { - // FIXME: Broken packets sanity checks? Allocate additional buffers on demand? - - root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed", - dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); - - // FIXME: Send semi-complete buffers further? - // FIXME: Or shall we drop more if larger buffers are allocated? - if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) { - memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint)); - buffer->current_id = dataset_id; - } else { - for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++) - buffer->n_fragments[i%buffer->ring_size] = 0; - buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1; - } - - if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset) - ready = TRUE; - - // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset. - // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value - // The updates may happen after writting/reading is finished. - } - -/* - printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", - dataset_id, buffer_id, - buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset, - stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, - buffer->fragment_size, ((uint32_t*)fragment)[0] - ); -*/ - - uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; - if (buffer->n_dims == 2) { - uint8_t *fragment_buffer = dataset_buffer + - stream_id * buffer->fragment_dims[0] + // x-coordinate - (fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate - - for (guint i = 0; i < buffer->fragment_dims[1]; ++i) { - memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]); - } - } else { - // 1D stracture, simply putting fragment at the appropriate position in the stream - uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size; - memcpy(fragment_buffer, fragment, buffer->fragment_size); - } - - // FIXME: Sanity checks: verify is not a dublicate fragment? - atomic_fetch_add(&buffer->n_fragments[buffer_id], 1); - - if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { - // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (dataset_id == buffer->current_id) - ready = TRUE; - else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) - ready = ufo_roof_buffer_skip_to_ready(buffer); - } - - return ready; -} - -gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) { - for (guint i = 0; i < buffer->ring_size; i++) { - guint64 id = buffer->current_id + i; - guint buffer_id = id % buffer->ring_size; - - if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) { - buffer->current_id = id; - return TRUE; - } - -// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset); - buffer->n_fragments[buffer_id] = 0; - } - - return FALSE; -} - - -gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) { - guint buffer_id = buffer->current_id % buffer->ring_size; - void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size; - - // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing? - if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE; - - memcpy(output_buffer, dataset_buffer, buffer->dataset_size); - if (seqid) *seqid = buffer->current_id; - - buffer->n_fragments[buffer_id] = 0; - buffer->current_id += 1; - - return TRUE; -} diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h deleted file mode 100644 index 8e9c00b..0000000 --- a/src/ufo-roof-buffer.h +++ /dev/null @@ -1,47 +0,0 @@ -#ifndef __UFO_ROOF_BUFFER_H -#define __UFO_ROOF_BUUFER_H - - // This IS harmful! Just for testing -//#define UFO_ROOF_INDEPENDENT_STREAMS - -#include - - -struct _UfoRoofBuffer { - guint64 current_id; // The ID of the first (active) dataset in the buffer -#ifdef UFO_ROOF_INDEPENDENT_STREAMS - guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known -#else - guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known -#endif - - guint ring_size; // Number of datasets to buffer - guint drop_buffers; // If we need to catch up - guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready - uint8_t *ring_buffer; // The ring buffer - _Atomic guint *n_fragments; // Number of completed fragments in each buffer - guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams) -// int *fragments; // Mark individual completed fragments (if we care for partial data) - - - guint64 max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped - guint n_dims; // Indicates if we just assemble one fragment after another or there is 2D/3D data structure (ROOF) - guint dataset_size; // Size (in bytes) of a full dataset - guint dataset_dims[2]; // x (in bytes), y (in rows) - guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment) - guint fragment_dims[2]; // x (in bytes), y (in rows) - - guint fragments_per_dataset; // Number of packets in dataset (used to compute when dataset is ready) - guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready) -}; - -typedef struct _UfoRoofBuffer UfoRoofBuffer; - -UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error); -void ufo_roof_buffer_free(UfoRoofBuffer *buf); - -gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error); -gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer); -gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error); - -#endif diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index a39ed11..ee1cec5 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -19,6 +19,7 @@ #include #include +#include #ifdef __APPLE__ #include @@ -26,10 +27,16 @@ #include #endif +#include "hw_sched.h" + #include "ufo-roof.h" +#include "ufo-roof-read.h" #include "ufo-roof-buffer.h" +#include "ufo-roof-read-socket.h" +#include "ufo-roof-read-file.h" #include "ufo-roof-build-task.h" + typedef enum { BUILD_AUTO = 0, BUILD_RAW, @@ -37,10 +44,16 @@ typedef enum { BUILD_UFO } BuildType; -struct _UfoRoofBuildTaskPrivate { +struct _RoofBuildTaskPrivate { gchar *config; // ROOF configuration file name - UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofConfig *cfg; // Parsed ROOF parameters +// RoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofReadInterface **rdi; // Reader interfaces, one per socket (no threading) +// RoofRead *rd; // Threading interface + HWSched sched; + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) BuildType build; // What dataset do we build: ROOF sinogram or raw network data guint number; // Number of datasets to read @@ -48,17 +61,21 @@ struct _UfoRoofBuildTaskPrivate { gboolean simulate; // Indicates if we are running in network or simulation modes guint64 announced; // For debugging + guint64 generated; // Total number for control + + guint n_threads; // Number of schedulled threads + struct timespec last_fragment_timestamp; }; static void ufo_task_interface_init (UfoTaskIface *iface); -G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, +G_DEFINE_TYPE_WITH_CODE (RoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, ufo_task_interface_init)) -#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, RoofBuildTaskPrivate)) @@ -74,6 +91,8 @@ enum { PROP_0, PROP_STOP, PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, PROP_NUMBER, PROP_BUILD, PROP_CONFIG, @@ -93,9 +112,10 @@ ufo_roof_build_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); if (!priv->config) roof_setup_error(error, "ROOF configuration is not specified"); @@ -104,15 +124,65 @@ ufo_roof_build_task_setup (UfoTask *task, if (!priv->cfg) roof_propagate_error(error, gerr, "roof-build-setup: "); + priv->rdi = (RoofReadInterface**)calloc(priv->cfg->n_streams, sizeof(RoofReadInterface*)); + if (!priv->rdi) + roof_setup_error(error, "Failed to allocate memory for RoofReadInterface array"); + + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->first_file_number + i, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + priv->rdc[i] = ufo_roof_read_context_new(priv->cfg, priv->rdi[i], &gerr); + if (!priv->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + priv->n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) priv->n_threads++; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / priv->n_threads; + if (priv->cfg->n_streams % priv->n_threads) extra = priv->cfg->n_streams - priv->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < priv->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(priv->cfg, priv->rdc, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + if (priv->build == BUILD_AUTO) { if (priv->cfg->roof_mode) priv->build = BUILD_SINO; else priv->build = BUILD_RAW; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); } +/* priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); - if (!priv->buf) - roof_propagate_error(error, gerr, "roof-build-setup: "); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); +*/ + + priv->sched = hw_sched_create(priv->cfg->n_read_threads); + if (!priv->sched) + roof_setup_error(error, "Failed to schedule builder threads"); + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); } @@ -120,12 +190,34 @@ ufo_roof_build_task_setup (UfoTask *task, static void ufo_roof_build_task_finalize (GObject *object) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); - + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->sched) { + hw_sched_destroy(priv->sched); + priv->sched = NULL; + } + +/* + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + if (priv->buf) { ufo_roof_buffer_free(priv->buf); priv->buf = NULL; } +*/ + + if (priv->rdi) { + guint i; + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->rdi[i]) + priv->rdi[i]->close(priv->rdi[i]); + } + free(priv->rdi); + priv->rdi = NULL; + } if (priv->cfg) { ufo_roof_config_free(priv->cfg); @@ -137,6 +229,10 @@ ufo_roof_build_task_finalize (GObject *object) priv->config = NULL; } + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); } @@ -149,9 +245,10 @@ ufo_roof_build_task_get_requisition (UfoTask *task, UfoRequisition *requisition, GError **error) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); // FIXME: Can we handle data types more elegant? + // FIXME: Kill BUILD_RAW ? if (priv->build == BUILD_RAW) { guint bytes = priv->cfg->dataset_size; requisition->n_dims = 1; @@ -184,100 +281,56 @@ ufo_roof_build_task_get_num_dimensions (UfoTask *task, static UfoTaskMode ufo_roof_build_task_get_mode (UfoTask *task) { - return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR; + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; } + static gboolean -ufo_roof_build_task_process (UfoTask *task, - UfoBuffer **inputs, +ufo_roof_build_task_generate (UfoTask *task, UfoBuffer *output, UfoRequisition *requisition) { - GError *gerr = NULL; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; -// UfoRequisition in_req; -// ufo_buffer_get_requisition (inputs[0], &in_req); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofConfig *cfg = priv->cfg; + RoofBuffer *buf = priv->buf; - const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); - UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); + void *output_buffer = ufo_buffer_get_host_array(output, NULL); if (priv->stop) return FALSE; - const uint8_t *fragment = data; - for (guint i = 0; i < header->n_packets; i++) { - guint64 packet_id = 0; - // Otherwise considered consecutive and handled by the buffer - if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { - UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); - packet_id = be64toh(pheader->packet_id) + 1; - } + priv->current_dataset; + priv->current_buffer = output_buffer; - // FIXME: Can we kill here the dataset finished during the previous step of iteration - ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr); - if (gerr) roof_print_error(gerr); + err = hw_sched_schedule_thread_task(sched, (void*)&tnv_ctx, ufo_roof_build_task_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } - fragment += cfg->max_packet_size; - } - - // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? - - if (ready) { - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { - struct timespec current_time; - clock_gettime(CLOCK_REALTIME, ¤t_time); - - // No new accepted events within timeout - if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) { - ready = ufo_roof_buffer_skip_to_ready(buf); - if (ready) { - // FIXME: shall we really reset timer here? - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { +/* + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); - } + return FALSE; + } } - } - - -/* - printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets, - (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); + } while (!ready); */ - return !ready; -} - -static gboolean -ufo_roof_build_task_generate (UfoTask *task, - UfoBuffer *output, - UfoRequisition *requisition) -{ - gboolean ready = FALSE; - gulong seqid; - GError *gerr = NULL; - GValue ival = G_VALUE_INIT; - GValue lval = G_VALUE_INIT; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; - - void *output_buffer = ufo_buffer_get_host_array(output, NULL); - - if (priv->stop) - return FALSE; - - ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &seqid, &gerr); - if (gerr) roof_print_error(gerr); + // FIXME: integrate fastwriter somewhere here? if (priv->build == BUILD_UFO) { switch (cfg->bit_depth) { @@ -318,8 +371,13 @@ ufo_roof_build_task_generate (UfoTask *task, g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } + if (ready) priv->generated++; + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { - printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); priv->announced = buf->current_id; } @@ -332,7 +390,7 @@ ufo_roof_build_task_set_property (GObject *object, const GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: @@ -345,6 +403,13 @@ ufo_roof_build_task_set_property (GObject *object, case PROP_SIMULATE: priv->simulate = g_value_get_boolean (value); break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; case PROP_NUMBER: priv->number = g_value_get_uint (value); break; @@ -367,11 +432,11 @@ ufo_roof_build_task_get_property (GObject *object, GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: - g_value_set_string(value, priv->config); + g_value_set_string(value, priv->config); break; case PROP_STOP: g_value_set_boolean (value, priv->stop); @@ -379,6 +444,12 @@ ufo_roof_build_task_get_property (GObject *object, case PROP_SIMULATE: g_value_set_boolean (value, priv->simulate); break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; case PROP_NUMBER: g_value_set_uint (value, priv->number); break; @@ -394,17 +465,18 @@ ufo_roof_build_task_get_property (GObject *object, static void ufo_task_interface_init (UfoTaskIface *iface) { + roof_init(); + iface->setup = ufo_roof_build_task_setup; iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; iface->get_mode = ufo_roof_build_task_get_mode; iface->get_requisition = ufo_roof_build_task_get_requisition; - iface->process = ufo_roof_build_task_process; iface->generate = ufo_roof_build_task_generate; } static void -ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +ufo_roof_build_task_class_init (RoofBuildTaskClass *klass) { GObjectClass *oclass = G_OBJECT_CLASS (klass); @@ -426,7 +498,6 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); - properties[PROP_SIMULATE] = g_param_spec_boolean ("simulate", "Simulation mode", @@ -434,6 +505,20 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + properties[PROP_NUMBER] = g_param_spec_uint("number", "Number of datasets to receive", @@ -452,11 +537,11 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) g_object_class_install_property (oclass, i, properties[i]); - g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); + g_type_class_add_private (oclass, sizeof(RoofBuildTaskPrivate)); } static void -ufo_roof_build_task_init(UfoRoofBuildTask *self) +ufo_roof_build_task_init(RoofBuildTask *self) { self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); } diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c deleted file mode 100644 index 944ee31..0000000 --- a/src/ufo-roof-config.c +++ /dev/null @@ -1,243 +0,0 @@ -#include -#include - -#include - -#include - -#include "ufo-roof.h" -#include "ufo-roof-error.h" -#include "ufo-roof-config.h" - -#define roof_config_node_get_with_default(var, parent, type, name, default) do { \ - JsonNode *node = json_object_get_member(parent, name); \ - if (node) var = json_node_get_##type(node); \ - else var = default; \ - } while(0) - -#define roof_config_node_get_string_with_default(var, parent, name, default) do { \ - const gchar *str; \ - JsonNode *node = json_object_get_member(parent, name); \ - if (node) str = json_node_get_string(node); \ - else str = default; \ - if (var != str) { \ - if (var) g_free(var); \ - var = g_strdup(str); \ - } \ - } while(0) - -#define roof_config_node_get(var, parent, type, name) \ - roof_config_node_get_with_default(var, parent, type, name, var) - -#define roof_config_node_get_string(var, parent, name) \ - roof_config_node_get_string_with_default(var, parent, name, var) - - -typedef struct { - UfoRoofConfig cfg; - - JsonParser *parser; -} UfoRoofConfigPrivate; - -void ufo_roof_config_free(UfoRoofConfig *cfg) { - if (cfg) { - UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg; - - if (priv->parser) - g_object_unref (priv->parser); - - free(cfg); - } -} - -UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error) { - UfoRoofConfigPrivate *priv; - UfoRoofConfig *cfg; - -// JsonNode *node; - JsonObject *root = NULL; - JsonObject *hardware = NULL; - JsonObject *geometry = NULL; - JsonObject *optics = NULL; - JsonObject *network = NULL; - JsonObject *performance = NULL; - JsonObject *simulation = NULL; - JsonObject *reconstruction = NULL; - JsonObject *data = NULL; - - GError *gerr = NULL; - - priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate)); - if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig"); - - memset(priv, 0, sizeof(UfoRoofConfigPrivate)); - - // Set defaults - cfg = &priv->cfg; - - cfg->roof_mode = FALSE; - cfg->n_planes = 2; - cfg->n_modules = 16; - cfg->channels_per_module = 16; - cfg->bit_depth = 16; - cfg->samples_per_rotation = 1000; - cfg->sample_rate = 0; - cfg->imaging_rate = 0; - - cfg->port = 52067; - cfg->n_streams = 1; - cfg->protocol = "udp"; - cfg->network_timeout = 10000000; - cfg->header_size = sizeof(UfoRoofPacketHeader); - cfg->payload_size = 0; - cfg->max_packet_size = 0; - cfg->max_packets = 100; - cfg->dataset_size = 0; - cfg->buffer_size = 2; - cfg->latency_buffers = 0; - cfg->drop_buffers = 0; - - - // Read configuration - priv->parser = json_parser_new_immutable (); - json_parser_load_from_file (priv->parser, config, &gerr); - - if (gerr != NULL) { - g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config); - ufo_roof_config_free(cfg); - return NULL; - } - - root = json_node_get_object (json_parser_get_root (priv->parser)); - - if (root) { - roof_config_node_get(hardware, root, object, "hardware"); - roof_config_node_get(geometry, root, object, "geometry"); - roof_config_node_get(optics, root, object, "optics"); - roof_config_node_get(network, root, object, "network"); - roof_config_node_get(reconstruction, root, object, "reconstruction"); - roof_config_node_get(performance, root, object, "performance"); - roof_config_node_get(data, root, object, "data"); - - if (flags&UFO_ROOF_CONFIG_SIMULATION) - roof_config_node_get(simulation, root, object, "simulation"); - } - - if (hardware) { - roof_config_node_get(cfg->n_planes, hardware, int, "planes"); - roof_config_node_get(cfg->n_modules, hardware, int, "modules"); - roof_config_node_get(cfg->channels_per_module, hardware, int, "channels_per_module"); - roof_config_node_get(cfg->bit_depth, hardware, int, "bit_depth"); - - roof_config_node_get(cfg->samples_per_rotation, hardware, int, "samples_per_rotation"); - roof_config_node_get(cfg->sample_rate, hardware, int, "sample_rate"); - roof_config_node_get(cfg->imaging_rate, hardware, int, "imaging_rate"); - - if ((cfg->sample_rate)||(cfg->imaging_rate)) { - if ((!cfg->sample_rate)||(!cfg->imaging_rate)||(cfg->sample_rate%cfg->imaging_rate)) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Invalid sample (%u) and imaging (%u) rates are specified", cfg->sample_rate, cfg->imaging_rate); - } - - if ((json_object_get_member(hardware, "samples_per_rotation"))&&(cfg->samples_per_rotation != (cfg->sample_rate / cfg->imaging_rate))) { - ufo_roof_config_free(cfg); - roof_new_error(error, "The specified samples-per-rotation (%u) doesn't match sample/imaging rates (%u / %u)", cfg->samples_per_rotation, cfg->sample_rate, cfg->imaging_rate); - } - - cfg->samples_per_rotation = cfg->sample_rate / cfg->imaging_rate; - } - - if ((cfg->bit_depth%8)||(cfg->bit_depth > 32)) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Invalid bit-depth (%u) is configured, only 8, 16, 24, 32 is currently supported", cfg->bit_depth); - } - - cfg->fan_projections = cfg->samples_per_rotation; - cfg->fan_bins = cfg->n_modules * cfg->channels_per_module; - - cfg->dataset_size = cfg->fan_projections * cfg->fan_bins * (cfg->bit_depth / 8); - cfg->n_streams = cfg->n_modules; - cfg->roof_mode = TRUE; - } - - if (network) { - roof_config_node_get(cfg->port, network, int, "port"); - roof_config_node_get(cfg->n_streams, network, int, "streams"); - - roof_config_node_get(cfg->payload_size, network, int, "payload_size"); - roof_config_node_get(cfg->header_size, network, int, "header_size"); - roof_config_node_get(cfg->max_packet_size, network, int, "max_packet_size"); - roof_config_node_get(cfg->dataset_size, network, int, "dataset_size"); - - if (!cfg->payload_size) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Packet payload and header size must be set"); - } - - if ((cfg->header_size < sizeof(UfoRoofPacketHeader))&&(!strncmp(cfg->protocol, "udp", 3))) { - ufo_roof_config_free(cfg); - roof_new_error(error, "The header with packet id (%lu bytes) is expected for un-ordered protocols", sizeof(UfoRoofPacketHeader)); - } - - if (!cfg->dataset_size) - cfg->dataset_size = cfg->payload_size; - } - - if (simulation) { - roof_config_node_get(cfg->header_size, simulation, int, "header_size"); - - if (!cfg->payload_size) - cfg->payload_size = cfg->dataset_size; - } - - if (performance) { - roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once"); - roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size"); - roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers"); - roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers"); - } - - - // Check configuration consistency - guint fragments_per_dataset = cfg->dataset_size / cfg->payload_size; - guint fragments_per_stream = fragments_per_dataset / cfg->n_streams; - - // Dataset should be split in an integer number of network packets (we don't expect data from different datasets in one packet at the moment) - if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams); - } - - // Packet should contain an integer number of complete projections (their parts provided by a single module) - if ((cfg->roof_mode)&&(cfg->payload_size % (cfg->channels_per_module * (cfg->bit_depth / 8)))) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Inconsistent ROOF configuration: packet_size=%u, projection_size=%u (%u channels x %u bits)", cfg->payload_size, cfg->channels_per_module * (cfg->bit_depth / 8), cfg->channels_per_module, cfg->bit_depth); - } - - if (!cfg->max_packet_size) - cfg->max_packet_size = cfg->header_size + cfg->payload_size; - - if (hardware) { - if (cfg->n_modules != cfg->n_streams) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Currently, number of ROOF modules (%u) is exepcted to be equal to number of independent data streams (%u)", cfg->n_modules, cfg->n_streams); - } - - if (cfg->dataset_size != (cfg->fan_projections * cfg->fan_bins * cfg->bit_depth / 8)) { - ufo_roof_config_free(cfg); - roof_new_error(error, "Specified dataset size (%u) does not match ROOF configuration (modules: %u, channels-per-module: %u, bit-depth: %u, samples-per-rotation: %u)", cfg->dataset_size, cfg->n_modules, cfg->channels_per_module, cfg->bit_depth, cfg->samples_per_rotation); - } - } - - if ((cfg->buffer_size * fragments_per_stream) < cfg->max_packets) { - cfg->max_packets = cfg->buffer_size * fragments_per_stream / 2; - } - - if (cfg->buffer_size < 4) { - cfg->drop_buffers = 0; - } else if (cfg->drop_buffers >= cfg->buffer_size) { - cfg->drop_buffers = cfg->buffer_size / 2; - } - - return cfg; -} diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h deleted file mode 100644 index 8718381..0000000 --- a/src/ufo-roof-config.h +++ /dev/null @@ -1,64 +0,0 @@ -#ifndef __UFO_ROOF_CONFIG_H -#define __UFO_ROOF_CONFIG_H - -#include - -typedef struct { - // ROOF Hardware - gboolean roof_mode; // Indicates if ROOF is configured (1), otherwise only networking is implemented - guint n_planes; // Number of detector planes, ROOF module serves a ring segment from all planes in a round-robin fashion - guint n_modules; // Number of ROOF modules - guint channels_per_module; // Number of pixels in each module - guint samples_per_rotation; // Number of samples (projections) in a full fan sinogram; computed from sample_rate & image_rate if given - guint sample_rate; // Number of samples (projections) acquired per second, 0 - if unknown - guint imaging_rate; // Number of complete datasets (images) acquired per second, 0 - if unknown - guint bit_depth; // Number of bits per pixel (we currently support only multiples of 8) - - // Geometry - guint fan_projections; // Number of fan projections = samples_per_rotation - guint fan_bins; // Number of fan detectors = n_modules * channels_per_module - guint parallel_projections; - guint parallel_bins; -// guint detector_diameter; - - // Optics - - - // Network Server / Reader - gchar *protocol; // Protocols: tcp, udp, tcp6, udp6, ... - guint port; // First port - guint n_streams; // Number of independent data streams (expected on sequential ports), by default equal to number of ROOF modules - guint header_size; // Expected size of the packet header, for dgram protocols we need at least 32-bit sequence number. Defaults to uint32_t for udp* and 0 - otherwise - guint payload_size; // Expected size of TCP/UDP packet (without header) - guint dataset_size; // Size of a single dataset (image, sinogram, etc.). This is real size in bytes, excluding all technical headers used in communication protocol. Normally, it is computed based on ROOF hardware parameters. - - // Performance parameters - guint max_packets; // limits maximum number of packets which are read at once - guint max_packet_size; // payload_size + header_size + ... (we don't care if tail is variable length provided that the complete packet does not exceed max_packet_size bytes) - guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected - guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up. - guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip - guint network_timeout; // Maximum time (us) to wait for data on the socket - - - - -/* - guint planes_per_module; -*/ - - - - -} UfoRoofConfig; - -typedef enum { - UFO_ROOF_CONFIG_DEFAULT = 0, - UFO_ROOF_CONFIG_SIMULATION = 1 -} UfoRoofConfigFlags; - - -UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags, GError **error); -void ufo_roof_config_free(UfoRoofConfig *cfg); - -#endif /* __UFO_ROOF_CONFIG_H */ diff --git a/src/ufo-roof-error.h b/src/ufo-roof-error.h deleted file mode 100644 index 5491f31..0000000 --- a/src/ufo-roof-error.h +++ /dev/null @@ -1,58 +0,0 @@ -#ifndef __UFO_ROOF_ERROR_H -#define __UFO_ROOF_ERROR_H - -#include - - -#define roof_print_error(error) do { \ - g_warning("%s", error->message); \ - g_error_free(error); \ - error = NULL; \ - } while (0) - -#define roof_set_error(error, type, ...) do { \ - if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ - } while (0) - -#define roof_error(error, type, ...) do { \ - if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ - return; \ - } while (0) - -#define roof_propagate_error(error, err, ...) do { \ - g_propagate_prefixed_error(error, err, __VA_ARGS__); \ - return; \ - } while (0) - -#define roof_error_with_retval(error, retval, type, ...) do { \ - if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, __VA_ARGS__); \ - return retval; \ - } while (0) - -#define roof_propagate_error_with_retval(error, retval, err, ...) do { \ - g_propagate_prefixed_error(error, err, __VA_ARGS__); \ - return retval; \ - } while (0) - - -#define roof_setup_error(error, ...) \ - roof_error(error, SETUP, __VA_ARGS__) - -#define roof_new_error(error, ...) \ - roof_error_with_retval(error, NULL, SETUP, __VA_ARGS__) - - -#define roof_network_error(error, ...) \ - roof_error(error, SETUP, __VA_ARGS__) - -#define root_set_network_error(error, ...) \ - roof_set_error(error, SETUP, __VA_ARGS__) - -#define roof_network_error_with_retval(error, retval, ...) \ - roof_error_with_retval(error, retval, SETUP, __VA_ARGS__) - -#define roof_memory_error(error, ...) \ - roof_error(error, SETUP, __VA_ARGS__) - - -#endif /* __UFO_ROOF_ERROR_H */ diff --git a/src/ufo-roof-read-file.c b/src/ufo-roof-read-file.c deleted file mode 100644 index 4ee11c6..0000000 --- a/src/ufo-roof-read-file.c +++ /dev/null @@ -1,84 +0,0 @@ -#include -#include -#include - -#include "glib.h" - -#include "ufo-roof.h" -#include "ufo-roof-read-file.h" - -typedef struct { - UfoRoofReadInterface iface; - - UfoRoofConfig *cfg; - - gchar *fname; - FILE *fd; -} UfoRoofReadFile; - -static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; - - if (reader) { - if (reader->fname) - g_free(reader->fname); - - if (reader->fd) - fclose(reader->fd); - - free(reader); - } -} - -static guint ufo_roof_read_file(UfoRoofReadInterface *iface, uint8_t *buffers, GError **error) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)iface; - UfoRoofConfig *cfg = reader->cfg; - - size_t bytes = 0; - size_t packet_size = cfg->header_size + cfg->payload_size; - size_t expected = cfg->max_packets * packet_size; - - while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) { - size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd); - bytes += ret; - } - - guint packets = bytes / packet_size; - - if (ferror(reader->fd)) { - roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd)); - } else if ((feof(reader->fd))&&(bytes % packet_size)) { - roof_network_error_with_retval(error, packets, "extra data in the end of input"); - } - - return packets; -} - - -UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error) { - UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile)); - if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile"); - - // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now. - if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size) - roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size); - - reader->cfg = cfg; - reader->iface.close = ufo_roof_read_file_free; - reader->iface.read =ufo_roof_read_file; - - reader->fname = g_strdup_printf(path, file_id); - if (!reader->fname) { - free(reader); - roof_new_error(error, "Can't build file name"); - } - - reader->fd = fopen(reader->fname, "rb"); - if (!reader->fd) { - g_free(reader->fname); - g_free(reader); - roof_new_error(error, "Can't open file %i at path %s", file_id, path); - } - - return (UfoRoofReadInterface*)reader; -} diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h deleted file mode 100644 index 787b441..0000000 --- a/src/ufo-roof-read-file.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef __UFO_ROOF_READ_FILE_H -#define __UFO_ROOF_READ_FILE_H - -#include "ufo-roof-read.h" - -UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, const char *path, guint file_id, GError **error); - -#endif diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c deleted file mode 100644 index e8f7ce4..0000000 --- a/src/ufo-roof-read-socket.c +++ /dev/null @@ -1,134 +0,0 @@ -#define _GNU_SOURCE - -#include -#include -#include -#include -#include -#include - -#include "glib.h" - -#include "ufo-roof.h" -#include "ufo-roof-read-socket.h" - -typedef struct { - UfoRoofReadInterface iface; - - UfoRoofConfig *cfg; - int socket; -} UfoRoofReadSocket; - -static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) { - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; - - if (reader) { - if (reader->socket >= 0) - close(reader->socket); - free(reader); - } -} - -static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, uint8_t *buf, GError **error) { - int packets; - struct timespec timeout_ts; - - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface; - UfoRoofConfig *cfg = reader->cfg; - - struct mmsghdr msg[cfg->max_packets]; - struct iovec msgvec[cfg->max_packets]; - - timeout_ts.tv_sec = cfg->network_timeout / 1000000; - timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000); - - // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build? - memset(msg, 0, sizeof(msg)); - memset(msgvec, 0, sizeof(msgvec)); - for (guint i = 0; i < cfg->max_packets; i++) { - msgvec[i].iov_base = buf + i * cfg->max_packet_size; - msgvec[i].iov_len = cfg->max_packet_size; - msg[i].msg_hdr.msg_iov = &msgvec[i]; - msg[i].msg_hdr.msg_iovlen = 1; - } - -//retry: - // Timeout seems broken, see BUGS in 'recvmmsg' bugs page - packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts); - if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno); - -/* - // FIXME: Shall we verify packets consistency here? We can check at least the sizes... - if ((packets == 1)&&(msg[0].msg_len < 16)) { - goto retry; - } -*/ - - return (guint)packets; -} - - -UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) { - int err; - int port = cfg->port + id; - char port_str[16]; - const char *addr_str = "0.0.0.0"; - struct addrinfo sockaddr_hints; - struct addrinfo *sockaddr_info; - - UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket)); - if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket"); - - reader->cfg = cfg; - reader->iface.close = ufo_roof_read_socket_free; - reader->iface.read =ufo_roof_read_socket; - - snprintf(port_str, sizeof(port_str), "%d", port); - port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0'; - - memset(&sockaddr_hints, 0, sizeof(sockaddr_hints)); - if (!strncmp(cfg->protocol, "udp", 3)) { - sockaddr_hints.ai_family = AF_UNSPEC; - sockaddr_hints.ai_socktype = SOCK_DGRAM; - sockaddr_hints.ai_protocol = IPPROTO_UDP; - } else if (!strncmp(cfg->protocol, "tcp", 3)) { - sockaddr_hints.ai_family = AF_UNSPEC; - sockaddr_hints.ai_socktype = SOCK_STREAM; - sockaddr_hints.ai_protocol = IPPROTO_TCP; - } else { - roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol); - } - - err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); - if (err || !sockaddr_info) { - free(reader); - roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str); - } - - reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol); - if(reader->socket == -1) { - freeaddrinfo(sockaddr_info); - free(reader); - roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str); - } - - err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); - if(err != 0) { - freeaddrinfo(sockaddr_info); - close(reader->socket); - free(reader); - roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str); - } - -/* - // Send ping request to force initialization - char msg[4]; - addr_str = "192.168.34.83"; - getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info); - sendto(reader->socket, msg, sizeof(msg), 0, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen); -*/ - - freeaddrinfo(sockaddr_info); - - return (UfoRoofReadInterface*)reader; -} diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h deleted file mode 100644 index 74b0742..0000000 --- a/src/ufo-roof-read-socket.h +++ /dev/null @@ -1,8 +0,0 @@ -#ifndef __UFO_ROOF_READ_SOCKET_H -#define __UFO_ROOF_READ_SOCKET_H - -#include "ufo-roof-read.h" - -UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error); - -#endif diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c index 7d55b79..83b9627 100644 --- a/src/ufo-roof-read-task.c +++ b/src/ufo-roof-read-task.c @@ -35,7 +35,7 @@ struct _UfoRoofReadTaskPrivate { gchar *config; // ROOF configuration file name UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofReadInterface *reader; + UfoRoofReadInterface *reader[16]; guint id; // Reader ID (defince sequential port number) gboolean stop; // Flag requiring termination @@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task); @@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task, roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams); // Start actual reader - if (priv->simulate) { - if (!priv->path) - roof_setup_error(error, "Path to simulated data should be specified"); - - priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr); - } else - priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr); - if (!priv->reader) - roof_propagate_error(error, gerr, "roof_read_new: "); + for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + if (!priv->reader[i]) + roof_propagate_error(error, gerr, "roof_read_new: "); + } } @@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object) { UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object); - if (priv->reader) { - priv->reader->close(priv->reader); + for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) { + if (priv->reader[i]) { + priv->reader[i]->close(priv->reader[i]); + } } if (priv->cfg) { @@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task, void *output_buffer = ufo_buffer_get_host_array(output, NULL); UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg); + uint64_t current_id[16] = {0}; + uint64_t grabbed[16] = {0}; + + static uint64_t errors = 0; +retry: if (priv->stop) return FALSE; - guint packets = priv->reader->read(priv->reader, output_buffer, &gerr); + for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) { + + guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr); if (gerr) { g_warning("Error reciving data: %s", gerr->message); g_error_free(gerr); return FALSE; } + const uint8_t *fragment = output_buffer; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + + if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + current_id[sid] = packet_id; + grabbed[sid]++; + if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000); + + fragment += cfg->max_packet_size; + } + } + + goto retry; + #ifdef UFO_ROOF_DEBUG // Store first received packet on each channel... static int debug = 1; @@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task, #endif /* UFO_ROOF_DEBUG */ // FIXME: End of data (shall we restart in the network case?) - if (!packets) - return FALSE; +// if (!packets) +// return FALSE; // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? header->channel_id = priv->id; - header->n_packets = packets; +// header->n_packets = packets; return TRUE; } diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h deleted file mode 100644 index 5f0853c..0000000 --- a/src/ufo-roof-read.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef __UFO_ROOF_READ_H -#define __UFO_ROOF_READ_H - -#include "ufo-roof-config.h" - -typedef struct _UfoRoofReadInterface UfoRoofReadInterface; - -typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); -typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); - -struct _UfoRoofReadInterface { - UfoRoofReaderRead read; - UfoRoofReaderClose close; -}; - - -#endif /* __UFO_ROOF_READ_H */ diff --git a/src/ufo-roof.h b/src/ufo-roof.h index 23f8429..ea422e2 100644 --- a/src/ufo-roof.h +++ b/src/ufo-roof.h @@ -1,21 +1,21 @@ -#ifndef __UFO_ROOF_H -#define __UFO_ROOF_H +#ifndef __ROOF_H +#define __ROOF_H #include "ufo-roof-config.h" #include "ufo-roof-error.h" -//#define UFO_ROOF_DEBUG -#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf)) -#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size)) +//#define ROOF_DEBUG +#define ROOF_PACKET_HEADER(buf) ((RoofPacketHeader*)(buf)) +#define ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((RoofPacketBlockHeader*)(((uint8_t*)buf) + cfg->max_packets * cfg->max_packet_size)) typedef struct { uint64_t packet_id; // Sequential Packet ID (numbered from 0) -} UfoRoofPacketHeader; +} RoofPacketHeader; typedef struct { uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0) uint32_t n_packets; // Number of packets -} UfoRoofPacketBlockHeader; +} RoofPacketBlockHeader; -#endif /* __UFO_ROOF_H */ +#endif /* __ROOF_H */ -- cgit v1.2.3