Add support for patchbay and muxes

This commit is contained in:
Rodrigo Arias 2023-01-12 19:16:52 +01:00 committed by Rodrigo Arias Mallo
parent 2d44c4763f
commit 1cea193ea3
14 changed files with 1046 additions and 422 deletions

View File

@ -1,47 +1,53 @@
# Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
# SPDX-License-Identifier: GPL-3.0-or-later
add_library(trace STATIC trace.c)
target_link_libraries(trace parson ovni-static)
add_library(emu STATIC
chan.c
emu.c
nosv.c
openmp.c
ovni.c
tampi.c
nodes.c
kernel.c
nanos6.c
task.c
pcf.c
prv.c
)
include_directories(
"${CMAKE_SOURCE_DIR}/src/include"
"${CMAKE_SOURCE_DIR}/src"
"${CMAKE_SOURCE_DIR}/include"
)
add_executable(ovniemu ovniemu.c)
target_link_libraries(ovniemu emu trace)
add_library(trace STATIC trace.c)
target_link_libraries(trace parson ovni-static)
add_executable(ovnidump ovnidump.c)
target_link_libraries(ovnidump emu trace)
add_library(chan STATIC
chan.c
bay.c
mux.c
)
add_executable(ovnisort ovnisort.c)
target_link_libraries(ovnisort emu trace)
# Use <PackageName>_ROOT variables if available, commonly used by MPI
# installations
if(POLICY CMP0074)
cmake_policy(SET CMP0074 NEW)
endif()
find_package(MPI REQUIRED)
add_executable(ovnisync ovnisync.c)
target_link_libraries(ovnisync m MPI::MPI_C)
install(TARGETS ovniemu ovnidump ovnisync ovnisort RUNTIME DESTINATION bin)
#add_library(emu STATIC
# chan.c
# emu.c
# nosv.c
# openmp.c
# ovni.c
# tampi.c
# nodes.c
# kernel.c
# nanos6.c
# task.c
# pcf.c
# prv.c
#)
#
#add_executable(ovniemu ovniemu.c)
#target_link_libraries(ovniemu emu trace)
#
#add_executable(ovnidump ovnidump.c)
#target_link_libraries(ovnidump emu trace)
#
#add_executable(ovnisort ovnisort.c)
#target_link_libraries(ovnisort emu trace)
#
## Use <PackageName>_ROOT variables if available, commonly used by MPI
## installations
#if(POLICY CMP0074)
# cmake_policy(SET CMP0074 NEW)
#endif()
#
#find_package(MPI REQUIRED)
#add_executable(ovnisync ovnisync.c)
#target_link_libraries(ovnisync m MPI::MPI_C)
#
#install(TARGETS ovniemu ovnidump ovnisync ovnisort RUNTIME DESTINATION bin)

151
src/emu/bay.c Normal file
View File

@ -0,0 +1,151 @@
#define ENABLE_DEBUG
#include "bay.h"
#include "common.h"
#include "uthash.h"
#include "utlist.h"
/* Called from the channel when it becomes dirty */
static int
cb_chan_is_dirty(struct chan *chan, void *arg)
{
UNUSED(chan);
struct bay_chan *bchan = arg;
struct bay *bay = bchan->bay;
/* TODO: check duplicate? */
DL_APPEND(bay->dirty, bchan);
return 0;
}
static struct bay_chan *
find_bay_chan(struct bay *bay, const char *name)
{
struct bay_chan *bchan = NULL;
HASH_FIND_STR(bay->channels, name, bchan);
return bchan;
}
struct chan *
bay_find(struct bay *bay, const char *name)
{
struct bay_chan *bchan = find_bay_chan(bay, name);
if (bchan != NULL)
return bchan->chan;
else
return NULL;
}
int
bay_register(struct bay *bay, struct chan *chan)
{
struct bay_chan *bchan = find_bay_chan(bay, chan->name);
if (bchan != NULL) {
err("bay_register: channel %s already registered\n",
chan->name);
return -1;
}
bchan = calloc(1, sizeof(struct bay_chan));
if (bchan == NULL) {
err("bay_register: calloc failed\n");
return -1;
}
bchan->chan = chan;
bchan->bay = bay;
chan_set_dirty_cb(chan, cb_chan_is_dirty, bchan);
/* Add to hash table */
HASH_ADD_STR(bay->channels, chan->name, bchan);
return 0;
}
int
bay_add_cb(struct bay *bay, struct chan *chan, bay_cb_func_t func, void *arg)
{
if (func == NULL) {
err("bay_add_cb: func is NULL\n");
return -1;
}
struct bay_chan *bchan = find_bay_chan(bay, chan->name);
if (bchan == NULL) {
err("bay_add_cb: cannot find channel %s in bay\n", chan->name);
return -1;
}
struct bay_cb *cb = calloc(1, sizeof(struct bay_cb));
if (cb == NULL) {
err("bay_add_cb: calloc failed\n");
return -1;
}
cb->func = func;
cb->arg = arg;
DL_APPEND(bchan->cb, cb);
bchan->ncallbacks++;
return 0;
}
void
bay_init(struct bay *bay)
{
memset(bay, 0, sizeof(struct bay));
bay->state = BAY_READY;
}
static int
propagate_chan(struct bay_chan *bchan)
{
dbg("propagating dirty channel %p\n", (void *) bchan);
struct bay_cb *cur = NULL;
struct bay_cb *tmp = NULL;
DL_FOREACH_SAFE(bchan->cb, cur, tmp) {
if (cur->func(bchan->chan, cur->arg) != 0) {
err("propagate_chan: callback failed\n");
return -1;
}
}
return 0;
}
int
bay_propagate(struct bay *bay)
{
dbg("propagating channels\n");
struct bay_chan *cur = NULL;
struct bay_chan *tmp = NULL;
DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
/* May add more dirty channels */
if (propagate_chan(cur) != 0) {
err("bay_propagate: propagate_chan failed\n");
return -1;
}
}
bay->dirty = NULL;
return 0;
}
//int
//bay_emit(struct bay *bay)
//{
// for (chan in dirty channels) {
// /* May add more dirty channels */
// emit_chan(chan);
// }
//
// return 0;
//}

