summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-read-task.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/ufo-roof-read-task.c')
-rw-r--r--src/ufo-roof-read-task.c68
1 files changed, 52 insertions, 16 deletions
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
index 7d55b79..83b9627 100644
--- a/src/ufo-roof-read-task.c
+++ b/src/ufo-roof-read-task.c
@@ -35,7 +35,7 @@
struct _UfoRoofReadTaskPrivate {
gchar *config; // ROOF configuration file name
UfoRoofConfig *cfg; // Parsed ROOF parameters
- UfoRoofReadInterface *reader;
+ UfoRoofReadInterface *reader[16];
guint id; // Reader ID (defince sequential port number)
gboolean stop; // Flag requiring termination
@@ -76,6 +76,7 @@ ufo_roof_read_task_setup (UfoTask *task,
UfoResources *resources,
GError **error)
{
+ guint i;
GError *gerr = NULL;
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
@@ -91,17 +92,19 @@ ufo_roof_read_task_setup (UfoTask *task,
roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams);
// Start actual reader
- if (priv->simulate) {
- if (!priv->path)
- roof_setup_error(error, "Path to simulated data should be specified");
-
- priv->reader = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id + priv->first_file_number, &gerr);
- } else
- priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr);
- if (!priv->reader)
- roof_propagate_error(error, gerr, "roof_read_new: ");
+ for (i = 0; (i < priv->cfg->sockets_per_thread)&&((priv->id * priv->cfg->sockets_per_thread + i) < priv->cfg->n_streams); i++) {
+ if (priv->simulate) {
+ if (!priv->path)
+ roof_setup_error(error, "Path to simulated data should be specified");
+
+ priv->reader[i] = ufo_roof_read_file_new(priv->cfg, priv->path, priv->id * priv->cfg->sockets_per_thread + i + priv->first_file_number, &gerr);
+ } else
+ priv->reader[i] = ufo_roof_read_socket_new(priv->cfg, priv->id * priv->cfg->sockets_per_thread + i, &gerr);
+ if (!priv->reader[i])
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+ }
}
@@ -110,8 +113,10 @@ ufo_roof_read_task_finalize (GObject *object)
{
UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
- if (priv->reader) {
- priv->reader->close(priv->reader);
+ for (guint i = 0; i < priv->cfg->sockets_per_thread; i++) {
+ if (priv->reader[i]) {
+ priv->reader[i]->close(priv->reader[i]);
+ }
}
if (priv->cfg) {
@@ -180,16 +185,47 @@ ufo_roof_read_task_generate (UfoTask *task,
void *output_buffer = ufo_buffer_get_host_array(output, NULL);
UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg);
+ uint64_t current_id[16] = {0};
+ uint64_t grabbed[16] = {0};
+
+ static uint64_t errors = 0;
+retry:
if (priv->stop)
return FALSE;
- guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
+ for (guint sid = 0; (sid < cfg->sockets_per_thread)&&((priv->id * cfg->sockets_per_thread + sid) < priv->cfg->n_streams); sid++) {
+
+ guint packets = priv->reader[sid]->read(priv->reader[sid], output_buffer, &gerr);
if (gerr) {
g_warning("Error reciving data: %s", gerr->message);
g_error_free(gerr);
return FALSE;
}
+ const uint8_t *fragment = output_buffer;
+ for (guint i = 0; i < packets; i++) {
+ guint64 packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
+ packet_id = be64toh(pheader->packet_id) + 1;
+ }
+
+ if ((current_id[sid])&&(current_id[sid] + 1 != packet_id)) {
+ printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
+ //if (++errors > 1000) exit(1);
+ }
+ current_id[sid] = packet_id;
+ grabbed[sid]++;
+ if ((grabbed[sid]%1000000)==0) printf("Channel %i(%i): Grabbed %lu Mpackets\n", priv->id * cfg->sockets_per_thread + sid, sid, grabbed[sid]/1000000);
+
+ fragment += cfg->max_packet_size;
+ }
+ }
+
+ goto retry;
+
#ifdef UFO_ROOF_DEBUG
// Store first received packet on each channel...
static int debug = 1;
@@ -206,12 +242,12 @@ ufo_roof_read_task_generate (UfoTask *task,
#endif /* UFO_ROOF_DEBUG */
// FIXME: End of data (shall we restart in the network case?)
- if (!packets)
- return FALSE;
+// if (!packets)
+// return FALSE;
// Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
header->channel_id = priv->id;
- header->n_packets = packets;
+// header->n_packets = packets;
return TRUE;
}