1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
#include <stdio.h>
#include <threads.h>
#include <ufo/ufo.h>
#include "ufo-roof-buffer.h"
#include "ufo-roof-read-thread.h"
#include "ufo-roof-read.h"
UfoRoofReadThread *guint ufo_roof_read_thread_new(UfoRoofRead *rd, guint from, guint to, GError **error) {
int i;
GError *gerr = NULL;
UfoRoofReadThread *thr = (UfoRoofReadThread*)calloc(1, sizeof(UfoRoofReadThread));
if (!ctx) roof_new_error(error, "Can't allocate UfoRoofReadThread context");
thr->rdbuf = malloc(cfg->max_packets * cfg->max_packet_size);
if (!thr->rdbuf) {
ufo_roof_read_thread_free(thr);
roof_new_error(error, "Can't allocate memory for temporary packet receiver buffer");
}
thr->rd = rd;
thr->from = from;
thr->to = to;
return thr;
}
void ufo_roof_read_thread_free(UFORoofReadThread *thr, GError **error) {
if (!thr) return;
if (thr->rdbuf) free(thr->rdbuf);
ufo_roof_thread_stop(thr, error);
free(thr);
}
static int ufo_roof_read_thread_run(void *arg) {
GError *gerr = NULL;
UfoRoofReadThread *thr = (UfoRoofReadThread*)arg;
UfoRoofConfig *cfg = thr->rd->cfg;
UfoRoofBuffer *buf = thr->rd->buf;
UfoRoofReadInterface *rdi = thr->rd->rdi;
guint from = thr->from;
guint to = thr->to;
void *rdbuf = thr->rdbuf;
uint64_t current_id[to - from] = {0};
uint64_t grabbed[to - from] = {0};
static uint64_t errors = 0;
while (thr->op != UFO_ROOF_OP_STOP) {
for (guint sid = from; sid < to; sid++) {
// FIXME break waiting on stop? If no packets are send
guint packets = rdi[sid]->read(priv->reader[sid], rdbuf, &gerr);
if (gerr) roof_print_error(gerr);
guint ready = false;
const uint8_t *fragment = (uint8_t*)rdbuf;
for (guint i = 0; i < packets; i++) {
guint64 packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
packet_id = be64toh(pheader->packet_id) + 1;
}
#ifdef UFO_ROOF_DEBUG
if ((current_id[sid - from])&&(current_id[sid - from] + 1 != packet_id)) {
printf("Channel %i(%i): =======> Missing %lu packets, expecting %lu but got %lu (N %i from total packets in pack %u)\n", priv->id * cfg->sockets_per_thread + sid, sid, packet_id - current_id[sid] - 1, current_id[sid] + 1, packet_id, i, packets);
//if (++errors > 1000) exit(1);
}
current_id[sid - from] = packet_id;
grabbed[sid - from]++;
if ((grabbed[sid - from]%1000000)==0)
printf("Channel %i: Grabbed %lu Mpackets\n", sid, grabbed[sid - from]/1000000);
#endif
ready |= ufo_roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
if (gerr) roof_print_error(gerr);
fragment += cfg->max_packet_size;
} // fragment-loop
// send notification? Broadcast blocks, we don't want it.
if (ready) {
}
} // socket-loop
} // operation-loop
#ifdef UFO_ROOF_DEBUG
// Store first received packet on each channel...
static int debug = 1;
if (debug) {
char fname[256];
sprintf(fname, "channel%i_packet0.raw", priv->id);
FILE *f = fopen(fname, "w");
if (f) {
fwrite(output_buffer, 1, cfg->max_packets * cfg->max_packet_size, f);
fclose(f);
}
debug = 0;
}
#endif /* UFO_ROOF_DEBUG */
// FIXME: End of data (shall we restart in the network case?)
// if (!packets)
// return FALSE;
// Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
header->channel_id = priv->id;
// header->n_packets = packets;
return TRUE;
}
}
gboolean ufo_roof_read_thread_start(UFORoofReadThread *thr, GError **error) {
int err;
if (!thr) return FALSE;
err = thrd_create(&thr->thread, ufo_roof_read_thread_run, thr);
if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) spawning new read thread", err);
ctx->launched = TRUE;
return TRUE;
}
gboolean ufo_roof_read_thread_stop(UFORoofReadThread *thr, GError **error) {
int err, ret;
if (!thr) return FALSE;
if (!thr->launched) return TRUE;
// Signal thread termination
err = thrd_join(&thr->thread, &ret);
if (err != thrd_success) roof_setup_error_with_retval(error, FALSE, "Error (%i) waiting for read thread termination", err);
return TRUE;
}
|