summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-buffer.c
blob: bac940c10b3b9c601840bbaa2f1361c6c4352765 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#include <stdio.h>
#include <stdint.h>
#include <time.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-buffer.h"

// This is currently not thread safe. With dual-filter architecture this will be called sequentially.

/*
 * Allocates a ring buffer which reassembles per-stream network fragments into
 * complete ROOF datasets.
 *
 * cfg          - ROOF configuration. Provides the ring size (buffer_size), the
 *                number of datasets dropped on overflow (drop_buffers), the
 *                dataset/fragment geometry and the number of streams.
 * n_dims       - 1: fragments of a stream are stored contiguously;
 *                2: fragments are tiled into a 2D dataset of dataset_dims[0]
 *                bytes x dataset_dims[1] rows. Other values are rejected.
 * max_datasets - datasets with id >= max_datasets are ignored by
 *                ufo_roof_buffer_set_fragment(); 0 disables the limit.
 * error        - receives a description of the failure.
 *
 * Returns the new buffer (release with ufo_roof_buffer_free()). On failure,
 * roof_new_error() is used to report the problem; NOTE(review): it is expected
 * to set *error and return NULL from this function — confirm in ufo-roof.h.
 */
UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error) {
    if ((n_dims < 1)||(n_dims > 2)) {
        roof_new_error(error, "Unsupported number of dimmensions %u (only plain and 2D ROOF structure is currently supported)", n_dims);
    }

        // Reject geometries which would divide by zero below or in ufo_roof_buffer_set_fragment()
    if ((!cfg->buffer_size)||(!cfg->payload_size)||(!cfg->n_streams)||(!(cfg->channels_per_module * cfg->bit_depth / 8))) {
        roof_new_error(error, "Invalid ROOF buffer configuration: buffer size, payload size, number of streams and module width must be non-zero");
    }

    UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
    if (!buffer) {
        roof_new_error(error, "Can't allocate UfoRoofBuffer");
    }

    buffer->max_datasets = max_datasets;
    buffer->ring_size = cfg->buffer_size;
    buffer->drop_buffers = cfg->drop_buffers;
    buffer->n_dims = n_dims;
    buffer->dataset_size = cfg->dataset_size;
    buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8;          // bytes per dataset row
    buffer->dataset_dims[1] = cfg->fan_projections;                        // rows per dataset
    buffer->fragment_size = cfg->payload_size;
    buffer->fragment_dims[0] = cfg->channels_per_module * cfg->bit_depth / 8;   // bytes per fragment row
    buffer->fragment_dims[1] = buffer->fragment_size / buffer->fragment_dims[0]; // rows per fragment

    buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
    buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;

        // ufo_roof_buffer_set_fragment() divides by fragments_per_stream
    if (!buffer->fragments_per_stream) {
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Invalid ROOF buffer configuration: dataset is too small to hold a fragment from every stream");
    }

        // Compute in size_t: the product of two guints may exceed 4 GiB for large rings
    size_t ring_bytes = (size_t)buffer->ring_size * buffer->dataset_size;

    buffer->ring_buffer = malloc(ring_bytes);
    buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic guint));
    buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));

    if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
            // buffer must not be dereferenced after this call, hence ring_bytes was computed above
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %zu", ring_bytes);
    }

    return buffer;
}

/*
 * Releases a buffer created by ufo_roof_buffer_new() along with its internal
 * arrays. NULL is accepted, and so is a partially constructed buffer: members
 * that were never allocated are NULL and free(NULL) does nothing.
 */
void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
    if (!buffer)
        return;

    free(buffer->stream_fragment);
    free(buffer->n_fragments);
    free(buffer->ring_buffer);
    free(buffer);
}

    // fragment_id is numbered from 1 (0 - means auto)
/*
 * Copies one network fragment into its place in the ring buffer.
 *
 * stream_id   - index of the originating stream. It indexes stream_fragment[]
 *               and selects the fragment's position inside the dataset.
 *               NOTE(review): not bounds-checked; assumes stream_id < n_streams.
 * fragment_id - 1-based per-stream fragment counter. 0 means "next id", taken
 *               from the per-stream counter kept in stream_fragment[].
 * fragment    - fragment payload of fragment_size bytes.
 * error       - set when the ring buffer is exhausted and older datasets are
 *               dropped; the fragment is still stored in that case.
 *
 * Returns TRUE when the dataset at current_id is complete and can be fetched
 * with ufo_roof_buffer_get_dataset(). Returns FALSE otherwise, including when
 * the fragment is ignored because its dataset was already passed or is at or
 * beyond max_datasets.
 */
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
    gboolean ready = FALSE;
    const uint8_t *src = (const uint8_t *)fragment;

    if (!fragment_id) {
        fragment_id = ++buffer->stream_fragment[stream_id];
    }

        // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
    guint dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
    fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;     // from here on: 0-based index within this stream's share
    guint buffer_id = dataset_id % buffer->ring_size;

        // Late packet for a dataset which was already delivered or dropped.
        // FIXME: Reporting this currently produces too much output. Introduce some kind of debugging mode?
    if (dataset_id < buffer->current_id)
        return FALSE;
