Add sort channel module

Sorts the N input values and writes them in the N outputs in order.
This commit is contained in:
Rodrigo Arias 2023-02-28 16:36:58 +01:00 committed by Rodrigo Arias Mallo
parent a4ce0e2a1e
commit 70b29b6459
5 changed files with 254 additions and 0 deletions

View File

@ -29,6 +29,7 @@ add_library(emu STATIC
loom.c
metadata.c
mux.c
sort.c
path.c
proc.c
pv/pcf.c

134
src/emu/sort.c Normal file
View File

@ -0,0 +1,134 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
//#define ENABLE_DEBUG
#include "sort.h"
static int
cmp_int64(const void *a, const void *b)
{
int64_t aa = *(const int64_t *) a;
int64_t bb = *(const int64_t *) b;
if (aa < bb)
return -1;
else if (aa > bb)
return +1;
else
return 0;
}
/** Called when an input channel changes its value */
static int
sort_cb_input(struct chan *in_chan, void *ptr)
{
struct sort_input *input = ptr;
struct sort *sort = input->sort;
struct value cur;
if (chan_read(in_chan, &cur) != 0) {
err("chan_read() failed\n");
return -1;
}
int64_t vcur = 0;
if (cur.type == VALUE_INT64)
vcur = cur.i;
int64_t index = input->index;
int64_t last = sort->values[index];
/* Nothing to do if no change */
if (last == vcur)
return 0;
dbg("sort input %s changed", in_chan->name);
/* Otherwise recompute the outputs */
sort->values[index] = vcur;
memcpy(sort->sorted, sort->values, sort->n * sizeof(int64_t));
qsort(sort->sorted, sort->n, sizeof(int64_t), cmp_int64);
for (int64_t i = 0; i < sort->n; i++) {
struct value val = value_int64(sort->sorted[i]);
if (chan_set(sort->outputs[i], val) != 0) {
err("chan_set failed");
return -1;
}
}
return 0;
}
int
sort_init(struct sort *sort, struct bay *bay, int64_t n)
{
memset(sort, 0, sizeof(struct sort));
sort->bay = bay;
sort->n = n;
sort->inputs = calloc(n, sizeof(struct sort_input));
if (sort->inputs == NULL) {
err("calloc failed:");
return -1;
}
sort->outputs = calloc(n, sizeof(struct chan *));
if (sort->outputs == NULL) {
err("calloc failed:");
return -1;
}
sort->values = calloc(n, sizeof(int64_t));
if (sort->values == NULL) {
err("calloc failed:");
return -1;
}
sort->sorted = calloc(n, sizeof(int64_t));
if (sort->sorted == NULL) {
err("calloc failed:");
return -1;
}
return 0;
}
int
sort_set_input(struct sort *sort, int64_t index, struct chan *chan)
{
struct sort_input *input = &sort->inputs[index];
if (input->chan != NULL) {
err("input %d already has a channel", index);
return -1;
}
input->chan = chan;
input->index = index;
input->sort = sort;
if (bay_add_cb(sort->bay, BAY_CB_DIRTY, chan, sort_cb_input, input, 1) == NULL) {
err("bay_add_cb failed");
return -1;
}
return 0;
}
int
sort_set_output(struct sort *sort, int64_t index, struct chan *chan)
{
if (sort->outputs[index] != NULL) {
err("output %d already has a channel", index);
return -1;
}
sort->outputs[index] = chan;
/* The sort module may write multiple times to the same channel if we
* update more than one input. */
chan_prop_set(chan, CHAN_DIRTY_WRITE, 1);
/* No duplicate checks are done by sort module, so we simply allow them */
chan_prop_set(chan, CHAN_DUPLICATES, 1);
return 0;
}

32
src/emu/sort.h Normal file
View File

@ -0,0 +1,32 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#ifndef SORT_H
#define SORT_H
#include <stdint.h>
#include "bay.h"
struct sort;
struct sort_input {
int64_t index;
struct chan *chan;
struct sort *sort;
};
struct sort {
int64_t n;
struct sort_input *inputs;
struct chan **outputs;
int64_t *values;
int64_t *sorted;
struct bay *bay;
};
USE_RET int sort_init(struct sort *sort, struct bay *bay, int64_t n);
USE_RET int sort_set_input(struct sort *sort, int64_t index, struct chan *input);
USE_RET int sort_set_output(struct sort *sort, int64_t index, struct chan *output);
USE_RET int sort_register(struct sort *sort, struct bay *bay);
#endif /* SORT_H */

View File

@ -15,3 +15,4 @@ unit_test(thread.c)
unit_test(value.c)
unit_test(version.c)
unit_test(path.c)
unit_test(sort.c)

86
test/unit/sort.c Normal file
View File

@ -0,0 +1,86 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "emu/sort.h"
#include "common.h"
#include "unittest.h"
#define N 10
static void
check_output(struct chan *chan, struct value expected)
{
struct value out_value = value_null();
if (chan_read(chan, &out_value) != 0)
die("chan_read() failed for channel %s", chan->name);
char buf1[128];
if (!value_is_equal(&out_value, &expected)) {
char buf2[128];
die("unexpected value found %s in output (expected %s)\n",
value_str(out_value, buf1),
value_str(expected, buf2));
}
err("output ok: chan=%s val=%s", chan->name, value_str(out_value, buf1));
}
static void
test_sort(void)
{
struct bay bay;
bay_init(&bay);
struct chan inputs[N];
struct chan outputs[N];
for (int i = 0; i < N; i++) {
chan_init(&inputs[i], CHAN_SINGLE, "input.%d", i);
chan_init(&outputs[i], CHAN_SINGLE, "output.%d", i);
}
/* Register all channels in the bay */
for (int i = 0; i < N; i++) {
OK(bay_register(&bay, &inputs[i]));
OK(bay_register(&bay, &outputs[i]));
}
/* Setup channel values */
for (int i = 0; i < N; i++) {
OK(chan_set(&inputs[i], value_int64(0)));
}
OK(bay_propagate(&bay));
struct sort sort;
OK(sort_init(&sort, &bay, N));
for (int i = 0; i < N; i++) {
OK(sort_set_input(&sort, i, &inputs[i]));
OK(sort_set_output(&sort, i, &outputs[i]));
}
for (int i = 0; i < 2; i++) {
OK(chan_set(&inputs[i], value_int64(1)));
}
OK(bay_propagate(&bay));
/* Check the outputs */
for (int i = 0; i < N - 2; i++) {
check_output(&outputs[i], value_int64(0));
}
for (int i = N - 2; i < N; i++) {
check_output(&outputs[i], value_int64(1));
}
}
int
main(void)
{
test_sort();
err("OK\n");
return 0;
}