summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-buffer.c
blob: 0e0a8901f44dfc7dc3a7eab21e85f053760e0334 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#include <stdio.h>
#include <stdint.h>
#include <time.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-buffer.h"

// This is currently not thread safe. With dual-filter architecture this will be called sequentially.

/*
 * Allocates and initializes a ROOF ring buffer using the sizes found in cfg.
 *
 * n_dims       - 1: fragments of a stream are stored contiguously inside the dataset;
 *                2: each fragment is stored as a 2D tile (see ufo_roof_buffer_set_fragment()).
 *                Any other value is rejected.
 * max_datasets - if non-zero, fragments of datasets whose (first_id-relative) index is
 *                max_datasets or above are ignored by ufo_roof_buffer_set_fragment().
 *
 * On failure, the error is reported through roof_new_error() and everything allocated
 * so far is released first.
 * NOTE(review): like the original code, this relies on roof_new_error() returning from
 * the function — confirm in ufo-roof.h.
 * Not thread safe.
 */
UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
    if ((n_dims < 1)||(n_dims > 2))
	roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims);

        // These values are used as divisors below; a zero would be undefined behavior
    if ((!cfg->payload_size)||(!cfg->n_streams)||(!(cfg->channels_per_module * cfg->bit_depth / 8)))
	roof_new_error(error, "Invalid ROOF configuration: payload size, number of streams and bytes per module row must be non-zero");

    UfoRoofBuffer *buffer = calloc(1, sizeof(UfoRoofBuffer));
    if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");

    buffer->max_datasets = max_datasets;
    buffer->ring_size = cfg->buffer_size;
    buffer->drop_buffers = cfg->drop_buffers;
    buffer->latency_buffers = cfg->latency_buffers;
    buffer->n_dims = n_dims;
    buffer->dataset_size = cfg->dataset_size;
    buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8;
    buffer->dataset_dims[1] = cfg->fan_projections;
    buffer->fragment_size = cfg->payload_size;
    buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8;
    buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0];

    buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
    buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;

        // ufo_roof_buffer_set_fragment() divides by fragments_per_stream
    if (!buffer->fragments_per_stream) {
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Invalid ROOF configuration: dataset is too small to hold one fragment per stream");
    }

        // Computed in gsize: the guint product overflows for rings above 4 GiB
    gsize ring_bytes = (gsize)buffer->ring_size * buffer->dataset_size;

    buffer->ring_buffer = malloc(ring_bytes);
    buffer->n_fragments = calloc(buffer->ring_size, sizeof(*buffer->n_fragments));
    buffer->stream_fragment = calloc(cfg->n_streams, sizeof(*buffer->stream_fragment));

    if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
            // ring_bytes is a local, so the message does not touch the freed buffer
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %" G_GSIZE_FORMAT, ring_bytes);
    }

#ifdef UFO_ROOF_INDEPENDENT_STREAMS
        // first_id is indexed by stream_id in ufo_roof_buffer_set_fragment(), so it needs one slot per stream
    buffer->first_id = malloc((gsize)cfg->n_streams * sizeof(*buffer->first_id));
    if (!buffer->first_id) {
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
    }
    for (guint i = 0; i < cfg->n_streams; i++)
        buffer->first_id[i] = (guint64)-1;
#else
        // (guint64)-1 marks "no dataset seen yet"
    buffer->first_id = (guint64)-1;
#endif

    return buffer;
}

/*
 * Releases a buffer created by ufo_roof_buffer_new() together with its internal arrays.
 * Accepts NULL. It also handles a buffer whose arrays were never allocated, as on the
 * error paths of ufo_roof_buffer_new(): the struct is calloc'ed, so unset members are
 * NULL, and free(NULL) is a no-op.
 */
void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
    if (!buffer)
        return;

    free(buffer->stream_fragment);
    free(buffer->n_fragments);
    free(buffer->ring_buffer);
#ifdef UFO_ROOF_INDEPENDENT_STREAMS
    free(buffer->first_id);
#endif

    free(buffer);
}

// fragment_id is numbered from 1; 0 means "auto": the next id is taken from the per-stream counter
/*
 * Stores one fragment (packet payload) received on stream stream_id in the ring buffer.
 *
 * fragment_id is the 1-based per-stream fragment number. The value 0 means "take the
 * next number from the per-stream counter". Together with fragments_per_stream it
 * selects the dataset the fragment belongs to and its position inside that dataset:
 *  - 1D: fragments are laid out contiguously, all fragments of stream 0 first,
 *        then those of stream 1, and so on;
 *  - 2D: a fragment is fragment_dims[1] rows of fragment_dims[0] bytes, written at
 *        byte column stream_id * fragment_dims[0] of rows that are dataset_dims[0]
 *        bytes wide.
 * fragment must point to at least fragment_size bytes. stream_id is not range-checked.
 *
 * Returns TRUE when the dataset at current_id is complete and can be fetched with
 * ufo_roof_buffer_get_dataset(). Returns FALSE when nothing became ready, and for
 * fragments that are ignored:
 *  - fragments from before the first dataset seen;
 *  - late fragments (also reported through roof_network_error_with_retval());
 *  - fragments beyond max_datasets.
 * If the ring buffer overflows, older datasets are dropped and the condition is
 * reported through root_set_network_error(). The fragment is still stored in that
 * case, and TRUE may still be returned.
 * Not thread safe.
 */
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
    gboolean ready = FALSE;
    guint buffer_id;
    guint64 first_id;
    guint64 dataset_id;

    if (!fragment_id) {
        fragment_id = ++buffer->stream_fragment[stream_id];
    }

        // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
    dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
    fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;

