1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
#include <stdio.h>
#include "roof.h"
#include "roof-read.h"
RoofReadContext *roof_read_context_new(RoofConfig *cfg, RoofReadInterface *rdi, GError **error) {
guint i;
GError *gerr = NULL;
RoofReadContext *rdc = (RoofReadContext*)calloc(1, sizeof(RoofReadContext));
if (!rdc) roof_new_error(error, "Can't allocate RoofReadContext");
rdc->cfg = cfg;
rdc->rdi = rdi;
return rdc;
}
void roof_read_context_free(RoofReadContext *rdc) {
if (rdc) {
free(rdc);
}
}
void roof_read(RoofReadContext *rdc, RoofBufferInterface *bfi, GError *error) {
uint8_t *rdbuf = rdc->rdbuf;
guint n_packets = rdc->n_packets;
guint packet_id = rdc->packet_id;
guint fragment_id = rdc->fragment_id;
GError *gerr = NULL;
ReadRoofInterface *rdi = rdc->rdi;
if (rdc->rdbuf) {
rdbuf = rdc->rdbuf;
packets
else {
guint packets = rdi->read(rdi[stream_id], &rdbuf, &gerr);
if (gerr) roof_propagate_error(error, gerr, "roof_read: ");
packet_id = 0;
fragment_id = 0;
}
packet = rdbuf + packet_id * (size + padding
while (packet) {
while (rdbuf) {
//
for (guint i = 0; i < packets; i++) {
guint64 packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(RoofPacketHeader)) {
RoofPacketHeader *pheader = ROOF_PACKET_HEADER(fragment);
packet_id = be64toh(pheader->packet_id) + 1;
} else {
// FIXME: consider consecutive
//fragment_id = ++buffer->stream_fragment[stream_id];
}
// FIXME: packet may contain fragments for multiple datasets
fragment_id = packet_id * fragments_per_packet;
guint64 dataset_id = (packet_id - 1) / cfg->fragments_per_stream;
guint64 fragment_id = (packet_id - 1) % cfg->fragments_per_stream;
// FIXME: verify that packet is consecutive
// if
// Drop packets of already skipped datasets
if (dataset_id < priv->current_id)
continue;
// FIXME: stop processing and return....
if (dataset_id < priv->current_id)
uint8_t *fragment_buffer = dataset_buffer +
stream_id * fragment_dims[0] + // x-coordinate
(fragment_id * fragment_dims[1]) * dataset_dims[0]; // y-coordinate
for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
// buffer->last_id[stream_id] = dataset_id;
ready |= roof_buffer_set_fragment(buf, sid, packet_id, fragment, &gerr);
if (gerr) roof_print_error(gerr);
fragment += cfg->max_packet_size;
}
}
void *buf = roof_buffer_get_dataset(bfi, ?);
// Computing offsets each time, etc.
roof_buffer_set_fragment
}
|