void roof_init() { hw_sched_init(); } Roof *roof_new(UfoRoofConfig *cfg, GError **error) { guint i; GError *gerr = NULL; Roof *ctx = (Roof*)calloc(1, sizeof(Roof)); if (!ctx) roof_new_error(error, "Can't allocate Roof context"); ctx->cfg = cfg; ctx->n_threads = cfg->n_streams / cfg->sockets_per_thread; if (cfg->n_streams % cfg->sockets_per_thread) ctx->n_threads++; ctx->rdi = (RoofReadInterface**)calloc(cfg->n_streams, sizeof(RoofReadInterface*)); ctx->rdc = (RoofReadContext**)calloc(cfg->n_streams, sizeof(RoofReadContext*)); ctx->rdt = (RoofThreadContext**)calloc(ctx->n_threads, sizeof(RoofThreadContext*)); ctx->sched = hw_sched_create(cfg->n_threads); if ((!ctx->rdi)||(!ctx->rdc)||(!ctx->rdt)||(!ctx->sched)) { roof_free(ctx); roof_setup_error(error, "Failed to allocate memory for various Roof contexts"); } return ctx; } void roof_configure_simulation(Roof *ctx, const gchar *path, guint first_file_number, GError **error) { assert(ctx); ctx->simulate = 1; ctx->path = path; ctx->first_file_number = first_file_number; } void roof_configure_stop_mode(Roof *ctx, const gulong max, GError **error) { assert(ctx); ctx->max_datasets = max; } void roof_setup(Roof *ctx, GError **error) { guint i; GError *gerr = NULL; assert(ctx); RoofConfig *cfg = ctx->cfg; /* ctx->buf = roof_buffer_new(cfg, 2, ctx->max_datasets, &gerr); if ((gerr)||(!ctx->buf)) roof_propagate_error(error, gerr, "roof_buffer_new: "); */ for (i = 0; i < cfg->n_streams; i++) { if (ctx->simulate) { if (!ctx->path) roof_setup_error(error, "Path to simulated data should be specified"); ctx->rdi[i] = roof_read_file_new(cfg, ctx->path, ctx->first_file_number + i, &gerr); } else { ctx->rdi[i] = roof_read_socket_new(cfg, i, &gerr); } if (!ctx->rdi[i]) roof_propagate_error(error, gerr, "roof_read_interface_new: "); ctx->rdc[i] = roof_read_context_new(cfg, ctx->rdi[i], &gerr); if (!ctx->rdc[i]) roof_propagate_error(error, gerr, "roof_read_context_new: "); } // We try to distribute sockets uniformly respecting sockets_per_thread as maximum limit guint extra = 0, sockets_per_thread = cfg->n_streams / ctx->n_threads; if (cfg->n_streams % ctx->n_threads) extra = cfg->n_streams - ctx->n_threads * sockets_per_thread; guint from, to; for (i = 0; i < ctx->n_threads; i++) { guint to = from + sockets_per_thread; if (i < extra) to++; ctx->thr[i]= roof_thread_new(cfg, ctx, from, to, &gerr); if (!ctx->thr[i]) roof_propagate_error(error, gerr, "roof_thread_new (%i): ", i); } } void roof_free(Roof *ctx) { guint i; if (ctx) { RoofConfig *cfg = ctx->cfg; if (ctx->sched) hw_sched_destroy(priv->sched); if (ctx->rdt) { for (i = 0; i < ctx->n_threads; i++) if (ctx->rdt[i]) roof_thread_context_free(ctx->rdt[i]); free(ctx->rdt); } if (ctx->rdc) { for (i = 0; i < cfg->n_streams; i++) if (ctx->rdc[i]) roof_read_context_free(ctx->rdc[i]); free(ctx->rdc); } if (ctx->rdi) { for (i = 0; i < cfg->n_streams; i++) if (ctx->rdi[i]) roof_read_interface_free(ctx->rdi[i]); free(ctx->rdi); } if (ctx->buf) roof_buffer_free(ctx->buf); free(ctx); } } void roof_read_dataset(Roof *ctx, void *buffer, GError **error) { priv->current_dataset; priv->current_buffer = buffer; err = hw_sched_schedule_thread_task(sched, (void*)ctx, roof_thread_read); if (!err) err = hw_sched_wait_task(sched); if (err) { fprintf(stderr, "Error %i scheduling init threads", err); exit(-1); } } }