diff options
Diffstat (limited to 'src/ufo-roof-build-task.c')
-rw-r--r-- | src/ufo-roof-build-task.c | 271 |
1 files changed, 178 insertions, 93 deletions
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c index a39ed11..ee1cec5 100644 --- a/src/ufo-roof-build-task.c +++ b/src/ufo-roof-build-task.c @@ -19,6 +19,7 @@ #include <stdio.h> #include <endian.h> +#include <threads.h> #ifdef __APPLE__ #include <OpenCL/cl.h> @@ -26,10 +27,16 @@ #include <CL/cl.h> #endif +#include "hw_sched.h" + #include "ufo-roof.h" +#include "ufo-roof-read.h" #include "ufo-roof-buffer.h" +#include "ufo-roof-read-socket.h" +#include "ufo-roof-read-file.h" #include "ufo-roof-build-task.h" + typedef enum { BUILD_AUTO = 0, BUILD_RAW, @@ -37,10 +44,16 @@ typedef enum { BUILD_UFO } BuildType; -struct _UfoRoofBuildTaskPrivate { +struct _RoofBuildTaskPrivate { gchar *config; // ROOF configuration file name - UfoRoofConfig *cfg; // Parsed ROOF parameters - UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofConfig *cfg; // Parsed ROOF parameters +// RoofBuffer *buf; // Ring buffer for incomming UDP packet + RoofReadInterface **rdi; // Reader interfaces, one per socket (no threading) +// RoofRead *rd; // Threading interface + HWSched sched; + + gchar *path; // UFO file path for simulation mode + guint first_file_number; // Number of a first simulated file (0 or 1) BuildType build; // What dataset do we build: ROOF sinogram or raw network data guint number; // Number of datasets to read @@ -48,17 +61,21 @@ struct _UfoRoofBuildTaskPrivate { gboolean simulate; // Indicates if we are running in network or simulation modes guint64 announced; // For debugging + guint64 generated; // Total number for control + + guint n_threads; // Number of schedulled threads + struct timespec last_fragment_timestamp; }; static void ufo_task_interface_init (UfoTaskIface *iface); -G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, +G_DEFINE_TYPE_WITH_CODE (RoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE, G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK, ufo_task_interface_init)) -#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate)) +#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, RoofBuildTaskPrivate)) @@ -74,6 +91,8 @@ enum { PROP_0, PROP_STOP, PROP_SIMULATE, + PROP_PATH, + PROP_FIRST, PROP_NUMBER, PROP_BUILD, PROP_CONFIG, @@ -93,9 +112,10 @@ ufo_roof_build_task_setup (UfoTask *task, UfoResources *resources, GError **error) { + guint i; GError *gerr = NULL; - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); if (!priv->config) roof_setup_error(error, "ROOF configuration is not specified"); @@ -104,15 +124,65 @@ ufo_roof_build_task_setup (UfoTask *task, if (!priv->cfg) roof_propagate_error(error, gerr, "roof-build-setup: "); + priv->rdi = (RoofReadInterface**)calloc(priv->cfg->n_streams, sizeof(RoofReadInterface*)); + if (!priv->rdi) + roof_setup_error(error, "Failed to allocate memory for RoofReadInterface array"); + + + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->simulate) { + if (!priv->path) + roof_setup_error(error, "Path to simulated data should be specified"); + + priv->rdi[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->first_file_number + i, &gerr); + } else + priv->rdi[i] = ufo_roof_read_socket_new(priv->cfg, i, &gerr); + + if (!priv->rdi[i]) + roof_propagate_error(error, gerr, "roof_read_interface_new: "); + + priv->rdc[i] = ufo_roof_read_context_new(priv->cfg, priv->rdi[i], &gerr); + if (!priv->rdc[i]) + roof_propagate_error(error, gerr, "roof_read_context_new: "); + } + + + // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit + priv->n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread; + if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) priv->n_threads++; + + guint extra = 0, sockets_per_thread = priv->cfg->n_streams / priv->n_threads; + if (priv->cfg->n_streams % priv->n_threads) extra = priv->cfg->n_streams - priv->n_threads * sockets_per_thread; + + guint from, to; + for (i = 0; i < priv->n_threads; i++) { + guint to = from + sockets_per_thread; + if (i < extra) to++; + + ctx->thr[i]= ufo_roof_thread_new(priv->cfg, priv->rdc, from, to, &gerr); + if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i); + } + if (priv->build == BUILD_AUTO) { if (priv->cfg->roof_mode) priv->build = BUILD_SINO; else priv->build = BUILD_RAW; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_BUILD]); } +/* priv->buf = ufo_roof_buffer_new(priv->cfg, (priv->build == BUILD_RAW)?1:2, priv->number, &gerr); - if (!priv->buf) - roof_propagate_error(error, gerr, "roof-build-setup: "); + if ((gerr)||(!priv->buf)) + roof_propagate_error(error, gerr, "roof_buffer_new: "); + + priv->rd = ufo_roof_read_new(priv->cfg, priv->rdi, priv->buf, &gerr); + if (gerr) + roof_propagate_error(error, gerr, "roof_read_new: "); +*/ + + priv->sched = hw_sched_create(priv->cfg->n_read_threads); + if (!priv->sched) + roof_setup_error(error, "Failed to schedule builder threads"); + clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); } @@ -120,12 +190,34 @@ ufo_roof_build_task_setup (UfoTask *task, static void ufo_roof_build_task_finalize (GObject *object) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); - + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + + if (priv->sched) { + hw_sched_destroy(priv->sched); + priv->sched = NULL; + } + +/* + if (priv->rd) { + ufo_roof_read_free(priv->rd); + priv->rd = NULL; + } + if (priv->buf) { ufo_roof_buffer_free(priv->buf); priv->buf = NULL; } +*/ + + if (priv->rdi) { + guint i; + for (i = 0; i < priv->cfg->n_streams; i++) { + if (priv->rdi[i]) + priv->rdi[i]->close(priv->rdi[i]); + } + free(priv->rdi); + priv->rdi = NULL; + } if (priv->cfg) { ufo_roof_config_free(priv->cfg); @@ -137,6 +229,10 @@ ufo_roof_build_task_finalize (GObject *object) priv->config = NULL; } + if (priv->path) { + g_free(priv->path); + priv->path = NULL; + } G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object); } @@ -149,9 +245,10 @@ ufo_roof_build_task_get_requisition (UfoTask *task, UfoRequisition *requisition, GError **error) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); // FIXME: Can we handle data types more elegant? + // FIXME: Kill BUILD_RAW ? if (priv->build == BUILD_RAW) { guint bytes = priv->cfg->dataset_size; requisition->n_dims = 1; @@ -184,100 +281,56 @@ ufo_roof_build_task_get_num_dimensions (UfoTask *task, static UfoTaskMode ufo_roof_build_task_get_mode (UfoTask *task) { - return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR; + return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR; } + static gboolean -ufo_roof_build_task_process (UfoTask *task, - UfoBuffer **inputs, +ufo_roof_build_task_generate (UfoTask *task, UfoBuffer *output, UfoRequisition *requisition) { - GError *gerr = NULL; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; gboolean ready = FALSE; + gulong seqid; + GError *gerr = NULL; + GValue ival = G_VALUE_INIT; + GValue lval = G_VALUE_INIT; -// UfoRequisition in_req; -// ufo_buffer_get_requisition (inputs[0], &in_req); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); + RoofConfig *cfg = priv->cfg; + RoofBuffer *buf = priv->buf; - const uint8_t *data = (uint8_t*)ufo_buffer_get_host_array(inputs[0], NULL); - UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg); + void *output_buffer = ufo_buffer_get_host_array(output, NULL); if (priv->stop) return FALSE; - const uint8_t *fragment = data; - for (guint i = 0; i < header->n_packets; i++) { - guint64 packet_id = 0; - // Otherwise considered consecutive and handled by the buffer - if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { - UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); - packet_id = be64toh(pheader->packet_id) + 1; - } + priv->current_dataset; + priv->current_buffer = output_buffer; - // FIXME: Can we kill here the dataset finished during the previous step of iteration - ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, fragment + cfg->header_size, &gerr); - if (gerr) roof_print_error(gerr); + err = hw_sched_schedule_thread_task(sched, (void*)&tnv_ctx, ufo_roof_build_task_read); + if (!err) err = hw_sched_wait_task(sched); + if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } - fragment += cfg->max_packet_size; - } - - // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one? - - if (ready) { - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { - struct timespec current_time; - clock_gettime(CLOCK_REALTIME, ¤t_time); - - // No new accepted events within timeout - if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) { - ready = ufo_roof_buffer_skip_to_ready(buf); - if (ready) { - // FIXME: shall we really reset timer here? - clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp); - } else { +/* + // FIXME: Wait or break. Not both. + do { + ready = ufo_roof_buffer_wait_dataset(buf, output_buffer, &seqid, cfg->network_timeout, &gerr); + if (gerr) roof_print_error(gerr); + + if (!ready) { + ready = ufo_roof_buffer_skip_to_ready(buf); + if (!ready) { priv->stop = TRUE; g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); - } + return FALSE; + } } - } - - -/* - printf("proc (%s - %u of %u) - channel: %i, packets: %i, first dataset: %i\n", ready?"yes":" no", buf->n_fragments[(buf->current_id)%buf->ring_size], buf->fragments_per_dataset, header->channel_id, header->n_packets, - (cfg->header_size >= sizeof(UfoRoofPacketHeader))?UFO_ROOF_PACKET_HEADER(data)->packet_id / (cfg->dataset_size / cfg->payload_size / cfg->n_streams):0); + } while (!ready); */ - return !ready; -} - -static gboolean -ufo_roof_build_task_generate (UfoTask *task, - UfoBuffer *output, - UfoRequisition *requisition) -{ - gboolean ready = FALSE; - gulong seqid; - GError *gerr = NULL; - GValue ival = G_VALUE_INIT; - GValue lval = G_VALUE_INIT; - - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task); - UfoRoofConfig *cfg = priv->cfg; - UfoRoofBuffer *buf = priv->buf; - - void *output_buffer = ufo_buffer_get_host_array(output, NULL); - - if (priv->stop) - return FALSE; - - ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &seqid, &gerr); - if (gerr) roof_print_error(gerr); + // FIXME: integrate fastwriter somewhere here? if (priv->build == BUILD_UFO) { switch (cfg->bit_depth) { @@ -318,8 +371,13 @@ ufo_roof_build_task_generate (UfoTask *task, g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]); } + if (ready) priv->generated++; + if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) { - printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + if (ready) + printf("Processing dataset %li (ready ), next : %u out of %u\n", buf->current_id, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); + else + printf("Skipping dataset %li (timeout), acquired: %u out of %u\n", buf->current_id + 1, buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset); priv->announced = buf->current_id; } @@ -332,7 +390,7 @@ ufo_roof_build_task_set_property (GObject *object, const GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: @@ -345,6 +403,13 @@ ufo_roof_build_task_set_property (GObject *object, case PROP_SIMULATE: priv->simulate = g_value_get_boolean (value); break; + case PROP_PATH: + if (priv->path) g_free(priv->path); + priv->path = g_value_dup_string(value); + break; + case PROP_FIRST: + priv->first_file_number = g_value_get_uint (value); + break; case PROP_NUMBER: priv->number = g_value_get_uint (value); break; @@ -367,11 +432,11 @@ ufo_roof_build_task_get_property (GObject *object, GValue *value, GParamSpec *pspec) { - UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); + RoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object); switch (property_id) { case PROP_CONFIG: - g_value_set_string(value, priv->config); + g_value_set_string(value, priv->config); break; case PROP_STOP: g_value_set_boolean (value, priv->stop); @@ -379,6 +444,12 @@ ufo_roof_build_task_get_property (GObject *object, case PROP_SIMULATE: g_value_set_boolean (value, priv->simulate); break; + case PROP_PATH: + g_value_set_string(value, priv->path?priv->path:""); + break; + case PROP_FIRST: + g_value_set_uint (value, priv->first_file_number); + break; case PROP_NUMBER: g_value_set_uint (value, priv->number); break; @@ -394,17 +465,18 @@ ufo_roof_build_task_get_property (GObject *object, static void ufo_task_interface_init (UfoTaskIface *iface) { + roof_init(); + iface->setup = ufo_roof_build_task_setup; iface->get_num_inputs = ufo_roof_build_task_get_num_inputs; iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions; iface->get_mode = ufo_roof_build_task_get_mode; iface->get_requisition = ufo_roof_build_task_get_requisition; - iface->process = ufo_roof_build_task_process; iface->generate = ufo_roof_build_task_generate; } static void -ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) +ufo_roof_build_task_class_init (RoofBuildTaskClass *klass) { GObjectClass *oclass = G_OBJECT_CLASS (klass); @@ -426,7 +498,6 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); - properties[PROP_SIMULATE] = g_param_spec_boolean ("simulate", "Simulation mode", @@ -434,6 +505,20 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) FALSE, G_PARAM_READWRITE); + properties[PROP_PATH] = + g_param_spec_string ("path", + "Input files for simulation mode", + "Optional path to input files for simulation mode (parameter from configuration file is used if not specified)", + "", + G_PARAM_READWRITE); + + properties[PROP_FIRST] = + g_param_spec_uint ("first_file_number", + "Offset to the first read file", + "Offset to the first read file", + 0, G_MAXUINT, 0, + G_PARAM_READWRITE); + properties[PROP_NUMBER] = g_param_spec_uint("number", "Number of datasets to receive", @@ -452,11 +537,11 @@ ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass) for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++) g_object_class_install_property (oclass, i, properties[i]); - g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate)); + g_type_class_add_private (oclass, sizeof(RoofBuildTaskPrivate)); } static void -ufo_roof_build_task_init(UfoRoofBuildTask *self) +ufo_roof_build_task_init(RoofBuildTask *self) { self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self); } |