1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#include <stdio.h>
#include <assert.h>
#include <ufo/ufo.h>
#include "ufo-roof-buffer.h"
#include "ufo-roof-read-thread.h"
#include "ufo-roof-read.h"
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
/*
 * Placeholder for host-level tuning of the reader.
 *
 * The body currently contains only a disabled sketch, so calling this
 * function does nothing and 'cfg' is not used.
 */
static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
// FIXME: request real-time permissions?
// FIXME: do we need this?
// Disabled sketch below: open /dev/cpu_dma_latency, write a 32-bit zero
// and close the descriptor again.
// NOTE(review): the Linux PM QoS interface is documented to keep a latency
// request only while the descriptor stays open; since the sketch closes it
// right after the write, it would presumably have no lasting effect - confirm
// before enabling.
/*
uint32_t lat = 0;
int fd = open("/dev/cpu_dma_latency", O_RDWR);
if (fd == -1) {
fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
} else {
write(fd, &lat, sizeof(lat));
close(fd);
}
*/
}
const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
assert(ctx);
return ctx->settings;
}
/**
 * Allocates a reader context and creates one reader thread object per group
 * of streams.
 *
 * The number of threads is ceil(n_streams / sockets_per_thread). Streams are
 * then spread as evenly as possible: every thread gets
 * n_streams / n_threads streams and the first n_streams % n_threads threads
 * get one more, so no thread exceeds sockets_per_thread.
 *
 * @param cfg   configuration; n_streams and sockets_per_thread are read here
 *              and the pointer itself is stored in the context
 * @param rdi   read interface, stored in the context
 * @param buf   buffer, stored in the context
 * @param error location for a GError, filled by the roof_* error macros
 * @return the new context, or NULL on failure. The roof_new_error and
 *         roof_propagate_error macros are defined elsewhere and are relied
 *         upon to return NULL from this function after setting the error.
 */
UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
    GError *gerr = NULL;

    // Both values are used as divisors below, zero would be a division by zero
    if (!cfg->n_streams || !cfg->sockets_per_thread)
        roof_new_error(error, "Invalid configuration: n_streams and sockets_per_thread must be non-zero");

    // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
    guint n_threads = cfg->n_streams / cfg->sockets_per_thread;
    if (cfg->n_streams % cfg->sockets_per_thread) n_threads++;

    guint sockets_per_thread = cfg->n_streams / n_threads;
    guint extra = cfg->n_streams % n_threads;

    UfoRoofRead *ctx = calloc(1, sizeof *ctx);
    if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");

    ufo_roof_read_optimize(cfg);

    ctx->n_threads = n_threads;
    ctx->cfg = cfg;
    ctx->buf = buf;
    ctx->rdi = rdi;

    // Sized by the computed thread count, i.e. exactly the slots filled below;
    // zero-initialised so that ufo_roof_read_free can skip unset slots
    ctx->thr = calloc(n_threads, sizeof *ctx->thr);
    if (!ctx->thr) {
        ufo_roof_read_free(ctx);
        roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
    }

    guint from = 0;
    for (guint i = 0; i < n_threads; i++) {
        guint to = from + sockets_per_thread;
        if (i < extra) to++;

        // NOTE(review): sibling calls are named ufo_roof_read_thread_*; verify
        // that ufo_roof_thread_new is the intended constructor name
        ctx->thr[i] = ufo_roof_thread_new(ctx, from, to, &gerr);
        if (!ctx->thr[i]) {
            // Release the context and the threads created so far
            ufo_roof_read_free(ctx);
            roof_propagate_error(error, gerr, "ufo_roof_thread_new (%u): ", i);
        }

        from = to;
    }

    return ctx;
}
/**
 * Stops the reader threads and releases the context.
 *
 * Safe to call with NULL and with a partially constructed context: a missing
 * thread array is skipped and unset thread slots are not freed.
 *
 * @param ctx reader context to destroy, may be NULL
 */
void ufo_roof_read_free(UfoRoofRead *ctx) {
    if (!ctx) return;

    if (ctx->thr) {
        // Nothing can be done about a failed stop during teardown, so the
        // error is not collected
        ufo_roof_read_stop(ctx, NULL);

        for (guint i = 0; i < ctx->n_threads; i++) {
            if (ctx->thr[i])
                ufo_roof_read_thread_free(ctx->thr[i]);
        }
        free(ctx->thr);
    }

    free(ctx);
}
gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
gboolean ret;
GError *gerr;
if ((!ctx)||(!ctx->thr)) return FALSE;
for (int i = 0; i < ctx->n_threads; i++) {
if (!ctx->thr[i]) return FALSE;
ret = ufo_roof_read_thread_start(ctx, &gerr);
if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
}
return TRUE;
}
gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
gboolean ret, res = FALSE;
GError *gerr;
if ((!ctx)||(!ctx->thr)) return FALSE;
for (int i = 0; i < ctx->n_threads; i++) {
if (!ctx->thr[i]) return FALSE;
ret = ufo_roof_read_thread_stop(ctx, &gerr);
if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
res |= !ret;
}
return !res;
}
|