Complete Nanos6 model
This commit is contained in:
		
							parent
							
								
									524ccc4dd5
								
							
						
					
					
						commit
						4a8255e227
					
				@ -25,7 +25,9 @@ cb_chan_is_dirty(struct chan *chan, void *arg)
 | 
				
			|||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dbg("adding dirty chan %s", chan->name)
 | 
				
			||||||
	DL_APPEND(bay->dirty, bchan);
 | 
						DL_APPEND(bay->dirty, bchan);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -152,8 +154,7 @@ propagate_chan(struct bay_chan *bchan, enum bay_cb_type type)
 | 
				
			|||||||
			bchan->chan->name, propname[type]);
 | 
								bchan->chan->name, propname[type]);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct bay_cb *cur = NULL;
 | 
						struct bay_cb *cur = NULL;
 | 
				
			||||||
	struct bay_cb *tmp = NULL;
 | 
						DL_FOREACH(bchan->cb[type], cur) {
 | 
				
			||||||
	DL_FOREACH_SAFE(bchan->cb[type], cur, tmp) {
 | 
					 | 
				
			||||||
		if (cur->func(bchan->chan, cur->arg) != 0) {
 | 
							if (cur->func(bchan->chan, cur->arg) != 0) {
 | 
				
			||||||
			err("propagate_chan: callback failed\n");
 | 
								err("propagate_chan: callback failed\n");
 | 
				
			||||||
			return -1;
 | 
								return -1;
 | 
				
			||||||
@ -166,9 +167,9 @@ propagate_chan(struct bay_chan *bchan, enum bay_cb_type type)
 | 
				
			|||||||
int
 | 
					int
 | 
				
			||||||
bay_propagate(struct bay *bay)
 | 
					bay_propagate(struct bay *bay)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	struct bay_chan *cur, *tmp;
 | 
						struct bay_chan *cur;
 | 
				
			||||||
	bay->state = BAY_PROPAGATING;
 | 
						bay->state = BAY_PROPAGATING;
 | 
				
			||||||
	DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
 | 
						DL_FOREACH(bay->dirty, cur) {
 | 
				
			||||||
		/* May add more dirty channels */
 | 
							/* May add more dirty channels */
 | 
				
			||||||
		if (propagate_chan(cur, BAY_CB_DIRTY) != 0) {
 | 
							if (propagate_chan(cur, BAY_CB_DIRTY) != 0) {
 | 
				
			||||||
			err("bay_propagate: propagate_chan failed\n");
 | 
								err("bay_propagate: propagate_chan failed\n");
 | 
				
			||||||
@ -176,10 +177,12 @@ bay_propagate(struct bay *bay)
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dbg("<> dirty phase complete");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Once the dirty callbacks have been propagated,
 | 
						/* Once the dirty callbacks have been propagated,
 | 
				
			||||||
	 * begin the emit stage */
 | 
						 * begin the emit stage */
 | 
				
			||||||
	bay->state = BAY_EMITTING;
 | 
						bay->state = BAY_EMITTING;
 | 
				
			||||||
	DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
 | 
						DL_FOREACH(bay->dirty, cur) {
 | 
				
			||||||
		/* May add more dirty channels */
 | 
							/* May add more dirty channels */
 | 
				
			||||||
		if (propagate_chan(cur, BAY_CB_EMIT) != 0) {
 | 
							if (propagate_chan(cur, BAY_CB_EMIT) != 0) {
 | 
				
			||||||
			err("bay_propagate: propagate_chan failed\n");
 | 
								err("bay_propagate: propagate_chan failed\n");
 | 
				
			||||||
@ -187,11 +190,13 @@ bay_propagate(struct bay *bay)
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dbg("<> emit phase complete");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Flush channels after running all the dirty and emit
 | 
						/* Flush channels after running all the dirty and emit
 | 
				
			||||||
	 * callbacks, so we capture any potential double write when
 | 
						 * callbacks, so we capture any potential double write when
 | 
				
			||||||
	 * running the callbacks */
 | 
						 * running the callbacks */
 | 
				
			||||||
	bay->state = BAY_FLUSHING;
 | 
						bay->state = BAY_FLUSHING;
 | 
				
			||||||
	DL_FOREACH_SAFE(bay->dirty, cur, tmp) {
 | 
						DL_FOREACH(bay->dirty, cur) {
 | 
				
			||||||
		if (chan_flush(cur->chan) != 0) {
 | 
							if (chan_flush(cur->chan) != 0) {
 | 
				
			||||||
			err("bay_propagate: chan_flush failed\n");
 | 
								err("bay_propagate: chan_flush failed\n");
 | 
				
			||||||
			return -1;
 | 
								return -1;
 | 
				
			||||||
 | 
				
			|||||||
@ -1,7 +1,7 @@
 | 
				
			|||||||
/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
 | 
					/* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
 | 
				
			||||||
 * SPDX-License-Identifier: GPL-3.0-or-later */
 | 
					 * SPDX-License-Identifier: GPL-3.0-or-later */
 | 
				
			||||||
 | 
					
 | 
				
			||||||
//#define ENABLE_DEBUG
 | 
					#define ENABLE_DEBUG
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#include "chan.h"
 | 
					#include "chan.h"
 | 
				
			||||||
#include "common.h"
 | 
					#include "common.h"
 | 
				
			||||||
@ -53,6 +53,7 @@ set_dirty(struct chan *chan)
 | 
				
			|||||||
	chan->is_dirty = 1;
 | 
						chan->is_dirty = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (chan->dirty_cb != NULL) {
 | 
						if (chan->dirty_cb != NULL) {
 | 
				
			||||||
 | 
							dbg("%s: calling dirty callback", chan->name);
 | 
				
			||||||
		if (chan->dirty_cb(chan, chan->dirty_arg) != 0) {
 | 
							if (chan->dirty_cb(chan, chan->dirty_arg) != 0) {
 | 
				
			||||||
			err("%s: dirty callback failed", chan->name);
 | 
								err("%s: dirty callback failed", chan->name);
 | 
				
			||||||
			return -1;
 | 
								return -1;
 | 
				
			||||||
 | 
				
			|||||||
@ -16,6 +16,8 @@ static const char *chan_name[] = {
 | 
				
			|||||||
	[CPU_CHAN_TID] = "tid_running",
 | 
						[CPU_CHAN_TID] = "tid_running",
 | 
				
			||||||
	[CPU_CHAN_APPID] = "appid_running",
 | 
						[CPU_CHAN_APPID] = "appid_running",
 | 
				
			||||||
	[CPU_CHAN_FLUSH] = "flush_running",
 | 
						[CPU_CHAN_FLUSH] = "flush_running",
 | 
				
			||||||
 | 
						[CPU_CHAN_THRUN] = "th_running",
 | 
				
			||||||
 | 
						[CPU_CHAN_THACT] = "th_active",
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
static int chan_type[] = {
 | 
					static int chan_type[] = {
 | 
				
			||||||
@ -24,6 +26,8 @@ static int chan_type[] = {
 | 
				
			|||||||
	[CPU_CHAN_NRUN] = 3,
 | 
						[CPU_CHAN_NRUN] = 3,
 | 
				
			||||||
	[CPU_CHAN_APPID] = 5,
 | 
						[CPU_CHAN_APPID] = 5,
 | 
				
			||||||
	[CPU_CHAN_FLUSH] = 7,
 | 
						[CPU_CHAN_FLUSH] = 7,
 | 
				
			||||||
 | 
						[CPU_CHAN_THRUN] = -1,
 | 
				
			||||||
 | 
						[CPU_CHAN_THACT] = -1,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void
 | 
					void
 | 
				
			||||||
@ -69,11 +73,18 @@ cpu_init_end(struct cpu *cpu)
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	for (int i = 0; i < CPU_CHAN_MAX; i++) {
 | 
						for (int i = 0; i < CPU_CHAN_MAX; i++) {
 | 
				
			||||||
 | 
							if (chan_name[i] == NULL)
 | 
				
			||||||
 | 
								die("chan_name is null");
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		chan_init(&cpu->chan[i], CHAN_SINGLE,
 | 
							chan_init(&cpu->chan[i], CHAN_SINGLE,
 | 
				
			||||||
				chan_fmt, cpu->gindex, chan_name[i]);
 | 
									chan_fmt, cpu->gindex, chan_name[i]);
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	chan_prop_set(&cpu->chan[CPU_CHAN_NRUN], CHAN_DUPLICATES, 1);
 | 
						chan_prop_set(&cpu->chan[CPU_CHAN_NRUN], CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
						chan_prop_set(&cpu->chan[CPU_CHAN_TID], CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
						chan_prop_set(&cpu->chan[CPU_CHAN_PID], CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
						chan_prop_set(&cpu->chan[CPU_CHAN_THRUN], CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
						chan_prop_set(&cpu->chan[CPU_CHAN_THACT], CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	cpu->is_init = 1;
 | 
						cpu->is_init = 1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -104,8 +115,11 @@ cpu_connect(struct cpu *cpu, struct bay *bay, struct recorder *rec)
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		long type = chan_type[i];
 | 
							long type = chan_type[i];
 | 
				
			||||||
 | 
							if (type < 0)
 | 
				
			||||||
 | 
								continue;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		long row = cpu->gindex;
 | 
							long row = cpu->gindex;
 | 
				
			||||||
		if (prv_register(prv, row, type, bay, c)) {
 | 
							if (prv_register(prv, row, type, bay, c, PRV_DUP)) {
 | 
				
			||||||
			err("prv_register failed");
 | 
								err("prv_register failed");
 | 
				
			||||||
			return -1;
 | 
								return -1;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@ -155,14 +169,17 @@ cpu_update(struct cpu *cpu)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	struct value tid_running;
 | 
						struct value tid_running;
 | 
				
			||||||
	struct value pid_running;
 | 
						struct value pid_running;
 | 
				
			||||||
 | 
						struct value gid_running;
 | 
				
			||||||
	if (running == 1) {
 | 
						if (running == 1) {
 | 
				
			||||||
		cpu->th_running = th_running;
 | 
							cpu->th_running = th_running;
 | 
				
			||||||
		tid_running = value_int64(th_running->tid);
 | 
							tid_running = value_int64(th_running->tid);
 | 
				
			||||||
		pid_running = value_int64(th_running->proc->pid);
 | 
							pid_running = value_int64(th_running->proc->pid);
 | 
				
			||||||
 | 
							gid_running = value_int64(th_running->gindex);
 | 
				
			||||||
	} else {
 | 
						} else {
 | 
				
			||||||
		cpu->th_running = NULL;
 | 
							cpu->th_running = NULL;
 | 
				
			||||||
		tid_running = value_null();
 | 
							tid_running = value_null();
 | 
				
			||||||
		pid_running = value_null();
 | 
							pid_running = value_null();
 | 
				
			||||||
 | 
							gid_running = value_null();
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (chan_set(&cpu->chan[CPU_CHAN_TID], tid_running) != 0) {
 | 
						if (chan_set(&cpu->chan[CPU_CHAN_TID], tid_running) != 0) {
 | 
				
			||||||
@ -173,17 +190,29 @@ cpu_update(struct cpu *cpu)
 | 
				
			|||||||
		err("chan_set pid failed");
 | 
							err("chan_set pid failed");
 | 
				
			||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if (chan_set(&cpu->chan[CPU_CHAN_THRUN], gid_running) != 0) {
 | 
				
			||||||
 | 
							err("chan_set gid_running failed");
 | 
				
			||||||
 | 
							return -1;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (active == 1)
 | 
						struct value gid_active;
 | 
				
			||||||
 | 
						if (active == 1) {
 | 
				
			||||||
		cpu->th_active = th_active;
 | 
							cpu->th_active = th_active;
 | 
				
			||||||
	else
 | 
							gid_active = value_int64(th_active->gindex);
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
		cpu->th_active = NULL;
 | 
							cpu->th_active = NULL;
 | 
				
			||||||
 | 
							gid_active = value_null();
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Update nth_running number in the channel */
 | 
						/* Update nth_running number in the channel */
 | 
				
			||||||
	if (chan_set(&cpu->chan[CPU_CHAN_NRUN], value_int64(running)) != 0) {
 | 
						if (chan_set(&cpu->chan[CPU_CHAN_NRUN], value_int64(running)) != 0) {
 | 
				
			||||||
		err("chan_set nth_running failed");
 | 
							err("chan_set nth_running failed");
 | 
				
			||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						if (chan_set(&cpu->chan[CPU_CHAN_THACT], gid_active) != 0) {
 | 
				
			||||||
 | 
							err("chan_set gid_active failed");
 | 
				
			||||||
 | 
							return -1;
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return 0;
 | 
						return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -20,6 +20,8 @@ enum cpu_chan {
 | 
				
			|||||||
	CPU_CHAN_TID,
 | 
						CPU_CHAN_TID,
 | 
				
			||||||
	CPU_CHAN_APPID,
 | 
						CPU_CHAN_APPID,
 | 
				
			||||||
	CPU_CHAN_FLUSH,
 | 
						CPU_CHAN_FLUSH,
 | 
				
			||||||
 | 
						CPU_CHAN_THRUN, /* gindex */
 | 
				
			||||||
 | 
						CPU_CHAN_THACT, /* gindex */
 | 
				
			||||||
	CPU_CHAN_MAX,
 | 
						CPU_CHAN_MAX,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -160,6 +160,7 @@ emu_step(struct emu *emu)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	if (bay_propagate(&emu->bay) != 0) {
 | 
						if (bay_propagate(&emu->bay) != 0) {
 | 
				
			||||||
		err("bay_propagate failed");
 | 
							err("bay_propagate failed");
 | 
				
			||||||
 | 
							panic(emu);
 | 
				
			||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							@ -12,65 +12,19 @@ extern struct model_spec model_nanos6;
 | 
				
			|||||||
#include "mux.h"
 | 
					#include "mux.h"
 | 
				
			||||||
#include "task.h"
 | 
					#include "task.h"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
enum nanos6_chan_type {
 | 
					 | 
				
			||||||
	NANOS6_CHAN_TASKID = 0,
 | 
					 | 
				
			||||||
	NANOS6_CHAN_TYPE,
 | 
					 | 
				
			||||||
	NANOS6_CHAN_SUBSYSTEM,
 | 
					 | 
				
			||||||
	NANOS6_CHAN_RANK,
 | 
					 | 
				
			||||||
	NANOS6_CHAN_THREAD,
 | 
					 | 
				
			||||||
	NANOS6_CHAN_MAX,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
enum nanos6_ss_state {
 | 
					 | 
				
			||||||
	ST_NANOS6_TASK_BODY = 1,
 | 
					 | 
				
			||||||
	ST_NANOS6_TASK_CREATING,
 | 
					 | 
				
			||||||
	ST_NANOS6_TASK_SUBMIT,
 | 
					 | 
				
			||||||
	ST_NANOS6_TASK_SPAWNING,
 | 
					 | 
				
			||||||
	ST_NANOS6_TASK_FOR,
 | 
					 | 
				
			||||||
	ST_NANOS6_SCHED_ADDING,
 | 
					 | 
				
			||||||
	ST_NANOS6_SCHED_PROCESSING,
 | 
					 | 
				
			||||||
	ST_NANOS6_SCHED_SERVING,
 | 
					 | 
				
			||||||
	ST_NANOS6_DEP_REG,
 | 
					 | 
				
			||||||
	ST_NANOS6_DEP_UNREG,
 | 
					 | 
				
			||||||
	ST_NANOS6_BLK_TASKWAIT,
 | 
					 | 
				
			||||||
	ST_NANOS6_BLK_WAITFOR,
 | 
					 | 
				
			||||||
	ST_NANOS6_BLK_BLOCKING,
 | 
					 | 
				
			||||||
	ST_NANOS6_BLK_UNBLOCKING,
 | 
					 | 
				
			||||||
	ST_NANOS6_ALLOCATING,
 | 
					 | 
				
			||||||
	ST_NANOS6_FREEING,
 | 
					 | 
				
			||||||
	ST_NANOS6_HANDLING_TASK,
 | 
					 | 
				
			||||||
	ST_NANOS6_WORKER_LOOP,
 | 
					 | 
				
			||||||
	ST_NANOS6_SWITCH_TO,
 | 
					 | 
				
			||||||
	ST_NANOS6_MIGRATE,
 | 
					 | 
				
			||||||
	ST_NANOS6_SUSPEND,
 | 
					 | 
				
			||||||
	ST_NANOS6_RESUME,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	/* Value 51 is broken in old Paraver */
 | 
					 | 
				
			||||||
	EV_NANOS6_SCHED_RECV = 60,
 | 
					 | 
				
			||||||
	EV_NANOS6_SCHED_SEND,
 | 
					 | 
				
			||||||
	EV_NANOS6_SCHED_SELF,
 | 
					 | 
				
			||||||
	EV_NANOS6_CPU_IDLE,
 | 
					 | 
				
			||||||
	EV_NANOS6_CPU_ACTIVE,
 | 
					 | 
				
			||||||
	EV_NANOS6_SIGNAL,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
enum nanos6_thread_type {
 | 
					 | 
				
			||||||
	ST_NANOS6_TH_LEADER = 1,
 | 
					 | 
				
			||||||
	ST_NANOS6_TH_MAIN = 2,
 | 
					 | 
				
			||||||
	ST_NANOS6_TH_WORKER = 3,
 | 
					 | 
				
			||||||
	ST_NANOS6_TH_EXTERNAL = 4,
 | 
					 | 
				
			||||||
};
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
struct nanos6_thread {
 | 
					struct nanos6_thread {
 | 
				
			||||||
	struct chan chans[NANOS6_CHAN_MAX];
 | 
						struct chan *ch;	/* Raw, modified by nanos6 */
 | 
				
			||||||
	struct chan fchans[NANOS6_CHAN_MAX];
 | 
						struct chan *ch_run;	/* Tracking running thread */
 | 
				
			||||||
	struct chan *ochans[NANOS6_CHAN_MAX];
 | 
						struct chan *ch_act;	/* Tracking active thread */
 | 
				
			||||||
	struct mux muxers[NANOS6_CHAN_MAX];
 | 
						struct chan **ch_out;	/* Output to PRV */
 | 
				
			||||||
 | 
						struct mux *mux_run;
 | 
				
			||||||
 | 
						struct mux *mux_act;
 | 
				
			||||||
	struct task_stack task_stack;
 | 
						struct task_stack task_stack;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct nanos6_cpu {
 | 
					struct nanos6_cpu {
 | 
				
			||||||
	struct chan chans[NANOS6_CHAN_MAX];
 | 
						struct chan *ch;
 | 
				
			||||||
 | 
						struct mux *mux;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct nanos6_proc {
 | 
					struct nanos6_proc {
 | 
				
			||||||
 | 
				
			|||||||
@ -48,6 +48,8 @@ cb_select(struct chan *sel_chan, void *ptr)
 | 
				
			|||||||
{
 | 
					{
 | 
				
			||||||
	struct mux *mux = ptr;
 | 
						struct mux *mux = ptr;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						dbg("selecting input for output chan chan=%s", mux->output->name);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	struct value sel_value;
 | 
						struct value sel_value;
 | 
				
			||||||
	if (chan_read(sel_chan, &sel_value) != 0) {
 | 
						if (chan_read(sel_chan, &sel_value) != 0) {
 | 
				
			||||||
		err("cb_select: chan_read(select) failed\n");
 | 
							err("cb_select: chan_read(select) failed\n");
 | 
				
			||||||
@ -104,8 +106,8 @@ cb_input(struct chan *in_chan, void *ptr)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	/* Nothing to do, the input is not selected */
 | 
						/* Nothing to do, the input is not selected */
 | 
				
			||||||
	if (input == NULL || input->chan != in_chan) {
 | 
						if (input == NULL || input->chan != in_chan) {
 | 
				
			||||||
		dbg("mux: input channel %s changed but not selected\n",
 | 
							//dbg("input channel %s changed but not selected\n",
 | 
				
			||||||
				in_chan->name);
 | 
							//		in_chan->name);
 | 
				
			||||||
		return 0;
 | 
							return 0;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -119,6 +121,11 @@ cb_input(struct chan *in_chan, void *ptr)
 | 
				
			|||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						char buf[128];
 | 
				
			||||||
 | 
						UNUSED(buf);
 | 
				
			||||||
 | 
						dbg("setting output chan %s to value %s",
 | 
				
			||||||
 | 
								mux->output->name, value_str(out_value, buf));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (chan_set(mux->output, out_value) != 0) {
 | 
						if (chan_set(mux->output, out_value) != 0) {
 | 
				
			||||||
		err("cb_input: chan_set() failed\n");
 | 
							err("cb_input: chan_set() failed\n");
 | 
				
			||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
@ -165,7 +172,7 @@ mux_init(struct mux *mux,
 | 
				
			|||||||
	 * as the last output value, so we allow duplicates too */
 | 
						 * as the last output value, so we allow duplicates too */
 | 
				
			||||||
	chan_prop_set(output, CHAN_DUPLICATES, 1);
 | 
						chan_prop_set(output, CHAN_DUPLICATES, 1);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	memset(mux, 0, sizeof(struct mux_input));
 | 
						memset(mux, 0, sizeof(struct mux));
 | 
				
			||||||
	mux->select = select;
 | 
						mux->select = select;
 | 
				
			||||||
	mux->output = output;
 | 
						mux->output = output;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -39,6 +39,7 @@ USE_RET int mux_init(struct mux *mux,
 | 
				
			|||||||
USE_RET struct mux_input *mux_find_input(struct mux *mux,
 | 
					USE_RET struct mux_input *mux_find_input(struct mux *mux,
 | 
				
			||||||
		struct value key);
 | 
							struct value key);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/* TODO: use an index to select the input in O(1) */
 | 
				
			||||||
USE_RET int mux_add_input(struct mux *mux,
 | 
					USE_RET int mux_add_input(struct mux *mux,
 | 
				
			||||||
		struct value key,
 | 
							struct value key,
 | 
				
			||||||
		struct chan *input);
 | 
							struct chan *input);
 | 
				
			||||||
 | 
				
			|||||||
@ -8,6 +8,7 @@
 | 
				
			|||||||
#include "thread.h"
 | 
					#include "thread.h"
 | 
				
			||||||
#include "parson.h"
 | 
					#include "parson.h"
 | 
				
			||||||
#include "uthash.h"
 | 
					#include "uthash.h"
 | 
				
			||||||
 | 
					#include "extend.h"
 | 
				
			||||||
#include <stddef.h>
 | 
					#include <stddef.h>
 | 
				
			||||||
#include <stdint.h>
 | 
					#include <stdint.h>
 | 
				
			||||||
#include <linux/limits.h>
 | 
					#include <linux/limits.h>
 | 
				
			||||||
@ -36,6 +37,7 @@ struct proc {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	//struct model_ctx ctx;
 | 
						//struct model_ctx ctx;
 | 
				
			||||||
	UT_hash_handle hh; /* procs in the loom */
 | 
						UT_hash_handle hh; /* procs in the loom */
 | 
				
			||||||
 | 
						struct extend ext;
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int proc_relpath_get_pid(const char *relpath, int *pid);
 | 
					int proc_relpath_get_pid(const char *relpath, int *pid);
 | 
				
			||||||
 | 
				
			|||||||
@ -84,13 +84,16 @@ emit(struct prv *prv, struct prv_chan *rchan)
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Ensure we don't emit the same value twice */
 | 
						/* Ensure we don't emit the same value twice */
 | 
				
			||||||
	if (rchan->last_value_set) {
 | 
						if (rchan->last_value_set && value_is_equal(&value, &rchan->last_value)) {
 | 
				
			||||||
		/* TODO: skip optionally */
 | 
							char buf[128];
 | 
				
			||||||
		if (value_is_equal(&value, &rchan->last_value)) {
 | 
							if (rchan->flags & PRV_DUP) {
 | 
				
			||||||
			char buf[128];
 | 
								dbg("skipping duplicated value %s for channel %s\n",
 | 
				
			||||||
			err("skipping duplicated value %s for channel %s\n",
 | 
					 | 
				
			||||||
					value_str(value, buf), chan->name);
 | 
										value_str(value, buf), chan->name);
 | 
				
			||||||
			return 0;
 | 
								return 0;
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								err("error duplicated value %s for channel %s\n",
 | 
				
			||||||
 | 
										value_str(value, buf), chan->name);
 | 
				
			||||||
 | 
								return -1;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -139,7 +142,7 @@ cb_prv(struct chan *chan, void *ptr)
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int
 | 
					int
 | 
				
			||||||
prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan)
 | 
					prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan, long flags)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
	/* FIXME: use the type instead of channel name as key */
 | 
						/* FIXME: use the type instead of channel name as key */
 | 
				
			||||||
	struct prv_chan *rchan = find_prv_chan(prv, chan->name);
 | 
						struct prv_chan *rchan = find_prv_chan(prv, chan->name);
 | 
				
			||||||
@ -160,6 +163,7 @@ prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan
 | 
				
			|||||||
	rchan->prv = prv;
 | 
						rchan->prv = prv;
 | 
				
			||||||
	rchan->last_value = value_null();
 | 
						rchan->last_value = value_null();
 | 
				
			||||||
	rchan->last_value_set = 0;
 | 
						rchan->last_value_set = 0;
 | 
				
			||||||
 | 
						rchan->flags = flags;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	/* Add emit callback */
 | 
						/* Add emit callback */
 | 
				
			||||||
	if (bay_add_cb(bay, BAY_CB_EMIT, chan, cb_prv, rchan) != 0) {
 | 
						if (bay_add_cb(bay, BAY_CB_EMIT, chan, cb_prv, rchan) != 0) {
 | 
				
			||||||
 | 
				
			|||||||
@ -11,11 +11,16 @@
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
struct prv;
 | 
					struct prv;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					enum prv_flags {
 | 
				
			||||||
 | 
						PRV_DUP = 1,
 | 
				
			||||||
 | 
					};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
struct prv_chan {
 | 
					struct prv_chan {
 | 
				
			||||||
	struct prv *prv;
 | 
						struct prv *prv;
 | 
				
			||||||
	struct chan *chan;
 | 
						struct chan *chan;
 | 
				
			||||||
	long row_base1;
 | 
						long row_base1;
 | 
				
			||||||
	long type;
 | 
						long type;
 | 
				
			||||||
 | 
						long flags;
 | 
				
			||||||
	int last_value_set;
 | 
						int last_value_set;
 | 
				
			||||||
	struct value last_value;
 | 
						struct value last_value;
 | 
				
			||||||
	UT_hash_handle hh; /* Indexed by chan->name */
 | 
						UT_hash_handle hh; /* Indexed by chan->name */
 | 
				
			||||||
@ -30,7 +35,7 @@ struct prv {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
int prv_open(struct prv *prv, long nrows, const char *path);
 | 
					int prv_open(struct prv *prv, long nrows, const char *path);
 | 
				
			||||||
int prv_open_file(struct prv *prv, long nrows, FILE *file);
 | 
					int prv_open_file(struct prv *prv, long nrows, FILE *file);
 | 
				
			||||||
int prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *c);
 | 
					int prv_register(struct prv *prv, long row, long type, struct bay *bay, struct chan *chan, long flags);
 | 
				
			||||||
int prv_advance(struct prv *prv, int64_t time);
 | 
					int prv_advance(struct prv *prv, int64_t time);
 | 
				
			||||||
void prv_close(struct prv *prv);
 | 
					void prv_close(struct prv *prv);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -143,7 +143,7 @@ thread_connect(struct thread *th, struct bay *bay, struct recorder *rec)
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		long type = chan_type[i];
 | 
							long type = chan_type[i];
 | 
				
			||||||
		long row = th->gindex;
 | 
							long row = th->gindex;
 | 
				
			||||||
		if (prv_register(prv, row, type, bay, c)) {
 | 
							if (prv_register(prv, row, type, bay, c, PRV_DUP)) {
 | 
				
			||||||
			err("prv_register failed");
 | 
								err("prv_register failed");
 | 
				
			||||||
			return -1;
 | 
								return -1;
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
@ -213,7 +213,7 @@ thread_select_active(struct mux *mux,
 | 
				
			|||||||
	enum thread_state state = (enum thread_state) value.i;
 | 
						enum thread_state state = (enum thread_state) value.i;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if (mux->ninputs != 1) {
 | 
						if (mux->ninputs != 1) {
 | 
				
			||||||
		err("expecting NULL or INT64 channel value");
 | 
							err("mux doesn't have one input but %d", mux->ninputs);
 | 
				
			||||||
		return -1;
 | 
							return -1;
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -32,6 +32,8 @@ value_is_equal(struct value *a, struct value *b)
 | 
				
			|||||||
		return 1;
 | 
							return 1;
 | 
				
			||||||
	else if (a->type == VALUE_DOUBLE && a->d == b->d)
 | 
						else if (a->type == VALUE_DOUBLE && a->d == b->d)
 | 
				
			||||||
		return 1;
 | 
							return 1;
 | 
				
			||||||
 | 
						else if (a->type == VALUE_NULL && b->type == VALUE_NULL)
 | 
				
			||||||
 | 
							return 1;
 | 
				
			||||||
	else
 | 
						else
 | 
				
			||||||
		return 0;
 | 
							return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -861,7 +861,8 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
				
			|||||||
#define DL_FOREACH2(head, el, next) \
 | 
					#define DL_FOREACH2(head, el, next) \
 | 
				
			||||||
	for ((el) = (head); el; (el) = (el)->next)
 | 
						for ((el) = (head); el; (el) = (el)->next)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/* this version is safe for deleting the elements during iteration */
 | 
					/* this version is safe for deleting the elements during iteration (not for
 | 
				
			||||||
 | 
					 * appending!) */
 | 
				
			||||||
#define DL_FOREACH_SAFE(head, el, tmp) \
 | 
					#define DL_FOREACH_SAFE(head, el, tmp) \
 | 
				
			||||||
	DL_FOREACH_SAFE2(head, el, tmp, next)
 | 
						DL_FOREACH_SAFE2(head, el, tmp, next)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user