61
src/emu/bay.h Normal file
View File

@ -0,0 +1,61 @@
#ifndef BAY_H
#define BAY_H
#include "chan.h"
#include "uthash.h"
/* Handle connections between channels and callbacks */
struct bay;
struct bay_cb;
struct bay_chan;
typedef int (*bay_cb_func_t)(struct chan *chan, void *ptr);
struct bay_cb {
bay_cb_func_t func;
void *arg;
/* List of callbacks in one channel */
struct bay_cb *next;
struct bay_cb *prev;
};
#define MAX_BAY_NAME 1024
struct bay_chan {
struct chan *chan;
int ncallbacks;
struct bay_cb *cb;
struct bay *bay;
/* Global hash table with all channels in bay */
UT_hash_handle hh;
/* Used by dirty list */
struct bay_chan *next;
struct bay_chan *prev;
};
enum bay_state {
BAY_CONFIG = 1,
BAY_READY,
BAY_PROPAGATING,
BAY_PROPAGATED,
BAY_EMITTING,
BAY_EMITTED
};
struct bay {
enum bay_state state;
struct bay_chan *channels;
struct bay_chan *dirty;
};
int bay_register(struct bay *bay, struct chan *chan);
struct chan *bay_find(struct bay *bay, const char *name);
int bay_add_cb(struct bay *bay, struct chan *chan, bay_cb_func_t func, void *arg);
void bay_init(struct bay *bay);
int bay_propagate(struct bay *bay);
#endif /* BAY_H */

View File

