summaryrefslogtreecommitdiffstats
path: root/src/save/ufo-roof-read.c
blob: f3d790dadd4420a75b75b975e9cc7a818495c3e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#include <stdio.h>
#include <assert.h>

#include <ufo/ufo.h>

#include "ufo-roof-buffer.h"
#include "ufo-roof-read-thread.h"
#include "ufo-roof-read.h"



#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

static void ufo_roof_read_optimize(UfoRoofConfig *cfg) {
	// FIXME: request real-time permissions?
	// FIXME: do we need this?
/*
    uint32_t lat = 0;
    int fd = open("/dev/cpu_dma_latency", O_RDWR);
    if (fd == -1) {
	fprintf(stderr, "Failed to open cpu_dma_latency: error %s", strerror(errno));
    } else {
	write(fd, &lat, sizeof(lat));
	close(fd);
    }
*/
}


const UfoRoofReadInterfaceSettings *ufo_roof_read_get_settings(UFORoofRead *ctx, GError **error) {
    assert(ctx);
    return ctx->settings;
}


UfoRoofRead *ufo_roof_read_new(UfoRoofConfig *cfg, UfoRoofReadInterface *rdi, UfoRoofBuffer *buf, GError **error) {
    guint i;
    GError *gerr = NULL;

    UfoRoofRead *ctx = (UfoRoofRead*)calloc(1, sizeof(UfoRoofRead));
    if (!ctx) roof_new_error(error, "Can't allocate UfoRoofRead context");

    ufo_roof_read_optimize(ctx);

    ctx->n_threads = cfg->n_read_threads;
    ctx->cfg = cfg;
    ctx->buf = buf;
    ctx->rdi = rdi;

    ctx->thr = (UfoRoofReadThread*)calloc(cfg->n_read_threads, sizeof(UfoRoofReadThread));
    if (!ctx->thr) {
	ufo_roof_read_free(ctx);
	roof_new_error(error, "Error allocating UfoRoofReadThread contexts");
    }

	// We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit
    guint n_threads = priv->cfg->n_streams / priv->cfg->sockets_per_thread;
    if (priv->cfg->n_streams % priv->cfg->sockets_per_thread) n_threads++;
    ctx->n_threads = n_threads;

    guint extra = 0, sockets_per_thread = priv->cfg->n_streams / n_threads;
    if (priv->cfg->n_streams % n_threads) extra = priv->cfg->n_streams - n_threads * sockets_per_thread;

    guint from, to;
    for (i = 0, from = 0; i < n_threads; i++, from = to) {
	guint to = from + sockets_per_thread;
	if (i < extra) to++;

	ctx->thr[i]= ufo_roof_thread_new(ctx, from, to, &gerr);
	if (!ctx->thr[i]) roof_propagate_error(error, gerr, "ufo_roof_thread_new (%i): ", i);
    }

    return ctx;
}

void ufo_roof_read_free(UfoRoofRead *ctx) {
    if (!ctx) return;

    if (ctx->thr) {
	int i;
	ufo_roof_read_stop(ctx);
	for (i = 0; i < ctx->n_threads; i++) {
	    if (ctx->thr[i]) 
		ufo_roof_read_thread_free(ctx->thr[i]);
	}
	free(ctx->thr);
    }
    free(ctx);
}

gboolean ufo_roof_read_start(UFORoofRead *ctx, GError **error) {
    gboolean ret;
    GError *gerr;

    if ((!ctx)||(!ctx->thr)) return FALSE;

    for (int i = 0; i < ctx->n_threads; i++) {
	if (!ctx->thr[i]) return FALSE;
	ret = ufo_roof_read_thread_start(ctx, &gerr);
	if (!ret) roof_propagate_error_with_retval(error, FALSE, gerr, "ufo_roof_read_thread_start (%i): ", i);
    }
    return TRUE;
}

gboolean ufo_roof_read_stop(UFORoofRead *ctx, GError **error) {
    gboolean ret, res = FALSE;
    GError *gerr;

    if ((!ctx)||(!ctx->thr)) return FALSE;

    for (int i = 0; i < ctx->n_threads; i++) {
	if (!ctx->thr[i]) return FALSE;
	ret = ufo_roof_read_thread_stop(ctx, &gerr);
	if (!ret) g_propagate_perfixed_error(error, gerr, "ufo_roof_read_thread_stop (%i): ", i);
	res |= !ret;
    }
    return !res;
}