#include #include #include #include "ufo-roof-buffer.h" #include "ufo-roof-read-thread.h" #include "ufo-roof-read.h" UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) { int i; GError *gerr = NULL; UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread)); if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context"); thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size); if (!thr->rdbuf) { ufo_roof_read_thread_free(thr); roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer"); } thr->rd = rd; thr->from = from; thr->to = to; return thr; } void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) { if (!thr) return; if (thr->rdbuf) free(thr->rdbuf); ufo_roof_thread_stop(thr, error); free(thr); } static int ufo_roof_read_thread_run(void *arg) { GError *gerr = NULL; UfoRoofReadThread *thr = (UfoRoofReadThread*)arg; UfoRoofConfig *cfg = thr->rd->cfg; UfoRoofBuffer *buf = thr->rd->buf; UfoRoofReadInterface *rdi = thr->rd->rdi; guint from = thr->from; guint to = thr->to; void *rdbuf = thr->rdbuf; uint64_t current_id[to - from] = {0}; uint64_t grabbed[to - from] = {0}; static uint64_t errors = 0; while (thr->op != UFO_ROOF_OP_STOP) { for (guint sid = from; sid < to; sid++) { // FIXME break waiting on stop? If no packets are send guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr); if (gerr) roof_print_error(gerr); guint ready = false; const uint8_t *fragment = (uint8_t*)rdbuf; for (guint i = 0; i < packets; i++) { guint64 packet_id = 0; // Otherwise considered consecutive and handled by the buffer if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) { UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment); packet_id = be64toh(pheader->packet_id) + 1; } #ifdef UFO_ROOF_DEBUG if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) { printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets); //if (++errors > 1000) exit(1); } current_id[sid - from] = packet_id; grabbed[sid - from]++; if ((grabbed[sid - from]%1000000)==0) printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000); #endif ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr); if (gerr) roof_print_error(gerr); fragment += cfg->max_packet_size; } // fragment-loop // send notification? Broadcast blocks, we don't want it. if (ready) { } } // socket-loop } // operation-loop #ifdef UFO_ROOF_DEBUG // Store first received packet on each channel... static int debug = 1; if (debug) { char fname[256]; sprintf(fname, "channel%i_packet0.raw", priv->id); FILE *f = fopen(fname, "w"); if (f) { fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f); fclose(f); } debug = 0; } #endif /* UFO_ROOF_DEBUG */ // FIXME: End of data (shall we restart in the network case?) // if (!packets) // return FALSE; // Shall I use UFO metadata (ufo_buffer_set_metadata) insead? header->channel_id = priv->id; // header->n_packets = packets; return TRUE; } } gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) { int err; if (!thr) return FALSE; err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr); if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err); ctx->launched = TRUE; return TRUE; } gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) { int err, ret; if (!thr) return FALSE; if (!thr->launched) return TRUE; // Signal thread termination err = thrd_join(&thr->thread, &ret); if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err); return TRUE; }