//        roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);

    if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets))
        return FALSE;

        // We are not fast enough: packets for a dataset outside the ring window are arriving
    if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
            // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
        guint current_slot = buffer->current_id % buffer->ring_size;

        root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %u. Dropping datasets from %u to %u, dataset %u has %u parts of %u completed",
            dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id,
            (guint)atomic_load(&buffer->n_fragments[current_slot]), buffer->fragments_per_dataset);

            // FIXME: Send semi-complete buffers further?
            // FIXME: Or shall we drop more if larger buffers are allocated?
        if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
                // Far ahead of the window: discard everything and restart at the new dataset
            memset(buffer->n_fragments, 0, buffer->ring_size * sizeof(*buffer->n_fragments));
            buffer->current_id = dataset_id;
        } else {
                // Drop just enough datasets to bring dataset_id into the window, leaving drop_buffers spare slots
            guint last_dropped = dataset_id - (buffer->ring_size - buffer->drop_buffers);
            for (guint i = buffer->current_id; i <= last_dropped; i++)
                buffer->n_fragments[i % buffer->ring_size] = 0;
            buffer->current_id = last_dropped + 1;
        }

        if (atomic_load(&buffer->n_fragments[buffer->current_id % buffer->ring_size]) == buffer->fragments_per_dataset)
            ready = TRUE;

            // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
            // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
            // The updates may happen after writting/reading is finished.
    }

        // Offsets are computed in size_t: slot * dataset_size can exceed the guint range
    uint8_t *dataset_buffer = buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;

    if (buffer->n_dims == 2) {
            // 2D structure: the fragment is a tile of fragment_dims[1] rows of fragment_dims[0] bytes;
            // the stream selects the column band, the fragment index selects the row band.
        uint8_t *dst = dataset_buffer
            + (size_t)stream_id * buffer->fragment_dims[0]                                  // x-coordinate
            + (size_t)fragment_id * buffer->fragment_dims[1] * buffer->dataset_dims[0];     // y-coordinate

        for (guint row = 0; row < buffer->fragment_dims[1]; ++row) {
            memcpy(dst + (size_t)row * buffer->dataset_dims[0], src + (size_t)row * buffer->fragment_dims[0], buffer->fragment_dims[0]);
        }
    } else {
            // 1D structure, simply putting fragment at the appropriate position in the stream
        uint8_t *dst = dataset_buffer + ((size_t)stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
        memcpy(dst, src, buffer->fragment_size);
    }

        // FIXME: Sanity checks: verify is not a dublicate fragment?
        // Use the value returned by the increment rather than re-reading the counter afterwards
    guint n_received = atomic_fetch_add(&buffer->n_fragments[buffer_id], 1) + 1;

    if (n_received == buffer->fragments_per_dataset) {
            // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
        if (dataset_id == buffer->current_id)
            ready = TRUE;
    }

    return ready;
}



/*
 * Copies the dataset at current_id into output_buffer if all of its fragments
 * have arrived, then frees its ring slot and advances current_id.
 *
 * output_buffer - destination of at least dataset_size bytes.
 * error         - currently unused.
 *
 * Returns TRUE if a dataset was copied, and FALSE if the current dataset is
 * still incomplete. In that case nothing changes and later datasets stay
 * blocked until it completes or is dropped by ufo_roof_buffer_set_fragment().
 */
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) {
    (void)error;

    guint buffer_id = buffer->current_id % buffer->ring_size;
        // size_t offset: slot * dataset_size can exceed the guint range for large rings
    const uint8_t *dataset_buffer = (const uint8_t *)buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
    if (atomic_load(&buffer->n_fragments[buffer_id]) < buffer->fragments_per_dataset)
        return FALSE;

    memcpy(output_buffer, dataset_buffer, buffer->dataset_size);

    atomic_store(&buffer->n_fragments[buffer_id], 0);
    buffer->current_id += 1;

    return TRUE;
}