#ifdef UFO_ROOF_INDEPENDENT_STREAMS
    if (buffer->first_id[stream_id] == (guint64)-1)
        buffer->first_id[stream_id] = dataset_id;
    first_id = buffer->first_id[stream_id];
#else
    if (buffer->first_id == (guint64)-1)
        buffer->first_id = dataset_id;
    first_id = buffer->first_id;
#endif
    if (dataset_id < first_id)
	return FALSE;

        // From here on, dataset ids are relative to the first dataset seen
    dataset_id -= first_id;
    buffer_id = dataset_id % buffer->ring_size;

	// FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
    if (dataset_id < buffer->current_id) {
        roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %" G_GUINT64_FORMAT ", currently processing %" G_GUINT64_FORMAT, dataset_id, (guint64)buffer->current_id);
	return FALSE;
    }

    if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) {
//        printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
	return FALSE;
    }
        // We are not fast enough, new packets are arriving too fast
    if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
            // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
        guint64 last_dropped = dataset_id - (buffer->ring_size - buffer->drop_buffers);

            // Report the progress of the dataset at current_id (its own slot, not the slot of the incoming dataset)
        root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %" G_GUINT64_FORMAT ". Dropping datasets from %" G_GUINT64_FORMAT " to %" G_GUINT64_FORMAT ", dataset %" G_GUINT64_FORMAT " has %u parts of %u completed",
            dataset_id, (guint64)buffer->current_id, last_dropped, (guint64)buffer->current_id,
            (guint)buffer->n_fragments[buffer->current_id % buffer->ring_size], (guint)buffer->fragments_per_dataset);

            // FIXME: Send semi-complete buffers further?
            // FIXME: Or shall we drop more if larger buffers are allocated?
        if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
	    memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(*buffer->n_fragments));
	    buffer->current_id = dataset_id;
	} else {
		// 64-bit index: a guint index would truncate dataset ids above 2^32
	    for (guint64 i = buffer->current_id; i <= last_dropped; i++)
		buffer->n_fragments[i % buffer->ring_size] = 0;
	    buffer->current_id = last_dropped + 1;
	}

	if (buffer->n_fragments[buffer->current_id % buffer->ring_size] == buffer->fragments_per_dataset)
	    ready = TRUE;

            // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
            // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
            // The updates may happen after writting/reading is finished.
    }

        // size_t offset: the guint product overflows once the ring exceeds 4 GiB
    uint8_t *dataset_buffer = buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;
    if (buffer->n_dims == 2) {
	uint8_t *fragment_buffer = dataset_buffer +
		stream_id * buffer->fragment_dims[0] +					// x-coordinate
		(fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0];	// y-coordinate

	    // Copy row by row: fragment rows are fragment_dims[0] bytes, dataset rows are dataset_dims[0] bytes
	for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
	    memcpy(fragment_buffer + i * buffer->dataset_dims[0], (const uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
	}
    } else {
	    // 1D structure, simply putting fragment at the appropriate position in the stream
	uint8_t *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
	memcpy(fragment_buffer, fragment, buffer->fragment_size);
    }

        // FIXME: Sanity checks: verify is not a duplicate fragment?
        // Use the count returned by the atomic increment instead of re-reading the counter
    guint n_received = atomic_fetch_add(&buffer->n_fragments[buffer_id], 1) + 1;

    if (n_received == buffer->fragments_per_dataset) {
            // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
        if (dataset_id == buffer->current_id)
	    ready = TRUE;
	else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers)))
	    ready = ufo_roof_buffer_skip_to_ready(buffer);
    }

    return ready;
}

/*
 * Moves current_id forward to the first fully assembled dataset among the ring_size
 * datasets starting at current_id. The fragment counter of every incomplete dataset
 * passed on the way is reset to 0, which abandons that dataset.
 *
 * Returns TRUE and updates current_id when a complete dataset is found. Otherwise
 * returns FALSE: all ring_size counters have then been reset, but current_id is
 * unchanged.
 */
gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
    guint offset = 0;

    while (offset < buffer->ring_size) {
        guint64 candidate = buffer->current_id + offset;
        guint slot = candidate % buffer->ring_size;
        gboolean complete = (buffer->n_fragments[slot] == buffer->fragments_per_dataset);

        if (complete) {
            buffer->current_id = candidate;
            return TRUE;
        }

            // Incomplete dataset: give up on it and clear its slot
        buffer->n_fragments[slot] = 0;
        offset++;
    }

    return FALSE;
}


/*
 * Copies the dataset at current_id into output_buffer, provided all of its fragments
 * have arrived. It then clears that slot's fragment counter and advances current_id
 * by one.
 *
 * output_buffer must hold at least dataset_size bytes. If seqid is non-NULL, it
 * receives the id of the returned dataset, counted relative to the first dataset
 * seen. error is currently not used.
 *
 * Returns TRUE if a dataset was copied, FALSE if the current dataset is incomplete.
 */
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
    guint buffer_id = buffer->current_id % buffer->ring_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
    if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;

        // size_t offset: the guint product overflows once the ring exceeds 4 GiB
    void *dataset_buffer = buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;

    memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
    if (seqid) *seqid = buffer->current_id;

        // Release the slot so it can receive the dataset current_id + ring_size
    buffer->n_fragments[buffer_id] = 0;
    buffer->current_id += 1;

    return TRUE;
}