diff options
author | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
---|---|---|
committer | Suren A. Chilingaryan <csa@suren.me> | 2020-09-03 03:00:30 +0200 |
commit | 5172421d248250b4ab3b69eb57fd83656e23a4da (patch) | |
tree | a499d9f1dd0b74b754816884a59927b3171656fc /src/save | |
parent | 7b2e6168b049be9e7852b2d364d897592eff69fc (diff) | |
download | ufo-roof-temp-master.tar.gz ufo-roof-temp-master.tar.bz2 ufo-roof-temp-master.tar.xz ufo-roof-temp-master.zip |
Diffstat (limited to 'src/save')
-rw-r--r-- | src/save/memcpy.c | 344 | ||||
-rw-r--r-- | src/save/memcpy.h | 63 | ||||
-rw-r--r-- | src/save/ufo-roof-buffer-build-task.c | 474 | ||||
-rw-r--r-- | src/save/ufo-roof-read-thread.c | 155 | ||||
-rw-r--r-- | src/save/ufo-roof-read-thread.h | 23 | ||||
-rw-r--r-- | src/save/ufo-roof-read.c | 123 | ||||
-rw-r--r-- | src/save/ufo-roof-read.h | 61 |
7 files changed, 1243 insertions, 0 deletions
diff --git a/src/save/memcpy.c b/src/save/memcpy.c new file mode 100644 index 0000000..5c29d01 --- /dev/null +++ b/src/save/memcpy.c @@ -0,0 +1,344 @@ +/********************************************************************
+ ** File: memcpy.c
+ **
+ ** Copyright (C) 1999-2010 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ ** The following configuration options can be set:
+ **
+ ** LITTLE_ENDIAN - Uses processor with little endian
+ ** addressing. Default is big endian.
+ **
+ ** PRE_INC_PTRS - Use pre increment of pointers.
+ ** Default is post increment of
+ ** pointers.
+ **
+ ** INDEXED_COPY - Copying data using array indexing.
+ ** Using this option, disables the
+ ** PRE_INC_PTRS option.
+ **
+ ** MEMCPY_64BIT - Compiles memcpy for 64 bit
+ ** architectures
+ **
+ **
+ ** Best Settings:
+ **
+ ** Intel x86: LITTLE_ENDIAN and INDEXED_COPY
+ **
+ *******************************************************************/
+
+
+
+/********************************************************************
+ ** Configuration definitions.
+ *******************************************************************/
+
+#define LITTLE_ENDIAN
+#define INDEXED_COPY
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ ** Typedefs
+ *******************************************************************/
+
+typedef unsigned char UInt8;
+typedef unsigned short UInt16;
+typedef unsigned int UInt32;
+#ifdef _WIN32
+typedef unsigned __int64 UInt64;
+#else
+typedef unsigned long long UInt64;
+#endif
+
+#ifdef MEMCPY_64BIT
+typedef UInt64 UIntN;
+#define TYPE_WIDTH 8L
+#else
+typedef UInt32 UIntN;
+#define TYPE_WIDTH 4L
+#endif
+
+
+/********************************************************************
+ ** Remove definitions when INDEXED_COPY is defined.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+#if defined (PRE_INC_PTRS)
+#undef PRE_INC_PTRS
+#endif /*PRE_INC_PTRS*/
+#endif /*INDEXED_COPY*/
+
+
+
+/********************************************************************
+ ** Definitions for pre and post increment of pointers.
+ *******************************************************************/
+
+#if defined (PRE_INC_PTRS)
+
+#define START_VAL(x) (x)--
+#define INC_VAL(x) *++(x)
+#define CAST_TO_U8(p, o) ((UInt8*)p + o + TYPE_WIDTH)
+#define WHILE_DEST_BREAK (TYPE_WIDTH - 1)
+#define PRE_LOOP_ADJUST - (TYPE_WIDTH - 1)
+#define PRE_SWITCH_ADJUST + 1
+
+#else /*PRE_INC_PTRS*/
+
+#define START_VAL(x)
+#define INC_VAL(x) *(x)++
+#define CAST_TO_U8(p, o) ((UInt8*)p + o)
+#define WHILE_DEST_BREAK 0
+#define PRE_LOOP_ADJUST
+#define PRE_SWITCH_ADJUST
+
+#endif /*PRE_INC_PTRS*/
+
+
+
+/********************************************************************
+ ** Definitions for endians
+ *******************************************************************/
+
+#if defined (LITTLE_ENDIAN)
+
+#define SHL >>
+#define SHR <<
+
+#else /* LITTLE_ENDIAN */
+
+#define SHL <<
+#define SHR >>
+
+#endif /* LITTLE_ENDIAN */
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers.
+ *******************************************************************/
+
+#define CP_INCR() { \
+ INC_VAL(dstN) = INC_VAL(srcN); \
+}
+
+#define CP_INCR_SH(shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = INC_VAL(srcN); \
+ dstWord |= srcWord SHR shr; \
+ INC_VAL(dstN) = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses array indexes.
+ *******************************************************************/
+
+#define CP_INDEX(idx) { \
+ dstN[idx] = srcN[idx]; \
+}
+
+#define CP_INDEX_SH(x, shl, shr) { \
+ dstWord = srcWord SHL shl; \
+ srcWord = srcN[x]; \
+ dstWord |= srcWord SHR shr; \
+ dstN[x] = dstWord; \
+}
+
+
+
+/********************************************************************
+ ** Macros for copying words of different alignment.
+ ** Uses incremening pointers or array indexes depending on
+ ** configuration.
+ *******************************************************************/
+
+#if defined (INDEXED_COPY)
+
+#define CP(idx) CP_INDEX(idx)
+#define CP_SH(idx, shl, shr) CP_INDEX_SH(idx, shl, shr)
+
+#define INC_INDEX(p, o) ((p) += (o))
+
+#else /* INDEXED_COPY */
+
+#define CP(idx) CP_INCR()
+#define CP_SH(idx, shl, shr) CP_INCR_SH(shl, shr)
+
+#define INC_INDEX(p, o)
+
+#endif /* INDEXED_COPY */
+
+
+#define COPY_REMAINING(count) { \
+ START_VAL(dst8); \
+ START_VAL(src8); \
+ \
+ switch (count) { \
+ case 7: INC_VAL(dst8) = INC_VAL(src8); \
+ case 6: INC_VAL(dst8) = INC_VAL(src8); \
+ case 5: INC_VAL(dst8) = INC_VAL(src8); \
+ case 4: INC_VAL(dst8) = INC_VAL(src8); \
+ case 3: INC_VAL(dst8) = INC_VAL(src8); \
+ case 2: INC_VAL(dst8) = INC_VAL(src8); \
+ case 1: INC_VAL(dst8) = INC_VAL(src8); \
+ case 0: \
+ default: break; \
+ } \
+}
+
+#define COPY_NO_SHIFT() { \
+ UIntN* dstN = (UIntN*)(dst8 PRE_LOOP_ADJUST); \
+ UIntN* srcN = (UIntN*)(src8 PRE_LOOP_ADJUST); \
+ size_t length = count / TYPE_WIDTH; \
+ \
+ while (length & 7) { \
+ CP_INCR(); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP(0); \
+ CP(1); \
+ CP(2); \
+ CP(3); \
+ CP(4); \
+ CP(5); \
+ CP(6); \
+ CP(7); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, 0); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+
+#define COPY_SHIFT(shift) { \
+ UIntN* dstN = (UIntN*)((((UIntN)dst8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ UIntN* srcN = (UIntN*)((((UIntN)src8) PRE_LOOP_ADJUST) & \
+ ~(TYPE_WIDTH - 1)); \
+ size_t length = count / TYPE_WIDTH; \
+ UIntN srcWord = INC_VAL(srcN); \
+ UIntN dstWord; \
+ \
+ while (length & 7) { \
+ CP_INCR_SH(8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ length--; \
+ } \
+ \
+ length /= 8; \
+ \
+ while (length--) { \
+ CP_SH(0, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(1, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(2, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(3, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(4, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(5, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(6, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ CP_SH(7, 8 * shift, 8 * (TYPE_WIDTH - shift)); \
+ \
+ INC_INDEX(dstN, 8); \
+ INC_INDEX(srcN, 8); \
+ } \
+ \
+ src8 = CAST_TO_U8(srcN, (shift - TYPE_WIDTH)); \
+ dst8 = CAST_TO_U8(dstN, 0); \
+ \
+ COPY_REMAINING(count & (TYPE_WIDTH - 1)); \
+ \
+ return dest; \
+}
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest.
+ ** No overlap check is performed.
+ **
+ *******************************************************************/
+
+void *fast_memcpy(void *dest, const void *src, size_t count)
+{
+ UInt8* dst8 = (UInt8*)dest;
+ UInt8* src8 = (UInt8*)src;
+
+ if (count < 8) {
+ COPY_REMAINING(count);
+ return dest;
+ }
+
+ START_VAL(dst8);
+ START_VAL(src8);
+
+ while (((UIntN)dst8 & (TYPE_WIDTH - 1)) != WHILE_DEST_BREAK) {
+ INC_VAL(dst8) = INC_VAL(src8);
+ count--;
+ }
+
+ switch ((((UIntN)src8) PRE_SWITCH_ADJUST) & (TYPE_WIDTH - 1)) {
+ case 0: COPY_NO_SHIFT(); break;
+ case 1: COPY_SHIFT(1); break;
+ case 2: COPY_SHIFT(2); break;
+ case 3: COPY_SHIFT(3); break;
+#if TYPE_WIDTH > 4
+ case 4: COPY_SHIFT(4); break;
+ case 5: COPY_SHIFT(5); break;
+ case 6: COPY_SHIFT(6); break;
+ case 7: COPY_SHIFT(7); break;
+#endif
+ }
+}
diff --git a/src/save/memcpy.h b/src/save/memcpy.h new file mode 100644 index 0000000..0714823 --- /dev/null +++ b/src/save/memcpy.h @@ -0,0 +1,63 @@ +/********************************************************************
+ ** File: memcpy.h
+ **
+ ** Copyright (C) 2005 Daniel Vik
+ **
+ ** This software is provided 'as-is', without any express or implied
+ ** warranty. In no event will the authors be held liable for any
+ ** damages arising from the use of this software.
+ ** Permission is granted to anyone to use this software for any
+ ** purpose, including commercial applications, and to alter it and
+ ** redistribute it freely, subject to the following restrictions:
+ **
+ ** 1. The origin of this software must not be misrepresented; you
+ ** must not claim that you wrote the original software. If you
+ ** use this software in a product, an acknowledgment in the
+ ** use this software in a product, an acknowledgment in the
+ ** product documentation would be appreciated but is not
+ ** required.
+ **
+ ** 2. Altered source versions must be plainly marked as such, and
+ ** must not be misrepresented as being the original software.
+ **
+ ** 3. This notice may not be removed or altered from any source
+ ** distribution.
+ **
+ **
+ ** Description: Implementation of the standard library function memcpy.
+ ** This implementation of memcpy() is ANSI-C89 compatible.
+ **
+ *******************************************************************/
+
+
+/********************************************************************
+ ** Includes for size_t definition
+ *******************************************************************/
+
+#include <stddef.h>
+
+
+/********************************************************************
+ **
+ ** void *memcpy(void *dest, const void *src, size_t count)
+ **
+ ** Args: dest - pointer to destination buffer
+ ** src - pointer to source buffer
+ ** count - number of bytes to copy
+ **
+ ** Return: A pointer to destination buffer
+ **
+ ** Purpose: Copies count bytes from src to dest. No overlap check
+ ** is performed.
+ **
+ *******************************************************************/
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void *fast_memcpy(void *dest, const void *src, size_t count);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/src/save/ufo-roof-buffer-build-task.c b/src/save/ufo-roof-buffer-build-task.c new file mode 100644 index 0000000..bdb7a7d --- /dev/null +++ b/src/save/ufo-roof-buffer-build-task.c @@ -0,0 +1,474 @@ +/* + * Copyright (C) 2011-2015 Karlsruhe Institute of Technology + * + * This file is part of Ufo. + * + * This library is free software: you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, either + * version 3 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <stdio.h> +#include <endian.h> +#include <threads.h> + +#ifdef __APPLE__ +#include <OpenCL/cl.h> +#else +#include <CL/cl.h> +#endif + +#include "ufo-roof.h" +#include "ufo-roof-buffer.h" +#include "ufo-roof-build-task.h" + +typedef enum { + BUILD_AUTO = 0, + BUILD_RAW, + BUILD_SINO, + BUILD_UFO +} BuildType; + +struct _UfoRoofBuildTaskPrivate { + gchar *config; // ROOF configuration file name + UfoRoofConfig *cfg; // Parsed ROOF parameters + UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + UfoRoofReadInterface; *rdi; // Reader interfaces, one per socket (no threading) + UfoRoofRead *rd; // Threading interface + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) + + BuildType build; // What dataset do we build: ROOF sinogram or raw network data + guint number; // Number of datasets to read + gboolean stop; // Stop flag + gboolean simulate; // Indicates if we are running in network or simulation modes + + guint64 announced; // For debugging + guint64 generated; // Total number for control + + struct timespec last_fragment_timestamp; +}; + +static void ufo_task_interface_init (UfoTaskIface *iface); + +G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, + G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, + ufo_task_interface_init)) + +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) + + + +static GEnumValue build_values[] = { + { BUILD_AUTO, "BUILD_AUTO", "auto" }, + { BUILD_RAW, "BUILD_RAW", "raw" }, + { BUILD_SINO, "BUILD_SINO", "sino" }, + { BUILD_UFO, "BUILD_UFO", "ufo" }, + { 0, NULL, NULL} +}; + +enum { + PROP_0, + PROP_STOP, + PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, + PROP_NUMBER, + PROP_BUILD, + PROP_CONFIG, + N_PROPERTIES +}; + +static GParamSpec *properties[N_PROPERTIES] = { NULL, }; + +UfoNode * +ufo_roof_build_task_new (void) +{ + return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL)); +} + +static void +ufo_roof_build_task_setup (UfoTask *task, + UfoResources *resources, + GError **error) +{ + guint i; + GError *gerr = NULL; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + if (!priv->config) + roof_setup_error(error, "ROOF configuration is not specified"); + + priv->cfg = ufo_roof_config_new(priv->config, priv->simulate?UFO_ROOF_CONFIG_SIMULATION:UFO_ROOF_CONFIG_DEFAULT, &gerr); + if (!priv->cfg) + roof_propagate_error(error, gerr, "roof-build-setup: "); + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + } + + if (priv->build == BUILD_AUTO) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); + } + + priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); + + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); +} + +static void +ufo_roof_build_task_finalize (GObject *object) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + + if (priv->buf) { + ufo_roof_buffer_free(priv->buf); + priv->buf = NULL; + } + + if (priv->cfg) { + ufo_roof_config_free(priv->cfg); + priv->cfg = NULL; + } + + if (priv->config) { + g_free(priv->config); + priv->config = NULL; + } + + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } + + G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); +} + + + +static void +ufo_roof_build_task_get_requisition (UfoTask *task, + UfoBuffer **inputs, + UfoRequisition *requisition, + GError **error) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + + // FIXME: Can we handle data types more elegant? + if (priv->build == BUILD_RAW) { + guint bytes = priv->cfg->dataset_size; + requisition->n_dims = 1; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + } else if (priv->build == BUILD_SINO) { + guint bytes = priv->cfg->fan_bins * priv->cfg->bit_depth / 8; + requisition->n_dims = 2; + requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0); + requisition->dims[1] = priv->cfg->fan_projections; + } else if (priv->build == BUILD_UFO) { + requisition->n_dims = 2; + requisition->dims[0] = priv->cfg->fan_bins; + requisition->dims[1] = priv->cfg->fan_projections; + } +} + +static guint +ufo_roof_build_task_get_num_inputs (UfoTask *task) +{ + return 1; +} + +static guint +ufo_roof_build_task_get_num_dimensions (UfoTask *task, + guint input) +{ + return 1; +} + +static UfoTaskMode +ufo_roof_build_task_get_mode (UfoTask *task) +{ + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; +} + + +static gboolean +ufo_roof_build_task_generate (UfoTask *task, + UfoBuffer *output, + UfoRequisition *requisition) +{ + gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; + + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + UfoRoofConfig *cfg = priv->cfg; + UfoRoofBuffer *buf = priv->buf; + + void *output_buffer = ufo_buffer_get_host_array(output, NULL); + + if (priv->stop) + return FALSE; + + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + return FALSE; + } + } + } while (!ready); + + // FIXME: integrate fastwriter somewhere here? + + if (priv->build == BUILD_UFO) { + switch (cfg->bit_depth) { + case 8: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_8U); + break; + case 16: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_16U); + break; + case 32: + ufo_buffer_convert(output, UFO_BUFFER_DEPTH_32U); + break; + default: + printf("Usupported bit-depth %u\n", cfg->bit_depth); + } + } + + // Metadata: plane and sequential number within the plane + g_value_init (&ival, G_TYPE_UINT); + g_value_init (&lval, G_TYPE_ULONG); + if (priv->build != BUILD_UFO) { + g_value_set_uint (&ival, cfg->bit_depth); + ufo_buffer_set_metadata (output, "bpp", &ival); + } + g_value_set_uint (&ival, 1 + seqid % cfg->n_planes); + ufo_buffer_set_metadata (output, "plane", &ival); + g_value_set_ulong (&lval, seqid / cfg->n_planes); + ufo_buffer_set_metadata (output, "plane_id", &lval); + g_value_set_ulong (&lval, seqid); + ufo_buffer_set_metadata (output, "seqid", &lval); + g_value_unset(&lval); + g_value_unset(&ival); + + // FIXME: Or shall we start from counting from the ID of the first registerd dataset + if ((priv->number)&&(buf->current_id >= priv->number)) { +// printf("%lu datasets processed, stopping\n", buf->current_id); + priv->stop = TRUE; + g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); + } + + if (ready) priv->generated++; + + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + priv->announced = buf->current_id; + } + + return ready; +} + +static void +ufo_roof_build_task_set_property (GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + if (priv->config) g_free(priv->config); + priv->config = g_value_dup_string(value); + break; + case PROP_STOP: + priv->stop = g_value_get_boolean (value); + break; + case PROP_SIMULATE: + priv->simulate = g_value_get_boolean (value); + break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; + case PROP_NUMBER: + priv->number = g_value_get_uint (value); + break; + case PROP_BUILD: + priv->build = g_value_get_enum (value); + if ((priv->build == BUILD_AUTO)&&(priv->cfg)) { + if (priv->cfg->roof_mode) priv->build = BUILD_SINO; + else priv->build = BUILD_RAW; + } + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_roof_build_task_get_property (GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + switch (property_id) { + case PROP_CONFIG: + g_value_set_string(value, priv->config); + break; + case PROP_STOP: + g_value_set_boolean (value, priv->stop); + break; + case PROP_SIMULATE: + g_value_set_boolean (value, priv->simulate); + break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; + case PROP_NUMBER: + g_value_set_uint (value, priv->number); + break; + case PROP_BUILD: + g_value_set_enum (value, priv->build); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); + break; + } +} + +static void +ufo_task_interface_init (UfoTaskIface *iface) +{ + iface->setup = ufo_roof_build_task_setup; + iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; + iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; + iface->get_mode = ufo_roof_build_task_get_mode; + iface->get_requisition = ufo_roof_build_task_get_requisition; + iface->generate = ufo_roof_build_task_generate; +} + +static void +ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +{ + GObjectClass *oclass = G_OBJECT_CLASS (klass); + + oclass->set_property = ufo_roof_build_task_set_property; + oclass->get_property = ufo_roof_build_task_get_property; + oclass->finalize = ufo_roof_build_task_finalize; + + properties[PROP_CONFIG] = + g_param_spec_string ("config", + "ROOF configuration", + "Path to ROOF configuration file", + "", + G_PARAM_READWRITE); + + properties[PROP_STOP] = + g_param_spec_boolean ("stop", + "Stop flag", + "Stop socket servers and terminates filter execution", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_SIMULATE] = + g_param_spec_boolean ("simulate", + "Simulation mode", + "Read data from the specified files instead of network", + FALSE, + G_PARAM_READWRITE); + + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_NUMBER] = + g_param_spec_uint("number", + "Number of datasets to receive", + "Number of datasets to receive", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + + properties[PROP_BUILD] = + g_param_spec_enum ("build", + "Build type (\"raw\", \"sino\", \"ufo\")", + "Build type (\"raw\" - raw data, \"sino\" - arrange in sinogram, \"ufo\" - arrange in sinogram and convert UFO floating-point format)", + g_enum_register_static ("build", build_values), + 0, G_PARAM_READWRITE); + + + for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) + g_object_class_install_property (oclass, i, properties[i]); + + g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); +} + +static void +ufo_roof_build_task_init(UfoRoofBuildTask *self) +{ + self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); +} diff --git a/src/save/ufo-roof-read-thread.c b/src/save/ufo-roof-read-thread.c new file mode 100644 index 0000000..60868d2 --- /dev/null +++ b/src/save/ufo-roof-read-thread.c @@ -0,0 +1,155 @@ +#include <stdio.h> +#include <threads.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { + int i; + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); + + thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); + if (!thr->rdbuf) { + ufo_roof_read_thread_free(thr); + roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); + } + + thr->rd = rd; + thr->from = from; + thr->to = to; + + + return thr; + +} + +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { + if (!thr) return; + if (thr->rdbuf) free(thr->rdbuf); + + ufo_roof_thread_stop(thr, error); + free(thr); +} + +static int ufo_roof_read_thread_run(void *arg) { + GError *gerr = NULL; + + UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; + + UfoRoofConfig *cfg = thr->rd->cfg; + UfoRoofBuffer *buf = thr->rd->buf; + UfoRoofReadInterface *rdi = thr->rd->rdi; + + guint from = thr->from; + guint to = thr->to; + + void *rdbuf = thr->rdbuf; + + uint64_t current_id[to - from] = {0}; + uint64_t grabbed[to - from] = {0}; + + static uint64_t errors = 0; + + while (thr->op != UFO_ROOF_OP_STOP) { + for (guint sid = from; sid < to; sid++) { + // FIXME break waiting on stop? If no packets are send + guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); + if (gerr) roof_print_error(gerr); + + guint ready = false; + const uint8_t *fragment = (uint8_t*)rdbuf; + for (guint i = 0; i < packets; i++) { + guint64 packet_id = 0; + + // Otherwise considered consecutive and handled by the buffer + if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { + UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); + packet_id = be64toh(pheader->packet_id) + 1; + } + +#ifdef UFO_ROOF_DEBUG + if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { + printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); + //if (++errors > 1000) exit(1); + } + + current_id[sid - from] = packet_id; + grabbed[sid - from]++; + if ((grabbed[sid - from]%1000000)==0) + printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); +#endif + + ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); + if (gerr) roof_print_error(gerr); + + fragment += cfg->max_packet_size; + } // fragment-loop + + // send notification? Broadcast blocks, we don't want it. + if (ready) { + } + + } // socket-loop + } // operation-loop + + +#ifdef UFO_ROOF_DEBUG + // Store first received packet on each channel... + static int debug = 1; + if (debug) { + char fname[256]; + sprintf(fname, "channel%i_packet0.raw", priv->id); + FILE *f = fopen(fname, "w"); + if (f) { + fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); + fclose(f); + } + debug = 0; + } +#endif /* UFO_ROOF_DEBUG */ + + // FIXME: End of data (shall we restart in the network case?) +// if (!packets) +// return FALSE; + + // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? + header->channel_id = priv->id; +// header->n_packets = packets; + + return TRUE; +} + + +} + +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { + int err; + if (!thr) return FALSE; + + err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); + + ctx->launched = TRUE; + return TRUE; +} + +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { + int err, ret; + if (!thr) return FALSE; + if (!thr->launched) return TRUE; + + // Signal thread termination + + err = thrd_join(&thr->thread, &ret); + if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); + + return TRUE; +} + diff --git a/src/save/ufo-roof-read-thread.h b/src/save/ufo-roof-read-thread.h new file mode 100644 index 0000000..ebe8989 --- /dev/null +++ b/src/save/ufo-roof-read-thread.h @@ -0,0 +1,23 @@ +#ifndef __UFO_ROOF_READ_THREAD_H +#define __UFO_ROOF_READ_THREAD_H + +typedef struct _UfoRoofReadThread UfoRoofReadThread; + +#include "ufo-roof-read.h" + +struct _UfoRoofReadThread { + UfoRoofRead *rd; // ROOF Reader Cotext + guint from, to; // Determines ports/files which are read by this thread (from is inclusive and to - exclusive) + + gboolean launched; // Flag indicating if thread is launched + thrd_t thread; // Thread ID +}; + +/* +UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error); +void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error); +gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error); +*/ + +#endif /* __UFO_ROOF_READ_THREAD_H */
\ No newline at end of file diff --git a/src/save/ufo-roof-read.c b/src/save/ufo-roof-read.c new file mode 100644 index 0000000..f3d790d --- /dev/null +++ b/src/save/ufo-roof-read.c @@ -0,0 +1,123 @@ +#include <stdio.h> +#include <assert.h> + +#include <ufo/ufo.h> + +#include "ufo-roof-buffer.h" +#include "ufo-roof-read-thread.h" +#include "ufo-roof-read.h" + + + +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +static void ufo_roof_read_optimize(UfoRoofConfig *cfg) { + // FIXME: request real-time permissions? + // FIXME: do we need this? +/* + uint32_t lat = 0; + int fd = open("/dev/cpu_dma_latency", O_RDWR); + if (fd == -1) { + fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno)); + } else { + write(fd, &lat, sizeof(lat)); + close(fd); + } +*/ +} + + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) { + assert(ctx); + return ctx->settings; +} + + +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) { + guint i; + GError *gerr = NULL; + + UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead)); + if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context"); + + ufo_roof_read_optimize(ctx); + + ctx->n_threads = cfg->n_read_threads; + ctx->cfg = cfg; + ctx->buf = buf; + ctx->rdi = rdi; + + ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread)); + if (!ctx->thr) { + ufo_roof_read_free(ctx); + roof_new_error(error, "Error allocating UfoRoofReadThread contexts"); + } + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++; + ctx->n_threads = n_threads; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads; + if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread; + + guint from, to; + for (i = 0, from = 0; i < n_threads; i++, from = to) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + + return ctx; +} + +void ufo_roof_read_free(UfoRoofRead *ctx) { + if (!ctx) return; + + if (ctx->thr) { + int i; + ufo_roof_read_stop(ctx); + for (i = 0; i < ctx->n_threads; i++) { + if (ctx->thr[i]) + ufo_roof_read_thread_free(ctx->thr[i]); + } + free(ctx->thr); + } + free(ctx); +} + +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) { + gboolean ret; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_start(ctx, &gerr); + if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i); + } + return TRUE; +} + +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) { + gboolean ret, res = FALSE; + GError *gerr; + + if ((!ctx)||(!ctx->thr)) return FALSE; + + for (int i = 0; i < ctx->n_threads; i++) { + if (!ctx->thr[i]) return FALSE; + ret = ufo_roof_read_thread_stop(ctx, &gerr); + if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i); + res |= !ret; + } + return !res; +} + + diff --git a/src/save/ufo-roof-read.h b/src/save/ufo-roof-read.h new file mode 100644 index 0000000..16e910b --- /dev/null +++ b/src/save/ufo-roof-read.h @@ -0,0 +1,61 @@ +#ifndef __UFO_ROOF_READ_H +#define __UFO_ROOF_READ_H + +typedef struct _UfoRoofRead UfoRoofRead; + +#include "ufo-roof-config.h" +#include "ufo-roof-buffer.h" +//#include "ufo-roof-read-thread.h" + +G_BEGIN_DECLS + +typedef struct _UfoRoofReadInterface UfoRoofReadInterface; +typedef struct _UfoRoofReadInterfaceSettings UfoRoofReadInterfaceSettings; + +//typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t *buf, GError **error); + +typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, uint8_t **buf, GError **error); +typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader); + +struct _UfoRoofReadInterfaceSettings { + guint padding; // Packet size + padding +}; + +struct _UfoRoofReadInterface { + UfoRoofReaderRead read; + UfoRoofReaderClose close; + + UfoRoofReadInterfaceSettings settings; +}; + +typedef enum { + UFO_ROOF_OP_STOP = 0, + UFO_ROOF_OP_READ +} UfoRoofOp; + +struct _UfoRoofReadContext { + UfoRoofConfig *cfg; + UfoRoofBuffer *buf; + UfoRoofReadInterface *rdi; + + gulong packet +// UfoRoofReadThread *thr; + + guint n_threads; + UfoRoofOp op; // Current operation (reading by default) +}; + + + +/* +UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error); +void ufo_roof_read_free(UfoRoofRead *ctx); +gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error); +gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error); + +const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error); +*/ + +G_END_DECLS + +#endif /* __UFO_ROOF_READ_H */ |