summaryrefslogtreecommitdiffstats
path: root/src/roof-buffer.c
blob: 4ca0386217fb3896a84279703c1b1d14ebf90587 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
#include <stdio.h>
#include <stdint.h>
#include <time.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-buffer.h"

// This is currently not thread safe. With dual-filter architecture this will be called sequentially.

RoofBuffer *roof_buffer_new(RoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
    if ((n_dims < 1)||(n_dims > 2))
	roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims);

    RoofBuffer *buffer = (RoofBuffer*)calloc(1, sizeof(RoofBuffer));
    if (!buffer) roof_new_error(error, "Can't allocate RoofBuffer");

    buffer->max_datasets = max_datasets;
    buffer->n_streams = cfg->n_streams;
    buffer->ring_size = cfg->buffer_size;
    buffer->drop_buffers = cfg->drop_buffers;
    buffer->latency_buffers = cfg->latency_buffers;
    buffer->n_dims = n_dims;
    buffer->dataset_size = cfg->dataset_size;
    buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8;
    buffer->dataset_dims[1] = cfg->fan_projections;
    buffer->fragment_size = cfg->payload_size;
    buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8;
    buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0];

    buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
    buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;
//    printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size);

    buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size);
    buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int));
    buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));

#ifdef ROOF_INDEPENDENT_STREAMS
    buffer->first_id = malloc(buffer->n_streams * sizeof(guint64));
    if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
    for (guint i = 0; i < buffer->n_streams; i++) 
        buffer->first_id[i] = (guint64)-1;
#else
    buffer->first_id = (guint64)-1;
#endif
    buffer->last_id = malloc(buffer->n_streams * sizeof(guint64));

    if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
        roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
    }

    return buffer;
}

void roof_buffer_free(RoofBuffer *buffer) {
    if (buffer) {
#ifdef ROOF_INDEPENDENT_STREAMS
        if (buffer->first_id)
            free(buffer->first_id);
#endif
        if (buffer->ring_buffer)
            free(buffer->ring_buffer);
        if (buffer->n_fragments)
            free(buffer->n_fragments);
        if (buffer->stream_fragment)
            free(buffer->stream_fragment);

        free(buffer);
    }
}

    // fragment_id is numbered from 1 (0 - means auto)
gboolean roof_buffer_set_fragment(RoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
    gboolean ready = FALSE;
    guint buffer_id;
    guint64 first_id;
    guint64 dataset_id;

    if (!fragment_id) {
        fragment_id = ++buffer->stream_fragment[stream_id];
    }

        // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
    dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
    fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;

#ifdef ROOF_INDEPENDENT_STREAMS
    if (buffer->first_id[stream_id] == (guint64)-1)
        buffer->first_id[stream_id] = dataset_id;
    first_id = buffer->first_id[stream_id];
#else
    if (buffer->first_id == (guint64)-1)
        buffer->first_id = dataset_id;
    first_id = buffer->first_id;
#endif
    if (dataset_id < first_id)
	return FALSE;

    dataset_id -= first_id;
    buffer_id = dataset_id % buffer->ring_size;

	// FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
    if (dataset_id < buffer->current_id) {
//        roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
	return FALSE;
    }

    if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) {
//        printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
	return FALSE;
    }

    buffer->last_id[stream_id] = dataset_id;

        // We are not fast enough, new packets are arrvining to fast        
    if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
            // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?

	guint64 min_id = (guint64)-1;
	guint64 max_id = 0;
	for (guint i = 0; i < buffer->n_streams; i++) {
	    if (min_id > buffer->last_id[stream_id]) min_id = buffer->last_id[stream_id];
	    if (max_id < buffer->last_id[stream_id]) max_id = buffer->last_id[stream_id];
	}
	printf("Desync: %lu (Min: %lu, Max: %lu), stream: %u (of %u), dataset: %lu\n", max_id - min_id, min_id, max_id, stream_id, buffer->n_streams, dataset_id);
	printf("Parts: %i %i %i %i\n", buffer->n_fragments[buffer_id], buffer->n_fragments[(buffer_id + 1)%buffer->ring_size], buffer->n_fragments[(buffer_id + 2)%buffer->ring_size], buffer->n_fragments[(buffer_id + 3)%buffer->ring_size]);

        root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed, already reported %lu",
            dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset, buffer->n_ready);

            // FIXME: Send semi-complete buffers further?
            // FIXME: Or shall we drop more if larger buffers are allocated?
        if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
	    memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(_Atomic guint));
		// FIXME: Can finally received old buffer hit the counter here? Locking?
	    buffer->current_id = dataset_id;
	} else {
	    for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
		buffer->n_fragments[i%buffer->ring_size] = 0;
		// FIXME: Can finally received old buffers hit the counters here? Locking?
	    buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
	}

	    // FIXME: Can other threads obsolete the buffer meanwhile? This is not single threaded any more
	if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
	    ready = TRUE;
	
            // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
            // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value 
            // The updates may happen after writting/reading is finished.
    }

/*
    printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n", 
        dataset_id, buffer_id,
	buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset,
	stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, 
	buffer->fragment_size, ((uint32_t*)fragment)[0]
    );
*/

	// FIXME: What if buffer obsolete meanwhile by other threads. We are not single threaded
    uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
    if (buffer->n_dims == 2) {
	uint8_t *fragment_buffer = dataset_buffer + 
		stream_id * buffer->fragment_dims[0] +					// x-coordinate
		(fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0];	// y-coordinate

	for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
	    memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
	}
    } else {
	    // 1D stracture, simply putting fragment at the appropriate position in the stream
	uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
	memcpy(fragment_buffer, fragment, buffer->fragment_size);
    }

        // FIXME: Sanity checks: verify is not a dublicate fragment?
	// This stuff is anyway is single-threaded which is a problem.
    //atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
    buffer->n_fragments[buffer_id]++;

    if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
            // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
        if (dataset_id == buffer->current_id)
	    ready = TRUE;
	else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers))) {
	    ready = roof_buffer_skip_to_ready(buffer);
	}

	if (ready) buffer->n_ready++;
    }

    return ready;
}

gboolean roof_buffer_skip_to_ready(RoofBuffer *buffer) {
    for (guint i = 0; i < buffer->ring_size; i++) {
        guint64 id = buffer->current_id + i;
        guint buffer_id = id % buffer->ring_size;

        if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
            buffer->current_id = id;
            return TRUE;
        }

	printf("Skipping dataset %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
	buffer->n_fragments[buffer_id] = 0;
    }

    return FALSE;
}


gboolean roof_buffer_get_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
    guint buffer_id = buffer->current_id % buffer->ring_size;
    void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
    if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;

    memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
    if (seqid) *seqid = buffer->current_id;

    buffer->n_fragments[buffer_id] = 0;
    buffer->current_id += 1;

    return TRUE;
}

gboolean roof_buffer_wait_dataset(RoofBuffer *buffer, gpointer output_buffer, gulong *seqid, gulong timeout, GError **error) {
    struct timespec start_time, current_time;
    clock_gettime(CLOCK_REALTIME, &start_time);

    guint buffer_id = buffer->current_id % buffer->ring_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
    while (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) {
	// FIXME: get some sleep? How we can interrupt it?
	// just small nanosleep?
	if (timeout) {
	    clock_gettime(CLOCK_REALTIME, &current_time);
	    gulong wait = (current_time.tv_sec - start_time.tv_sec) * 1000000 +  (current_time.tv_nsec - start_time.tv_nsec) / 1000;
	    if (wait > timeout) return FALSE;
	}
    }

    return roof_buffer_get_dataset(buffer, output_buffer, seqid, error);
}