@ -1,402 +1,216 @@
/* Copyright (c) 2021 Barcelona Supercomputing Center (BSC)
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#define ENABLE_DEBUG
#include "chan.h"
#include "common.h"
#include <string.h>
#include "emu.h"
#include "prv.h"
#include "utlist.h"
static void
chan_init(struct ovni_chan *chan, enum chan_track track, int row, int type, FILE *prv, int64_t *clock)
void
chan_init(struct chan *chan, enum chan_type type, const char *name)
{
chan->n = 0;
int len = strlen(name);
if (len >= MAX_CHAN_NAME)
die("chan_init: name '%s' too long\n", name);
memset(chan, 0, sizeof(*chan));
memcpy(chan->name, name, len + 1);
chan->type = type;
chan->enabled = 0;
chan->badst = ST_NULL;
chan->ev = -1;
chan->prv = prv;
chan->clock = clock;
chan->t = *clock;
chan->row = row;
chan->dirty = 0;
chan->track = track;
chan->lastst = -1;
}
static void
mark_dirty(struct ovni_chan *chan, enum chan_dirty dirty)
{
if (chan->dirty != CHAN_CLEAN)
die("mark_dirty: chan %d already dirty\n", chan->id);
if (dirty == CHAN_CLEAN)
die("mark_dirty: cannot use CHAN_CLEAN\n");
dbg("adding dirty chan %d to list\n", chan->id);
chan->dirty = dirty;
DL_APPEND(*chan->update_list, chan);
}
void
chan_th_init(struct ovni_ethread *th,
struct ovni_chan **update_list,
enum chan id,
enum chan_track track,
int init_st,
int enabled,
int dirty,
int row,
FILE *prv,
int64_t *clock)
chan_set_dirty_cb(struct chan *chan, chan_cb_t func, void *arg)
{
struct ovni_chan *chan = &th->chan[id];
int prvth = chan_to_prvtype[id];
chan_init(chan, track, row, prvth, prv, clock);
chan->id = id;
chan->thread = th;
chan->update_list = update_list;
chan->enabled = enabled;
chan->stack[chan->n++] = init_st;
if (dirty)
mark_dirty(chan, CHAN_DIRTY_ACTIVE);
chan->dirty_cb = func;
chan->dirty_arg = arg;
}
void
chan_cpu_init(struct ovni_cpu *cpu,
struct ovni_chan **update_list,
enum chan id,
enum chan_track track,
int init_st,
int enabled,
int dirty,
int row,
FILE *prv,
int64_t *clock)
enum chan_type
chan_get_type(struct chan *chan)
{
struct ovni_chan *chan = &cpu->chan[id];
int prvcpu = chan_to_prvtype[id];
chan_init(chan, track, row, prvcpu, prv, clock);
chan->id = id;
chan->cpu = cpu;
chan->update_list = update_list;
chan->enabled = enabled;
chan->stack[chan->n++] = init_st;
if (dirty)
mark_dirty(chan, CHAN_DIRTY_ACTIVE);
return chan->type;
}
static void
chan_dump_update_list(struct ovni_chan *chan)
static int
set_dirty(struct chan *chan)
{
dbg("update list for chan %d at %p:\n", chan->id, (void *) chan);
for (struct ovni_chan *c = *chan->update_list; c; c = c->next) {
dbg(" chan %d at %p\n", c->id, (void *) c);
}
}
void
chan_enable(struct ovni_chan *chan, int enabled)
{
/* Can be dirty */
dbg("chan_enable chan=%d enabled=%d\n", chan->id, enabled);
if (chan->enabled == enabled) {
err("chan_enable: chan already in enabled=%d\n", enabled);
abort();
if (chan->is_dirty) {
err("channel %s already dirty\n", chan->name);
return -1;
}
chan->enabled = enabled;
chan->t = *chan->clock;
chan->is_dirty = 1;
/* Only append if not dirty */
if (!chan->dirty) {
mark_dirty(chan, CHAN_DIRTY_ACTIVE);
} else {
dbg("already dirty chan %d: skip update list\n",
chan->id);
chan_dump_update_list(chan);
if (chan->dirty_cb != NULL) {
if (chan->dirty_cb(chan, chan->dirty_arg) != 0) {
err("dirty callback failed\n");
return -1;
}
}
return 0;
}
void
chan_disable(struct ovni_chan *chan)
static int
check_duplicates(struct chan *chan, struct value *v)
{
chan_enable(chan, 0);
/* If duplicates are allowed just skip the check */
//if (chan->oflags & CHAN_DUP)
// return 0;
if (value_is_equal(&chan->last_value, v)) {
err("check_duplicates: same value as last_value\n");
return -1;
}
return 0;
}
int
chan_is_enabled(const struct ovni_chan *chan)
chan_set(struct chan *chan, struct value value)
{
return chan->enabled;
if (chan->type != CHAN_SINGLE) {
err("chan_set: cannot set on non-single channel\n");
return -1;
}
if (chan->is_dirty) {
err("chan_set: cannot modify dirty channel\n");
return -1;
}
if (check_duplicates(chan, &value) != 0) {
err("chan_set: cannot set a duplicated value\n");
return -1;
}
char buf[128];
dbg("chan_set: channel %p sets value %s\n", (void *) chan,
value_str(value, buf));
chan->data.value = value;
if (set_dirty(chan) != 0) {
err("chan_set: set_dirty failed\n");
return -1;
}
return 0;
}
void
chan_set(struct ovni_chan *chan, int st)
/** Adds one value to the stack. Fails if the stack is full.
*
* @param ivalue The new integer value to be added on the stack.
*
* @return On success returns 0, otherwise returns -1.
*/
int
chan_push(struct chan *chan, struct value value)
{
/* Can be dirty */
dbg("chan_set chan %d st=%d\n", chan->id, st);
if (!chan->enabled)
die("chan_set: chan %d not enabled\n", chan->id);
/* Only chan_set can set the 0 state */
if (st < 0)
die("chan_set: cannot set a negative state %d\n", st);
/* Don't enforce this check if we are dirty because the channel was
* just enabled; it may collide with a new state 0 set via chan_set()
* used by the tracking channels */
if (chan->dirty != CHAN_DIRTY_ACTIVE
&& chan->lastst >= 0
&& chan->lastst == st) {
err("chan_set id=%d cannot emit the state %d twice\n",
chan->id, st);
abort();
if (chan->type != CHAN_STACK) {
err("chan_push: cannot push on non-stack channel\n");
return -1;
}
if (chan->n == 0)
chan->n = 1;
chan->stack[chan->n - 1] = st;
chan->t = *chan->clock;
/* Only append if not dirty */
if (!chan->dirty) {
mark_dirty(chan, CHAN_DIRTY_VALUE);
} else {
dbg("already dirty chan %d: skip update list\n",
chan->id);
chan_dump_update_list(chan);
}
}
void
chan_push(struct ovni_chan *chan, int st)
{
dbg("chan_push chan %d st=%d\n", chan->id, st);
if (!chan->enabled)
die("chan_push: chan %d not enabled\n", chan->id);
if (st <= 0) {
err("chan_push: cannot push a non-positive state %d\n", st);
abort();
if (chan->is_dirty) {
err("chan_push: cannot modify dirty channel\n");
return -1;
}
/* Cannot be dirty */
if (chan->dirty != CHAN_CLEAN)
die("chan_push: chan %d not clean", chan->id);
if (chan->lastst >= 0 && chan->lastst == st) {
err("chan_push id=%d cannot emit the state %d twice\n",
chan->id, st);
abort();
if (check_duplicates(chan, &value) != 0) {
err("chan_push: cannot push a duplicated value\n");
return -1;
}
if (chan->n >= MAX_CHAN_STACK) {
struct chan_stack *stack = &chan->data.stack;
if (stack->n >= MAX_CHAN_STACK) {
err("chan_push: channel stack full\n");
abort();
}
chan->stack[chan->n++] = st;
chan->t = *chan->clock;
stack->values[stack->n++] = value;
mark_dirty(chan, CHAN_DIRTY_VALUE);
if (set_dirty(chan) != 0) {
err("chan_set: set_dirty failed\n");
return -1;
}
return 0;
}
/** Remove one value from the stack. Fails if the top of the stack
* doesn't match the expected value.
*
* @param expected The expected value on the top of the stack.
*
* @return On success returns 0, otherwise returns -1.
*/
int
chan_pop(struct ovni_chan *chan, int expected_st)
chan_pop(struct chan *chan, struct value evalue)
{
dbg("chan_pop chan %d expected_st=%d\n", chan->id, expected_st);
if (!chan->enabled)
die("chan_pop: chan %d not enabled\n", chan->id);
/* Cannot be dirty */
if (chan->dirty != CHAN_CLEAN)
die("chan_pop: chan %d not clean", chan->id);
if (chan->n <= 0) {
err("chan_pop: channel empty\n");
abort();
if (chan->type != CHAN_STACK) {
err("chan_pop: cannot pop on non-stack channel\n");
return -1;
}
int st = chan->stack[chan->n - 1];
if (expected_st >= 0 && st != expected_st) {
err("chan_pop: unexpected channel state %d (expected %d)\n",
st, expected_st);
abort();
if (chan->is_dirty) {
err("chan_pop: cannot modify dirty channel\n");
return -1;
}
chan->n--;
struct chan_stack *stack = &chan->data.stack;
/* Take the current stack value */
st = chan_get_st(chan);
/* A st == 0 can be obtained when returning to the initial state */
if (st < 0) {
err("chan_pop: obtained negative state %d from the stack\n", st);
abort();
if (stack->n <= 0) {
err("chan_pop: channel stack empty\n");
return -1;
}
if (chan->lastst >= 0 && chan->lastst == st) {
err("chan_pop id=%d cannot emit the state %d twice\n",
chan->id, st);
abort();
struct value *value = &stack->values[stack->n - 1];
if (!value_is_equal(value, &evalue)) {
err("chan_pop: unexpected value %ld (expected %ld)\n",
value->i, evalue.i);
return -1;
}
chan->t = *chan->clock;
stack->n--;
mark_dirty(chan, CHAN_DIRTY_VALUE);
if (set_dirty(chan) != 0) {
err("chan_set: set_dirty failed\n");
return -1;
}
return st;
return 0;
}
void
chan_ev(struct ovni_chan *chan, int ev)
static int
get_value(struct chan *chan, struct value *value)
{
dbg("chan_ev chan %d ev=%d\n", chan->id, ev);
if (chan->type == CHAN_SINGLE) {
*value = chan->data.value;
return 0;
}
if (!chan->enabled)
die("chan_ev: chan %d not enabled\n", chan->id);
struct chan_stack *stack = &chan->data.stack;
if (stack->n <= 0) {
err("get_value: channel stack empty\n");
return -1;
}
/* Cannot be dirty */
if (chan->dirty != CHAN_CLEAN)
die("chan_ev: chan %d is dirty\n", chan->id);
*value = stack->values[stack->n - 1];
if (ev <= 0)
die("chan_ev: cannot emit non-positive state %d\n", ev);
if (chan->lastst >= 0 && chan->lastst == ev)
die("chan_ev id=%d cannot emit the state %d twice\n",
chan->id, ev);
chan->ev = ev;
chan->t = *chan->clock;
mark_dirty(chan, CHAN_DIRTY_VALUE);
return 0;
}
/** Reads the current value of a channel */
int
chan_get_st(const struct ovni_chan *chan)
chan_read(struct chan *chan, struct value *value)
{
if (chan->enabled == 0)
return chan->badst;
if (chan->n == 0)
return 0;
if (chan->n < 0)
die("chan_get_st: chan %d has negative n\n", chan->id);
return chan->stack[chan->n - 1];
}
void
chan_copy(struct ovni_chan *dst, const struct ovni_chan *src)
{
if (!chan_is_enabled(src)) {
chan_disable(dst);
return;
if (get_value(chan, value) != 0) {
err("chan_read: get_value failed\n");
return -1;
}
if (!chan_is_enabled(dst))
chan_enable(dst, 1);
if (src->ev != -1) {
chan_ev(dst, src->ev);
} else {
int src_st = chan_get_st(src);
int dst_st = chan_get_st(dst);
if (src_st != dst_st)
chan_set(dst, src_st);
}
}
static void
emit(struct ovni_chan *chan, int64_t t, int state)
{
if (chan->dirty == CHAN_CLEAN)
die("emit: chan %d is not dirty\n", chan->id);
/* A channel can only emit the same state as lastst if is dirty because
* it has been enabled or disabled. Otherwise is a bug (ie. you have two
* consecutive ovni events?) */
if (chan->lastst != -1
&& chan->dirty == CHAN_DIRTY_VALUE
&& chan->lastst == state) {
/* TODO: Print the raw clock of the offending event */
die("emit: chan %d cannot emit the same state %d twice\n",
chan->id, state);
}
if (chan->lastst != state) {
prv_ev(chan->prv, chan->row, t, chan->type, state);
chan->lastst = state;
}
}
static void
emit_ev(struct ovni_chan *chan)
{
if (!chan->enabled)
die("emit_ev: chan %d is not enabled\n", chan->id);
if (chan->ev == -1)
die("emit_ev: chan %d cannot emit -1 ev\n", chan->id);
int new = chan->ev;
int last = chan_get_st(chan);
emit(chan, chan->t - 1, new);
emit(chan, chan->t, last);
chan->ev = -1;
}
static void
emit_st(struct ovni_chan *chan)
{
int st = chan_get_st(chan);
emit(chan, chan->t, st);
}
/* Emits either the current state or punctual event in the PRV file */
void
chan_emit(struct ovni_chan *chan)
{
if (likely(chan->dirty == 0))
return;
dbg("chan_emit chan %d\n", chan->id);
/* Emit badst if disabled */
if (chan->enabled == 0) {
/* No punctual events allowed when disabled */
if (chan->ev != -1)
die("chan_emit: no punctual event allowed when disabled\n");
emit_st(chan);
goto shower;
}
/* Otherwise, emit punctual event if any or the state */
if (chan->ev != -1)
emit_ev(chan);
else
emit_st(chan);
shower:
chan->dirty = 0;
return 0;
}

View File

@ -1,63 +1,58 @@
/* Copyright (c) 2021 Barcelona Supercomputing Center (BSC)
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef OVNI_CHAN_H
#define OVNI_CHAN_H
#ifndef CHAN_H
#define CHAN_H
#include "emu.h"
#include <stdint.h>
#include "value.h"
void
chan_th_init(struct ovni_ethread *th,
struct ovni_chan **update_list,
enum chan id,
enum chan_track track,
int init_st,
int enabled,
int dirty,
int row,
FILE *prv,
int64_t *clock);
#define MAX_CHAN_STACK 512
#define MAX_CHAN_NAME 512
void
chan_cpu_init(struct ovni_cpu *cpu,
struct ovni_chan **update_list,
enum chan id,
enum chan_track track,
int row,
int init_st,
int enabled,
int dirty,
FILE *prv,
int64_t *clock);
enum chan_type {
CHAN_SINGLE = 0,
CHAN_STACK = 1,
CHAN_MAXTYPE,
};
void
chan_enable(struct ovni_chan *chan, int enabled);
struct chan_stack {
int n;
struct value values[MAX_CHAN_STACK];
};
void
chan_disable(struct ovni_chan *chan);
union chan_data {
struct chan_stack stack;
struct value value;
};
int
chan_is_enabled(const struct ovni_chan *chan);
struct chan;
void
chan_set(struct ovni_chan *chan, int st);
typedef int (*chan_cb_t)(struct chan *chan, void *ptr);
void
chan_push(struct ovni_chan *chan, int st);
struct chan {
char name[MAX_CHAN_NAME];
enum chan_type type;
union chan_data data;
int is_dirty;
struct value err_value;
struct value last_value;
chan_cb_t dirty_cb;
void *dirty_arg;
};
int
chan_pop(struct ovni_chan *chan, int expected_st);
//int chan_enable(struct chan *chan);
//int chan_disable(struct chan *chan);
//int chan_is_enabled(const struct chan *chan);
void
chan_ev(struct ovni_chan *chan, int ev);
void chan_init(struct chan *chan, enum chan_type type, const char *name);
int chan_set(struct chan *chan, struct value value);
int chan_push(struct chan *chan, struct value value);
int chan_pop(struct chan *chan, struct value expected);
int chan_read(struct chan *chan, struct value *value);
enum chan_type chan_get_type(struct chan *chan);
int
chan_get_st(const struct ovni_chan *chan);
/* Called when it becomes dirty */
void chan_set_dirty_cb(struct chan *chan, chan_cb_t func, void *arg);
void
chan_emit(struct ovni_chan *chan);
void
chan_copy(struct ovni_chan *dst, const struct ovni_chan *src);
#endif /* OVNI_CHAN_H */
#endif /* CHAN_H */

196
src/emu/mux.c Normal file
View File

@ -0,0 +1,196 @@
#define ENABLE_DEBUG
#include "mux.h"
static int
default_select(struct mux *mux,
struct value key,
struct mux_input **input)
{
if (value_is_null(key)) {
*input = NULL;
return 0;
}
struct mux_input *tmp = mux_find_input(mux, key);
if (tmp == NULL) {
char buf[128];
err("default_select: cannot find input with key %s\n",
value_str(key, buf));
return -1;
}
*input = tmp;
return 0;
}
static int
select_input(struct mux *mux,
struct value key,
struct mux_input **input)
{
if (mux->select_func == NULL)
die("select_input: select_func is NULL\n");
if (mux->select_func(mux, key, input) != 0) {
err("select_input: select_func failed\n");
return -1;
}
return 0;
}
/** Called when the select channel changes its value */
static int
cb_select(struct chan *sel_chan, void *ptr)
{
struct mux *mux = ptr;
struct value sel_value;
if (chan_read(sel_chan, &sel_value) != 0) {
err("cb_select: chan_read(select) failed\n");
return -1;
}
char buf[128];
dbg("select channel got value %s\n",
value_str(sel_value, buf));
struct mux_input *input = NULL;
if (select_input(mux, sel_value, &input) != 0) {
err("cb_select: select_input failed\n");
return -1;
}
dbg("selecting mux input %p\n", (void *) input);
/* Set to null by default */
struct value out_value = value_null();
if (input != NULL && chan_read(input->chan, &out_value) != 0) {
err("cb_select: chan_read() failed\n");
return -1;
}
if (chan_set(mux->output, out_value) != 0) {
err("cb_select: chan_set() failed\n");
return -1;
}
return 0;
}
/** Called when the input channel changes its value */
static int
cb_input(struct chan *in_chan, void *ptr)
{
struct mux *mux = ptr;
/* TODO: We may need to cache the last read value from select to avoid
* problems reading the select channel too soon */
struct value sel_value;
if (chan_read(mux->select, &sel_value) != 0) {
err("cb_input: chan_read(select) failed\n");
return -1;
}
struct mux_input *input = NULL;
if (select_input(mux, sel_value, &input) != 0) {
err("cb_input: select_input failed\n");
return -1;
}
/* Nothing to do, the input is not selected */
if (input == NULL || input->chan != in_chan)
return 0;
dbg("selected mux input %s changed\n", in_chan->name);
/* Set to null by default */
struct value out_value = value_null();
if (chan_read(in_chan, &out_value) != 0) {
err("cb_input: chan_read() failed\n");
return -1;
}
if (chan_set(mux->output, out_value) != 0) {
err("cb_input: chan_set() failed\n");
return -1;
}
return 0;
}
int
mux_init(struct mux *mux,
struct bay *bay,
struct chan *select,
struct chan *output,
mux_select_func_t select_func)
{
if (chan_get_type(output) != CHAN_SINGLE) {
err("mux_init: output channel must be of type single\n");
return -1;
}
memset(mux, 0, sizeof(struct mux_input));
mux->select = select;
mux->output = output;
if (select_func == NULL)
select_func = default_select;
mux->select_func = select_func;
mux->bay = bay;
if (bay_add_cb(bay, select, cb_select, mux) != 0) {
err("mux_init: bay_add_cb failed\n");
return -1;
}
return 0;
}
struct mux_input *
mux_find_input(struct mux *mux, struct value value)
{
struct mux_input *input = NULL;
/* Only int64 due to garbage */
if (value.type != VALUE_INT64)
die("bad value type\n");
HASH_FIND(hh, mux->input, &value.i, sizeof(value.i), input);
return input;
}
int
mux_add_input(struct mux *mux, struct value key, struct chan *chan)
{
if (mux_find_input(mux, key) != NULL) {
char buf[128];
err("mux_add_input: input key %s already in mux\n",
value_str(key, buf));
return -1;
}
struct mux_input *input = calloc(1, sizeof(struct mux_input));
if (input == NULL) {
err("mux_add_input: calloc failed\n");
return -1;
}
input->key = key;
input->chan = chan;
HASH_ADD_KEYPTR(hh, mux->input, &input->key.i, sizeof(input->key.i), input);
if (bay_add_cb(mux->bay, chan, cb_input, mux) != 0) {
err("mux_add_input: bay_add_cb failed\n");
return -1;
}
return 0;
}

58
src/emu/mux.h Normal file
View File

@ -0,0 +1,58 @@
#ifndef MUX_H
#define MUX_H
#include <stdint.h>
#include "bay.h"
#include "uthash.h"
struct mux_input {
struct value key;
struct chan *chan;
UT_hash_handle hh;
};
struct mux;
typedef int (* mux_select_func_t)(struct mux *mux,
struct value value,
struct mux_input **input);
/* MUX logic:
*
* select input output
* ---------------------------
* null - null
* input x x
*
*/
struct mux {
struct bay *bay;
int64_t ninputs;
struct mux_input *input;
mux_select_func_t select_func;
struct chan *select;
struct chan *output;
};
void mux_input_init(struct mux_input *mux,
struct value key,
struct chan *chan);
int mux_init(struct mux *mux,
struct bay *bay,
struct chan *select,
struct chan *output,
mux_select_func_t select_func);
struct mux_input *mux_find_input(struct mux *mux,
struct value key);
int mux_add_input(struct mux *mux,
struct value key,
struct chan *input);
int mux_register(struct mux *mux,
struct bay *bay);
#endif /* MUX_H */

77
src/emu/value.h Normal file
View File

@ -0,0 +1,77 @@
#ifndef VALUE_H
#define VALUE_H
#include <stdint.h>
#include <string.h>
#include <stdio.h>
#include "common.h"
enum value_type {
VALUE_NULL = 0,
VALUE_INT64,
VALUE_DOUBLE
};
struct value {
enum value_type type;
union {
int64_t i;
double d;
};
};
static inline int
value_is_equal(struct value *a, struct value *b)
{
if (a->type != b->type)
return 0;
if (a->type == VALUE_INT64 && a->i == b->i)
return 1;
else if (a->type == VALUE_DOUBLE && a->d == b->d)
return 1;
else
return 0;
}
static inline struct value
value_int64(int64_t i)
{
struct value v = { .type = VALUE_INT64, .i = i };
return v;
}
static inline struct value
value_null(void)
{
struct value v = { .type = VALUE_NULL };
return v;
}
static inline int
value_is_null(struct value a)
{
return (a.type == VALUE_NULL);
}
static inline char *
value_str(struct value a, char *buf)
{
switch (a.type) {
case VALUE_NULL:
sprintf(buf, "{NULL}");
break;
case VALUE_INT64:
sprintf(buf, "{int64_t %ld}", a.i);
break;
case VALUE_DOUBLE:
sprintf(buf, "{double %e}", a.d);
break;
default:
die("value_str: unexpected value type\n");
}
return buf;
}
#endif /* VALUE_H */

View File

@ -7,5 +7,5 @@ set(OVNI_TEST_BUILD_DIR "${CMAKE_BINARY_DIR}/test")
include(macros.cmake)
add_subdirectory(unit)
add_subdirectory(emu)
add_subdirectory(rt)
#add_subdirectory(emu)
#add_subdirectory(rt)

View File

@ -35,7 +35,7 @@ function(unit_test source)
"${CMAKE_SOURCE_DIR}/include"
)
add_executable("${OVNI_TEST_NAME}" "${OVNI_TEST_SOURCE}")
target_link_libraries("${OVNI_TEST_NAME}" PRIVATE ovni emu)
target_link_libraries("${OVNI_TEST_NAME}" PRIVATE ovni chan)
add_test(NAME "${OVNI_TEST_NAME}"
COMMAND "${OVNI_TEST_NAME}"

View File

@ -1,4 +1,13 @@
# Copyright (c) 2022 Barcelona Supercomputing Center (BSC)
# SPDX-License-Identifier: GPL-3.0-or-later
unit_test(version.c)
#unit_test(version.c)
#unit_test(task.c)
#unit_test(taskstack.c)
#unit_test(taskstack-bad.c)
#unit_test(taskmodel.c)
#unit_test(tbm.c)
#unit_test(tbm_trace.c)
unit_test(chan.c)
unit_test(bay.c)
unit_test(mux.c)

51
test/unit/bay.c Normal file
View File

@ -0,0 +1,51 @@
#include "emu/bay.h"
#include "common.h"
static int
callback(struct chan *chan, void *ptr)
{
struct value value;
if (chan_read(chan, &value) != 0)
die("callback: chan_read failed\n");
if (value.type != VALUE_INT64)
die("callback: unexpected value type\n");
int64_t *ival = ptr;
*ival = value.i;
return 0;
}
int main(void)
{
struct bay bay;
bay_init(&bay);
struct chan chan;
chan_init(&chan, CHAN_SINGLE, "testchan");
bay_register(&bay, &chan);
int64_t data = 0;
bay_add_cb(&bay, &chan, callback, &data);
if (data != 0)
die("data changed after bay_chan_append_cb\n");
struct value one = value_int64(1);
if (chan_set(&chan, one) != 0)
die("chan_set failed\n");
if (data != 0)
die("data changed after chan_set\n");
/* Now the callback should modify 'data' */
if (bay_propagate(&bay) != 0)
die("bay_propagate failed\n");
if (data != 1)
die("data didn't change after bay_propagate\n");
return 0;
}

97
test/unit/chan.c Normal file
View File

@ -0,0 +1,97 @@
#include "emu/chan.h"
#include "common.h"
static void
check_single(void)
{
struct chan chan;
struct value one = { .type = VALUE_INT64, .i = 1 };
struct value two = { .type = VALUE_INT64, .i = 2 };
//struct value nil = { .type = VALUE_NULL, .i = 0 };
chan_init(&chan, CHAN_SINGLE, "testchan");
/* Ensure we cannot push as stack */
if (chan_push(&chan, one) == 0)
die("chan_push didn't fail\n");
/* Now we should be able to write with set */
if (chan_set(&chan, one) != 0)
die("chan_set failed\n");
/* Now is dirty, it shouldn't allow another set */
if (chan_set(&chan, two) == 0)
die("chan_set didn't fail\n");
struct value value;
if (chan_read(&chan, &value) != 0)
die("chan_read failed\n");
if (!value_is_equal(&value, &one))
die("chan_read returned unexpected value\n");
}
//static void
//check_stack(void)
//{
// struct chan chan;
//
// chan_init(&chan, CHAN_STACK);
//
// /* Ensure we cannot set as single */
// if (chan_set(&chan, 1) == 0)
// die("chan_set didn't fail\n");
//
// /* Channels are closed after init */
// if (chan_push(&chan, 1) != 0)
// die("chan_push failed\n");
//
// /* Now is closed, it shouldn't allow another value */
// if (chan_push(&chan, 2) == 0)
// die("chan_push didn't fail\n");
//
// struct chan_value value = { 0 };
//
// if (chan_flush(&chan, &value) != 0)
// die("chan_flush failed\n");
//
// if (!value.ok || value.i != 1)
// die("chan_flush returned unexpected value\n");
//
// /* Now it should allow to push another value */
// if (chan_push(&chan, 2) != 0)
// die("chan_push failed\n");
//
// if (chan_flush(&chan, &value) != 0)
// die("chan_flush failed\n");
//
// if (!value.ok || value.i != 2)
// die("chan_flush returned unexpected value\n");
//
// /* Now pop the values */
// if (chan_pop(&chan, 2) != 0)
// die("chan_pop failed\n");
//
// if (chan_flush(&chan, &value) != 0)
// die("chan_flush failed\n");
//
// if (!value.ok || value.i != 1)
// die("chan_flush returned unexpected value\n");
//
// if (chan_pop(&chan, 1) != 0)
// die("chan_pop failed\n");
//
// /* Now the stack should be empty */
//
// if (chan_pop(&chan, 1) == 0)
// die("chan_pop didn't fail\n");
//
//}
int main(void)
{
check_single();
//check_stack();
return 0;
}

109
test/unit/mux.c Normal file
View File

@ -0,0 +1,109 @@
#include "emu/mux.h"
#include "common.h"
#define N 4
//static int
//select_active_thread(struct mux *mux,
// struct value *value,
// struct mux_input **input)
//{
// if (value->type == VALUE_NULL) {
// *input = NULL;
// return 0;
// }
//
// if (value->type != VALUE_INT64) {
// err("expecting NULL or INT64 channel value\n");
// return -1;
// }
//
// enum thread_state state = (enum thread_state) value->i;
//
// if (mux->ninputs != 1) {
// err("expecting NULL or INT64 channel value\n");
// return -1;
// }
//
// switch (state) {
// case TH_ST_RUNNING:
// case TH_ST_COOLING:
// case TH_ST_WARMING:
// *input = only_input;
// break;
// case TH_ST_PAUSED:
// *input = NULL;
// break;
// }
//
// return 0;
//}
int
main(void)
{
struct bay bay;
bay_init(&bay);
struct chan inputs[N];
struct chan output;
struct chan select;
chan_init(&output, CHAN_SINGLE, "output");
chan_init(&select, CHAN_SINGLE, "select");
for (int i = 0; i < N; i++) {
char buf[MAX_CHAN_NAME];
sprintf(buf, "input.%d", i);
chan_init(&inputs[i], CHAN_SINGLE, buf);
}
/* Register all channels in the bay */
bay_register(&bay, &output);
bay_register(&bay, &select);
for (int i = 0; i < N; i++) {
bay_register(&bay, &inputs[i]);
}
struct mux mux;
mux_init(&mux, &bay, &select, &output, NULL);
for (int i = 0; i < N; i++)
mux_add_input(&mux, value_int64(i), &inputs[i]);
/* Write something to the input channels */
for (int i = 0; i < N; i++) {
if (chan_set(&inputs[i], value_int64(1000 + i)) != 0)
die("chan_set failed\n");
}
/* Propagate values and call the callbacks */
if (bay_propagate(&bay) != 0)
die("bay_propagate failed\n");
/* Change select channel */
if (chan_set(&select, value_int64(2)) != 0)
die("chan_set failed\n");
/* Propagate values and call the callbacks */
if (bay_propagate(&bay) != 0)
die("bay_propagate failed\n");
struct value out_value = value_null();
if (chan_read(&output, &out_value) != 0)
die("chan_read() failed for output channel\n");
struct value expected_value = value_int64(1002);
if (!value_is_equal(&out_value, &expected_value)) {
char buf1[128];
char buf2[128];
die("unexpected value found %s in output (expected %s)\n",
value_str(out_value, buf1),
value_str(expected_value, buf2));
}
err("OK\n");
return 0;
}