Add multiple callback types in the patchbay

This commit is contained in:
Rodrigo Arias 2023-01-16 15:21:56 +01:00 committed by Rodrigo Arias Mallo
parent ecc07012c0
commit e240937e58
6 changed files with 55 additions and 45 deletions

View File

@ -6,6 +6,11 @@
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
static char *propname[BAY_CB_MAX] = {
[BAY_CB_DIRTY] = "dirty",
[BAY_CB_EMIT] = "emit"
};
/* Called from the channel when it becomes dirty */ /* Called from the channel when it becomes dirty */
static int static int
cb_chan_is_dirty(struct chan *chan, void *arg) cb_chan_is_dirty(struct chan *chan, void *arg)
@ -14,6 +19,12 @@ cb_chan_is_dirty(struct chan *chan, void *arg)
struct bay_chan *bchan = arg; struct bay_chan *bchan = arg;
struct bay *bay = bchan->bay; struct bay *bay = bchan->bay;
if (bay->state != BAY_READY && bay->state != BAY_PROPAGATING) {
err("cannot add dirty channel %s in current bay state\n",
chan->name);
return -1;
}
if (bchan->is_dirty) { if (bchan->is_dirty) {
err("channel %s already on dirty list\n", chan->name); err("channel %s already on dirty list\n", chan->name);
return -1; return -1;
@ -94,7 +105,8 @@ bay_remove(struct bay *bay, struct chan *chan)
} }
int int
bay_add_cb(struct bay *bay, struct chan *chan, bay_cb_func_t func, void *arg) bay_add_cb(struct bay *bay, enum bay_cb_type type,
struct chan *chan, bay_cb_func_t func, void *arg)
{ {
if (func == NULL) { if (func == NULL) {
err("bay_add_cb: func is NULL\n"); err("bay_add_cb: func is NULL\n");
@ -117,8 +129,8 @@ bay_add_cb(struct bay *bay, struct chan *chan, bay_cb_func_t func, void *arg)
cb->func = func; cb->func = func;
cb->arg = arg; cb->arg = arg;
DL_APPEND(bchan->cb, cb); DL_APPEND(bchan->cb[type], cb);
bchan->ncallbacks++; bchan->ncallbacks[type]++;
return 0; return 0;
} }
@ -130,13 +142,14 @@ bay_init(struct bay *bay)
} }
static int static int
propagate_chan(struct bay_chan *bchan) propagate_chan(struct bay_chan *bchan, enum bay_cb_type type)
{ {
dbg("- propagating dirty channel %s\n", bchan->chan->name); dbg("- propagating channel '%s' phase %s\n",
bchan->chan->name, propname[type]);
struct bay_cb *cur = NULL; struct bay_cb *cur = NULL;
struct bay_cb *tmp = NULL; struct bay_cb *tmp = NULL;
DL_FOREACH_SAFE(bchan->cb, cur, tmp) { DL_FOREACH_SAFE(bchan->cb[type], cur, tmp) {
if (cur->func(bchan->chan, cur->arg) != 0) { if (cur->func(bchan->chan, cur->arg) != 0) {
err("propagate_chan: callback failed\n"); err("propagate_chan: callback failed\n");
return -1; return -1;
@ -149,20 +162,31 @@ propagate_chan(struct bay_chan *bchan)
int int
bay_propagate(struct bay *bay) bay_propagate(struct bay *bay)
{ {
dbg("-- propagating channels begins\n"); struct bay_chan *cur, *tmp;
struct bay_chan *cur = NULL; bay->state = BAY_PROPAGATING;
struct bay_chan *tmp = NULL;
DL_FOREACH_SAFE(bay->dirty, cur, tmp) { DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
/* May add more dirty channels */ /* May add more dirty channels */
if (propagate_chan(cur) != 0) { if (propagate_chan(cur, BAY_CB_DIRTY) != 0) {
err("bay_propagate: propagate_chan failed\n"); err("bay_propagate: propagate_chan failed\n");
return -1; return -1;
} }
} }
/* Flush channels after running all the dirty callbacks, so we /* Once the dirty callbacks have been propagated,
* capture any potential double write when running the * begin the emit stage */
* callbacks */ bay->state = BAY_EMITTING;
DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
/* May add more dirty channels */
if (propagate_chan(cur, BAY_CB_EMIT) != 0) {
err("bay_propagate: propagate_chan failed\n");
return -1;
}
}
/* Flush channels after running all the dirty and emit
* callbacks, so we capture any potential double write when
* running the callbacks */
bay->state = BAY_FLUSHING;
DL_FOREACH_SAFE(bay->dirty, cur, tmp) { DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
if (chan_flush(cur->chan) != 0) { if (chan_flush(cur->chan) != 0) {
err("bay_propagate: chan_flush failed\n"); err("bay_propagate: chan_flush failed\n");
@ -172,19 +196,7 @@ bay_propagate(struct bay *bay)
} }
bay->dirty = NULL; bay->dirty = NULL;
bay->state = BAY_READY;
dbg("-- propagating channels ends\n");
return 0; return 0;
} }
//int
//bay_emit(struct bay *bay)
//{
// for (chan in dirty channels) {
// /* May add more dirty channels */
// emit_chan(chan);
// }
//
// return 0;
//}

View File

@ -10,6 +10,12 @@ struct bay;
struct bay_cb; struct bay_cb;
struct bay_chan; struct bay_chan;
enum bay_cb_type {
BAY_CB_DIRTY = 0,
BAY_CB_EMIT,
BAY_CB_MAX,
};
typedef int (*bay_cb_func_t)(struct chan *chan, void *ptr); typedef int (*bay_cb_func_t)(struct chan *chan, void *ptr);
struct bay_cb { struct bay_cb {
@ -25,8 +31,8 @@ struct bay_cb {
struct bay_chan { struct bay_chan {
struct chan *chan; struct chan *chan;
int ncallbacks; int ncallbacks[BAY_CB_MAX];
struct bay_cb *cb; struct bay_cb *cb[BAY_CB_MAX];
struct bay *bay; struct bay *bay;
int is_dirty; int is_dirty;
@ -39,12 +45,11 @@ struct bay_chan {
}; };
enum bay_state { enum bay_state {
BAY_CONFIG = 1, BAY_UKNOWN = 0,
BAY_READY, BAY_READY,
BAY_PROPAGATING, BAY_PROPAGATING,
BAY_PROPAGATED,
BAY_EMITTING, BAY_EMITTING,
BAY_EMITTED BAY_FLUSHING
}; };
struct bay { struct bay {
@ -53,11 +58,11 @@ struct bay {
struct bay_chan *dirty; struct bay_chan *dirty;
}; };
void bay_init(struct bay *bay);
int bay_register(struct bay *bay, struct chan *chan); int bay_register(struct bay *bay, struct chan *chan);
int bay_remove(struct bay *bay, struct chan *chan); int bay_remove(struct bay *bay, struct chan *chan);
struct chan *bay_find(struct bay *bay, const char *name); struct chan *bay_find(struct bay *bay, const char *name);
int bay_add_cb(struct bay *bay, struct chan *chan, bay_cb_func_t func, void *arg); int bay_add_cb(struct bay *bay, enum bay_cb_type type, struct chan *chan, bay_cb_func_t func, void *arg);
void bay_init(struct bay *bay);
int bay_propagate(struct bay *bay); int bay_propagate(struct bay *bay);
#endif /* BAY_H */ #endif /* BAY_H */

View File

@ -173,7 +173,7 @@ mux_init(struct mux *mux,
mux->select_func = select_func; mux->select_func = select_func;
mux->bay = bay; mux->bay = bay;
if (bay_add_cb(bay, select, cb_select, mux) != 0) { if (bay_add_cb(bay, BAY_CB_DIRTY, select, cb_select, mux) != 0) {
err("mux_init: bay_add_cb failed\n"); err("mux_init: bay_add_cb failed\n");
return -1; return -1;
} }
@ -222,7 +222,7 @@ mux_add_input(struct mux *mux, struct value key, struct chan *chan)
HASH_ADD_KEYPTR(hh, mux->input, &input->key, sizeof(input->key), input); HASH_ADD_KEYPTR(hh, mux->input, &input->key, sizeof(input->key), input);
if (bay_add_cb(mux->bay, chan, cb_input, mux) != 0) { if (bay_add_cb(mux->bay, BAY_CB_DIRTY, chan, cb_input, mux) != 0) {
err("mux_add_input: bay_add_cb failed\n"); err("mux_add_input: bay_add_cb failed\n");
return -1; return -1;
} }

View File

@ -17,15 +17,6 @@ typedef int (* mux_select_func_t)(struct mux *mux,
struct value value, struct value value,
struct mux_input **input); struct mux_input **input);
/* MUX logic:
*
* select input output
* ---------------------------
* null - null
* input x x
*
*/
struct mux { struct mux {
struct bay *bay; struct bay *bay;
int64_t ninputs; int64_t ninputs;

View File

@ -27,6 +27,8 @@
/* Poison assert */ /* Poison assert */
#pragma GCC poison assert #pragma GCC poison assert
#define USE_RET __attribute__((warn_unused_result))
/* clang-format on */ /* clang-format on */
#endif /* COMMON_H */ #endif /* COMMON_H */

View File

@ -62,7 +62,7 @@ test_callback(struct bay *bay)
die("bay_register failed\n"); die("bay_register failed\n");
int64_t data = 0; int64_t data = 0;
if (bay_add_cb(bay, &chan, callback, &data) != 0) if (bay_add_cb(bay, BAY_CB_DIRTY, &chan, callback, &data) != 0)
die("bay_add_cb failed\n"); die("bay_add_cb failed\n");
if (data != 0) if (data != 0)