summaryrefslogtreecommitdiffstats
path: root/src/save
diff options
context:
space:
mode:
Diffstat (limited to 'src/save')
-rw-r--r--src/save/memcpy.c344
-rw-r--r--src/save/memcpy.h63
-rw-r--r--src/save/ufo-roof-buffer-build-task.c474
-rw-r--r--src/save/ufo-roof-read-thread.c155
-rw-r--r--src/save/ufo-roof-read-thread.h23
-rw-r--r--src/save/ufo-roof-read.c123
-rw-r--r--src/save/ufo-roof-read.h61
7 files changed, 1243 insertions, 0 deletions
diff --git a/src/save/memcpy.c b/src/save/memcpy.c
new file mode 100644
index 0000000..5c29d01
--- /dev/null
+++ b/src/save/memcpy.c
@@ -0,0 +1,344 @@
+/********************************************************************
+ ** File: memcpy.c
+ **
+ ** Copyright (C) 1999-2010 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ ** The following configuration options can be set:
+ **
+ ** LITTLE_ENDIAN - Uses processor with little endian
+ ** addressing. Default is big endian.
+ **
+ ** PRE_INC_PTRS - Use pre increment of pointers.
+ ** Default is post increment of
+ ** pointers.
+ **
+ ** INDEXED_COPY - Copying data using array indexing.
+ ** Using this option, disables the
+ ** PRE_INC_PTRS option.
+ **
+ ** MEMCPY_64BIT - Compiles memcpy for 64 bit
+ ** architectures
+ **
+ **
+ ** Best Settings:
+ **
+ ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY
+ **
+ *******************************************************************/
+
+
+
+/********************************************************************
+ ** Configuration definitions.
+ *******************************************************************/
+
+#define LITTLE_ENDIAN
+#define INDEXED_COPY
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ ** Typedefs
+ *******************************************************************/
+
+typedef unsigned char UInt8;
+typedef unsigned short UInt16;
+typedef unsigned int UInt32;
+#ifdef _WIN32
+typedef unsigned __int64 UInt64;
+#else
+typedef unsigned long long UInt64;
+#endif
+
+#ifdef MEMCPY_64BIT
+typedef UInt64 UIntN;
+#define TYPE_WIDTH 8L
+#else
+typedef UInt32 UIntN;
+#define TYPE_WIDTH 4L
+#endif
+
+
+/********************************************************************
+ ** Remove definitions when INDEXED_COPY is defined.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+#if defined (PRE_INC_PTRS)
+#undef PRE_INC_PTRS
+#endif /*PRE_INC_PTRS*/
+#endif /*INDEXED_COPY*/
+
+
+
+/********************************************************************
+ ** Definitions for pre and post increment of pointers.
+ *******************************************************************/
+
+#if defined (PRE_INC_PTRS)
+
+#define START_VAL(x) (x)--
+#define INC_VAL(x) *++(x)
+#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH)
+#define WHILE_DEST_BREAK (TYPE_WIDTH - 1)
+#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1)
+#define PRE_SWITCH_ADJUST + 1
+
+#else /*PRE_INC_PTRS*/
+
+#define START_VAL(x)
+#define INC_VAL(x) *(x)++
+#define CAST_TO_U8(p, o) ((UInt8*)p + o)
+#define WHILE_DEST_BREAK 0
+#define PRE_LOOP_ADJUST
+#define PRE_SWITCH_ADJUST
+
+#endif /*PRE_INC_PTRS*/
+
+
+
+/********************************************************************
+ ** Definitions for endians
+ *******************************************************************/
+
+#if defined (LITTLE_ENDIAN)
+
+#define SHL >>
+#define SHR <<
+
+#else /* LITTLE_ENDIAN */
+
+#define SHL <<
+#define SHR >>
+
+#endif /* LITTLE_ENDIAN */
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers.
+ *******************************************************************/
+
+#define CP_INCR() { \
+ INC_VAL(dstN) = INC_VAL(srcN); \
+}
+
+#define CP_INCR_SH(shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = INC_VAL(srcN); \
+ dstWord |= srcWord SHR shr; \
+ INC_VAL(dstN) = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses array indexes.
+ *******************************************************************/
+
+#define CP_INDEX(idx) { \
+ dstN[idx] = srcN[idx]; \
+}
+
+#define CP_INDEX_SH(x, shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = srcN[x]; \
+ dstWord |= srcWord SHR shr; \
+ dstN[x] = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers or array indexes depending on
+ ** configuration.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+
+#define CP(idx) CP_INDEX(idx)
+#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr)
+
+#define INC_INDEX(p, o) ((p) += (o))
+
+#else /* INDEXED_COPY */
+
+#define CP(idx) CP_INCR()
+#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr)
+
+#define INC_INDEX(p, o)
+
+#endif /* INDEXED_COPY */
+
+
+#define COPY_REMAINING(count) { \
+ START_VAL(dst8); \
+ START_VAL(src8); \
+ \
+ switch (count) { \
+ case 7: INC_VAL(dst8) = INC_VAL(src8); \
+ case 6: INC_VAL(dst8) = INC_VAL(src8); \
+ case 5: INC_VAL(dst8) = INC_VAL(src8); \
+ case 4: INC_VAL(dst8) = INC_VAL(src8); \
+ case 3: INC_VAL(dst8) = INC_VAL(src8); \
+ case 2: INC_VAL(dst8) = INC_VAL(src8); \
+ case 1: INC_VAL(dst8) = INC_VAL(src8); \
+ case 0: \
+ default: break; \
+ } \
+}
+
+#define COPY_NO_SHIFT() { \
+ UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \
+ UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \
+ size_t length = count / TYPE_WIDTH; \
+ \
+ while (length & 7) { \
+ CP_INCR(); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP(0); \
+ CP(1); \
+ CP(2); \
+ CP(3); \
+ CP(4); \
+ CP(5); \
+ CP(6); \
+ CP(7); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, 0); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+
+#define COPY_SHIFT(shift) { \
+ UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ size_t length = count / TYPE_WIDTH; \
+ UIntN srcWord = INC_VAL(srcN); \
+ UIntN dstWord; \
+ \
+ while (length & 7) { \
+ CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest.
+ ** No overlap check is performed.
+ **
+ *******************************************************************/
+
+void *fast_memcpy(void *dest, const void *src, size_t count)
+{
+ UInt8* dst8 = (UInt8*)dest;
+ UInt8* src8 = (UInt8*)src;
+
+ if (count < 8) {
+ COPY_REMAINING(count);
+ return dest;
+ }
+
+ START_VAL(dst8);
+ START_VAL(src8);
+
+ while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) {
+ INC_VAL(dst8) = INC_VAL(src8);
+ count--;
+ }
+
+ switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) {
+ case 0: COPY_NO_SHIFT(); break;
+ case 1: COPY_SHIFT(1); break;
+ case 2: COPY_SHIFT(2); break;
+ case 3: COPY_SHIFT(3); break;
+#if TYPE_WIDTH > 4
+ case 4: COPY_SHIFT(4); break;
+ case 5: COPY_SHIFT(5); break;
+ case 6: COPY_SHIFT(6); break;
+ case 7: COPY_SHIFT(7); break;
+#endif
+ }
+}
diff --git a/src/save/memcpy.h b/src/save/memcpy.h
new file mode 100644
index 0000000..0714823
--- /dev/null
+++ b/src/save/memcpy.h
@@ -0,0 +1,63 @@
+/********************************************************************
+ ** File: memcpy.h
+ **
+ ** Copyright (C) 2005 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ *******************************************************************/
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest. No overlap check
+ ** is performed.
+ **
+ *******************************************************************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void *fast_memcpy(void *dest, const void *src, size_t count);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c
new file mode 100644
index 0000000..bdb7a7d
--- /dev/null
+++ b/src/save/ufo-roof-buffer-build-task.c
@@ -0,0 +1,474 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+#include <endian.h>
+#include <threads.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-build-task.h"
+
+typedef enum {
+ BUILD_AUTO = 0,
+ BUILD_RAW,
+ BUILD_SINO,
+ BUILD_UFO
+} BuildType;
+
+struct _UfoRoofBuildTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+ UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading)
+ UfoRoofRead *rd; // Threading interface
+
+ gchar *path; // UFO file path for simulation mode
+ guint first_file_number; // Number of a first simulated file (0 or 1)
+
+ BuildType build; // What dataset do we build: ROOF sinogram or raw network data
+ guint number; // Number of datasets to read
+ gboolean stop; // Stop flag
+ gboolean simulate; // Indicates if we are running in network or simulation modes
+
+ guint64 announced; // For debugging
+ guint64 generated; // Total number for control
+
+ struct timespec last_fragment_timestamp;
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+
+
+
+static GEnumValue build_values[] = {
+ { BUILD_AUTO, "BUILD_AUTO", "auto" },
+ { BUILD_RAW, "BUILD_RAW", "raw" },
+ { BUILD_SINO, "BUILD_SINO", "sino" },
+ { BUILD_UFO, "BUILD_UFO", "ufo" },
+ { 0, NULL, NULL}
+};
+
+enum {
+ PROP_0,
+ PROP_STOP,
+ PROP_SIMULATE,
+ PROP_PATH,
+ PROP_FIRST,
+ PROP_NUMBER,
+ PROP_BUILD,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_build_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL));
+}
+
+static void
+ufo_roof_build_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr);
+ if (!priv->cfg)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+ for (i = 0; i < priv->cfg->n_streams; i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+
+ if (!priv->rdi[i])
+ roof_propagate_error(error, gerr, "roof_read_interface_new: ");
+ }
+
+ if (priv->build == BUILD_AUTO) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]);
+ }
+
+ priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr);
+ if ((gerr)||(!priv->buf))
+ roof_propagate_error(error, gerr, "roof_buffer_new: ");
+
+ priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr);
+ if (gerr)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+}
+
+static void
+ufo_roof_build_task_finalize (GObject *object)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->rd) {
+ ufo_roof_read_free(priv->rd);
+ priv->rd = NULL;
+ }
+
+ if (priv->buf) {
+ ufo_roof_buffer_free(priv->buf);
+ priv->buf = NULL;
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+ if (priv->path) {
+ g_free(priv->path);
+ priv->path = NULL;
+ }
+
+ G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
+}
+
+
+
+static void
+ufo_roof_build_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ // FIXME: Can we handle data types more elegant?
+ if (priv->build == BUILD_RAW) {
+ guint bytes = priv->cfg->dataset_size;
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ } else if (priv->build == BUILD_SINO) {
+ guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8;
+ requisition->n_dims = 2;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+ requisition->dims[1] = priv->cfg->fan_projections;
+ } else if (priv->build == BUILD_UFO) {
+ requisition->n_dims = 2;
+ requisition->dims[0] = priv->cfg->fan_bins;
+ requisition->dims[1] = priv->cfg->fan_projections;
+ }
+}
+
+static guint
+ufo_roof_build_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_roof_build_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 1;
+}
+
+static UfoTaskMode
+ufo_roof_build_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_roof_build_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ gboolean ready = FALSE;
+ gulong seqid;
+ GError *gerr = NULL;
+ GValue ival = G_VALUE_INIT;
+ GValue lval = G_VALUE_INIT;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+
+ if (priv->stop)
+ return FALSE;
+
+ // FIXME: Wait or break. Not both.
+ do {
+ ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if (!ready) {
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (!ready) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ return FALSE;
+ }
+ }
+ } while (!ready);
+
+ // FIXME: integrate fastwriter somewhere here?
+
+ if (priv->build == BUILD_UFO) {
+ switch (cfg->bit_depth) {
+ case 8:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U);
+ break;
+ case 16:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U);
+ break;
+ case 32:
+ ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U);
+ break;
+ default:
+ printf("Usupported bit-depth %u\n", cfg->bit_depth);
+ }
+ }
+
+ // Metadata: plane and sequential number within the plane
+ g_value_init (&ival, G_TYPE_UINT);
+ g_value_init (&lval, G_TYPE_ULONG);
+ if (priv->build != BUILD_UFO) {
+ g_value_set_uint (&ival, cfg->bit_depth);
+ ufo_buffer_set_metadata (output, "bpp", &ival);
+ }
+ g_value_set_uint (&ival, 1 + seqid % cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane", &ival);
+ g_value_set_ulong (&lval, seqid / cfg->n_planes);
+ ufo_buffer_set_metadata (output, "plane_id", &lval);
+ g_value_set_ulong (&lval, seqid);
+ ufo_buffer_set_metadata (output, "seqid", &lval);
+ g_value_unset(&lval);
+ g_value_unset(&ival);
+
+ // FIXME: Or shall we start from counting from the ID of the first registerd dataset
+ if ((priv->number)&&(buf->current_id >= priv->number)) {
+// printf("%lu datasets processed, stopping\n", buf->current_id);
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+
+ if (ready) priv->generated++;
+
+ if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
+ if (ready)
+ printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ else
+ printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ priv->announced = buf->current_id;
+ }
+
+ return ready;
+}
+
+static void
+ufo_roof_build_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ case PROP_SIMULATE:
+ priv->simulate = g_value_get_boolean (value);
+ break;
+ case PROP_PATH:
+ if (priv->path) g_free(priv->path);
+ priv->path = g_value_dup_string(value);
+ break;
+ case PROP_FIRST:
+ priv->first_file_number = g_value_get_uint (value);
+ break;
+ case PROP_NUMBER:
+ priv->number = g_value_get_uint (value);
+ break;
+ case PROP_BUILD:
+ priv->build = g_value_get_enum (value);
+ if ((priv->build == BUILD_AUTO)&&(priv->cfg)) {
+ if (priv->cfg->roof_mode) priv->build = BUILD_SINO;
+ else priv->build = BUILD_RAW;
+ }
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_build_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ case PROP_SIMULATE:
+ g_value_set_boolean (value, priv->simulate);
+ break;
+ case PROP_PATH:
+ g_value_set_string(value, priv->path?priv->path:"");
+ break;
+ case PROP_FIRST:
+ g_value_set_uint (value, priv->first_file_number);
+ break;
+ case PROP_NUMBER:
+ g_value_set_uint (value, priv->number);
+ break;
+ case PROP_BUILD:
+ g_value_set_enum (value, priv->build);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_build_task_setup;
+ iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_build_task_get_mode;
+ iface->get_requisition = ufo_roof_build_task_get_requisition;
+ iface->generate = ufo_roof_build_task_generate;
+}
+
+static void
+ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_build_task_set_property;
+ oclass->get_property = ufo_roof_build_task_get_property;
+ oclass->finalize = ufo_roof_build_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_SIMULATE] =
+ g_param_spec_boolean ("simulate",
+ "Simulation mode",
+ "Read data from the specified files instead of network",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_PATH] =
+ g_param_spec_string ("path",
+ "Input files for simulation mode",
+ "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_FIRST] =
+ g_param_spec_uint ("first_file_number",
+ "Offset to the first read file",
+ "Offset to the first read file",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_NUMBER] =
+ g_param_spec_uint("number",
+ "Number of datasets to receive",
+ "Number of datasets to receive",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+ properties[PROP_BUILD] =
+ g_param_spec_enum ("build",
+ "Build type (\"raw\", \"sino\", \"ufo\")",
+ "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)",
+ g_enum_register_static ("build", build_values),
+ 0, G_PARAM_READWRITE);
+
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+}
+
+static void
+ufo_roof_build_task_init(UfoRoofBuildTask *self)
+{
+ self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
+}
diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c
new file mode 100644
index 0000000..60868d2
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.c
@@ -0,0 +1,155 @@
+#include <stdio.h>
+#include <threads.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) {
+ int i;
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context");
+
+ thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size);
+ if (!thr->rdbuf) {
+ ufo_roof_read_thread_free(thr);
+ roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer");
+ }
+
+ thr->rd = rd;
+ thr->from = from;
+ thr->to = to;
+
+
+ return thr;
+
+}
+
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) {
+ if (!thr) return;
+ if (thr->rdbuf) free(thr->rdbuf);
+
+ ufo_roof_thread_stop(thr, error);
+ free(thr);
+}
+
+static int ufo_roof_read_thread_run(void *arg) {
+ GError *gerr = NULL;
+
+ UfoRoofReadThread *thr = (UfoRoofReadThread*)arg;
+
+ UfoRoofConfig *cfg = thr->rd->cfg;
+ UfoRoofBuffer *buf = thr->rd->buf;
+ UfoRoofReadInterface *rdi = thr->rd->rdi;
+
+ guint from = thr->from;
+ guint to = thr->to;
+
+ void *rdbuf = thr->rdbuf;
+
+ uint64_t current_id[to - from] = {0};
+ uint64_t grabbed[to - from] = {0};
+
+ static uint64_t errors = 0;
+
+ while (thr->op != UFO_ROOF_OP_STOP) {
+ for (guint sid = from; sid < to; sid++) {
+ // FIXME break waiting on stop? If no packets are send
+ guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ guint ready = false;
+ const uint8_t *fragment = (uint8_t*)rdbuf;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+#ifdef UFO_ROOF_DEBUG
+ if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+
+ current_id[sid - from] = packet_id;
+ grabbed[sid - from]++;
+ if ((grabbed[sid - from]%1000000)==0)
+ printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000);
+#endif
+
+ ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ fragment += cfg->max_packet_size;
+ } // fragment-loop
+
+ // send notification? Broadcast blocks, we don't want it.
+ if (ready) {
+ }
+
+ } // socket-loop
+ } // operation-loop
+
+
+#ifdef UFO_ROOF_DEBUG
+ // Store first received packet on each channel...
+ static int debug = 1;
+ if (debug) {
+ char fname[256];
+ sprintf(fname, "channel%i_packet0.raw", priv->id);
+ FILE *f = fopen(fname, "w");
+ if (f) {
+ fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f);
+ fclose(f);
+ }
+ debug = 0;
+ }
+#endif /* UFO_ROOF_DEBUG */
+
+ // FIXME: End of data (shall we restart in the network case?)
+// if (!packets)
+// return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+// header->n_packets = packets;
+
+ return TRUE;
+}
+
+
+}
+
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) {
+ int err;
+ if (!thr) return FALSE;
+
+ err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err);
+
+ ctx->launched = TRUE;
+ return TRUE;
+}
+
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) {
+ int err, ret;
+ if (!thr) return FALSE;
+ if (!thr->launched) return TRUE;
+
+ // Signal thread termination
+
+ err = thrd_join(&thr->thread, &ret);
+ if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err);
+
+ return TRUE;
+}
+
diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h
new file mode 100644
index 0000000..ebe8989
--- /dev/null
+++ b/src/save/ufo-roof-read-thread.h
@@ -0,0 +1,23 @@
+#ifndef __UFO_ROOF_READ_THREAD_H
+#define __UFO_ROOF_READ_THREAD_H
+
+typedef struct _UfoRoofReadThread UfoRoofReadThread;
+
+#include "ufo-roof-read.h"
+
+struct _UfoRoofReadThread {
+ UfoRoofRead *rd; // ROOF Reader Cotext
+ guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive)
+
+ gboolean launched; // Flag indicating if thread is launched
+ thrd_t thread; // Thread ID
+};
+
+/*
+UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error);
+void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error);
+gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error);
+*/
+
+#endif /* __UFO_ROOF_READ_THREAD_H */ \ No newline at end of file
diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c
new file mode 100644
index 0000000..f3d790d
--- /dev/null
+++ b/src/save/ufo-roof-read.c
@@ -0,0 +1,123 @@
+#include <stdio.h>
+#include <assert.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-read-thread.h"
+#include "ufo-roof-read.h"
+
+
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
+ // FIXME: request real-time permissions?
+ // FIXME: do we need this?
+/*
+ uint32_t lat = 0;
+ int fd = open("/dev/cpu_dma_latency", O_RDWR);
+ if (fd == -1) {
+ fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
+ } else {
+ write(fd, &lat, sizeof(lat));
+ close(fd);
+ }
+*/
+}
+
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
+ assert(ctx);
+ return ctx->settings;
+}
+
+
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
+ guint i;
+ GError *gerr = NULL;
+
+ UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead));
+ if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");
+
+ ufo_roof_read_optimize(ctx);
+
+ ctx->n_threads = cfg->n_read_threads;
+ ctx->cfg = cfg;
+ ctx->buf = buf;
+ ctx->rdi = rdi;
+
+ ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread));
+ if (!ctx->thr) {
+ ufo_roof_read_free(ctx);
+ roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
+ }
+
+ // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
+ guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
+ if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++;
+ ctx->n_threads = n_threads;
+
+ guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads;
+ if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread;
+
+ guint from, to;
+ for (i = 0, from = 0; i < n_threads; i++, from = to) {
+ guint to = from + sockets_per_thread;
+ if (i < extra) to++;
+
+ ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr);
+ if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
+ }
+
+ return ctx;
+}
+
+void ufo_roof_read_free(UfoRoofRead *ctx) {
+ if (!ctx) return;
+
+ if (ctx->thr) {
+ int i;
+ ufo_roof_read_stop(ctx);
+ for (i = 0; i < ctx->n_threads; i++) {
+ if (ctx->thr[i])
+ ufo_roof_read_thread_free(ctx->thr[i]);
+ }
+ free(ctx->thr);
+ }
+ free(ctx);
+}
+
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
+ gboolean ret;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_start(ctx, &gerr);
+ if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
+ }
+ return TRUE;
+}
+
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
+ gboolean ret, res = FALSE;
+ GError *gerr;
+
+ if ((!ctx)||(!ctx->thr)) return FALSE;
+
+ for (int i = 0; i < ctx->n_threads; i++) {
+ if (!ctx->thr[i]) return FALSE;
+ ret = ufo_roof_read_thread_stop(ctx, &gerr);
+ if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
+ res |= !ret;
+ }
+ return !res;
+}
+
+
diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h
new file mode 100644
index 0000000..16e910b
--- /dev/null
+++ b/src/save/ufo-roof-read.h
@@ -0,0 +1,61 @@
+#ifndef __UFO_ROOF_READ_H
+#define __UFO_ROOF_READ_H
+
+typedef struct _UfoRoofRead UfoRoofRead;
+
+#include "ufo-roof-config.h"
+#include "ufo-roof-buffer.h"
+//#include "ufo-roof-read-thread.h"
+
+G_BEGIN_DECLS
+
+typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
+typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings;
+
+//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error);
+
+typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error);
+typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
+
+struct _UfoRoofReadInterfaceSettings {
+ guint padding; // Packet size + padding
+};
+
+struct _UfoRoofReadInterface {
+ UfoRoofReaderRead read;
+ UfoRoofReaderClose close;
+
+ UfoRoofReadInterfaceSettings settings;
+};
+
+typedef enum {
+ UFO_ROOF_OP_STOP = 0,
+ UFO_ROOF_OP_READ
+} UfoRoofOp;
+
+struct _UfoRoofReadContext {
+ UfoRoofConfig *cfg;
+ UfoRoofBuffer *buf;
+ UfoRoofReadInterface *rdi;
+
+ gulong packet
+// UfoRoofReadThread *thr;
+
+ guint n_threads;
+ UfoRoofOp op; // Current operation (reading by default)
+};
+
+
+
+/*
+UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error);
+void ufo_roof_read_free(UfoRoofRead *ctx);
+gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error);
+gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error);
+
+const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error);
+*/
+
+G_END_DECLS
+
+#endif /* __UFO_ROOF_READ_H */