1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#include <stdio.h>
#include "roof.h"
#include "roof-thread.h"
/**
 * Allocate a zero-initialised RoofThreadContext and attach the configuration.
 *
 * @param cfg    configuration pointer stored in the context (stored, not copied)
 * @param roof   not used by the current implementation
 * @param from   not used by the current implementation
 * @param to     not used by the current implementation
 * @param error  handed to roof_new_error() when the allocation fails
 * @return the new context, or NULL when the allocation fails. The memory comes
 *         from calloc(), so it must be released with free()
 *         (see roof_thread_context_free()).
 */
RoofThreadContext *roof_thread_context_new(RoofConfig *cfg, Roof *roof, guint from, guint to, GError **error) {
    RoofThreadContext *rdt = calloc(1, sizeof *rdt);
    if (!rdt) {
        roof_new_error(error, "Can't allocate RoofThreadContext");
        // Explicit return: never fall through to the stores below with a NULL
        // pointer, whether or not roof_new_error() leaves the function itself.
        return NULL;
    }

    rdt->cfg = cfg;
    // NOTE(review): `rdi` is neither a parameter nor a local of this function.
    // Presumably it should be derived from `roof` (and maybe `from`/`to`) - confirm
    // the intended source before relying on this assignment.
    rdt->rdi = rdi;
    return rdt;
}
/**
 * Release a RoofThreadContext.
 *
 * Only the context structure itself is freed; nothing it points to is touched.
 *
 * @param rdt  context to release; NULL is accepted and ignored
 */
void roof_thread_context_free(RoofThreadContext *rdt) {
    // free(NULL) is defined to do nothing, so no explicit NULL guard is needed.
    free(rdt);
}
// FIXME: unfinished declaration - the parameter list is never closed and there is
// neither a body nor a terminating ';'. As written this is a syntax error;
// complete it or remove it.
int roof_thread_read_socket(Roof *
// Thread entry point (work-in-progress draft): for every stream in [from, to) it
// reads a batch of packets, derives a dataset id and a fragment id from each
// packet id, and copies the fragment into a dataset buffer.
//
// Parameters:
//   thr, hwctx - not referenced anywhere in the body
//   id         - index into ctx->rdt[] selecting this thread's context
//   data       - cast to Roof*; supplies the configuration (ctx->cfg)
//
// NOTE(review): this function cannot compile in its current state:
//   * the names from, to, priv, gerr, buffer, fragment, fragments_per_packet,
//     dataset_buffer, ready, buf and sid are used but never declared here;
//   * there are stray tokens ("????", a bare "packet_id", a dangling
//     "fragment_id =") in the declarations before the stream loop;
//   * the braces do not balance - one closing '}' is missing, so the function
//     body is never closed;
//   * the function is declared to return int but has no return statement.
int roof_thread_read(HWThread thr, void *hwctx, int id, void *data) {
Roof *ctx = (Roof*)data;
RoofConfig *cfg = ctx->cfg;
// NOTE(review): rdt is fetched but not used afterwards; the loops below go
// through the undeclared `priv` instead.
RoofThreadContext *rdt = ctx->rdt[id];
// { row width, row count } of a complete dataset; the "* bit_depth / 8" suggests
// the width is in bytes - TODO confirm.
guint dataset_dims[2] = { cfg->fan_bins * cfg->bit_depth / 8, cfg->fan_projections };
// { row width, row count } of one fragment. NOTE(review): the second initializer
// contains a literal "????" placeholder and reads buffer->fragment_dims[0]
// although `buffer` is not declared in this function.
guint fragment_dims[2] = { cfg->channels_per_module * cfg->bit_depth / 8, ????cfg->payload_size / buffer->fragment_dims[0] };
// NOTE(review): the next two lines are incomplete statements (no type, no value,
// no ';').
packet_id
fragment_id =
for (guint stream_id = from; stream_id < to; stream_id++) {
// NOTE(review): rdbuf is handed to read() but never referenced afterwards; the
// packet loop walks the undeclared `fragment` pointer instead.
uint8_t *rdbuf;
guint packets = priv->rdi[stream_id]->read(priv->rdi[stream_id], &rdbuf, &gerr);
// A read error is only printed; processing continues with the returned count.
if (gerr) roof_print_error(gerr);
for (guint i = 0; i < packets; i++) {
guint64 packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(RoofPacketHeader)) {
RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
// The header value is converted from big-endian and shifted by one, so ids
// derived from a header start at 1.
packet_id = be64toh(pheader->packet_id) + 1;
} else {
// FIXME: consider consecutive
//fragment_id = ++buffer->stream_fragment[stream_id];
}
// FIXME: packet may contain fragments for multiple datasets
// NOTE(review): this assignment comes before the declaration of fragment_id two
// lines below and uses the undeclared fragments_per_packet.
fragment_id = packet_id * fragments_per_packet;
// NOTE(review): when no header is parsed packet_id is still 0 and
// "packet_id - 1" wraps around (guint64 is unsigned).
guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
// FIXME: verify that packet is consecutive
// if
// Drop packets of already skipped datasets
// NOTE(review): this `continue` also skips the "fragment += ..." advance at the
// end of the loop body.
if (dataset_id < priv->current_id)
continue;
// FIXME: stop processing and return....
// NOTE(review): same condition as the check above, and an unbraced `if` cannot
// be followed by a declaration - this looks like an unfinished edit.
if (dataset_id < priv->current_id)
uint8_t *fragment_buffer = dataset_buffer +
stream_id * fragment_dims[0] + // x-coordinate
(fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
// Row-by-row copy: source rows are fragment_dims[0] bytes apart, destination
// rows are dataset_dims[0] bytes apart.
// NOTE(review): this loop variable `i` shadows the packet-loop `i`, and the
// dimensions are read through the undeclared `buffer` rather than the local
// arrays declared at the top.
for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
// buffer->last_id[stream_id] = dataset_id;
// NOTE(review): by brace matching, the three statements below sit inside the
// row-copy loop, because no '}' closes it before them; presumably they belong to
// the per-packet loop - confirm.
ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
if (gerr) roof_print_error(gerr);
fragment += cfg->max_packet_size;
}
}
}
|