Speedup the sort module

Improvements:

- Don't propagate values if they didn't change
- Use custom sort algorithm to speedup the sorting
- Allocate a contiguous array of channel outputs
This commit is contained in:
Rodrigo Arias 2023-03-08 16:58:29 +01:00 committed by Rodrigo Arias Mallo
parent 1909d8106c
commit 35872354e0
5 changed files with 226 additions and 47 deletions

View File

@ -19,6 +19,58 @@ cmp_int64(const void *a, const void *b)
return 0; return 0;
} }
/** Replaces the value old in the array arr by new, while keeping the
* array arr sorted.
*
* Preconditions:
* - arr is sorted
* - old is in arr
* - old != new
*/
void
sort_replace(int64_t *arr, int64_t n, int64_t old, int64_t new)
{
if (unlikely(old == new))
die("old == new");
/* Remove the old first then insert the new */
if (old < new) {
int64_t i = 0;
/* Quick jump to middle if less than old */
int64_t m = n / 2;
if (arr[m] < old)
i = m;
/* Skip content until old, no need to copy */
for (; arr[i] < old; i++)
;
/* Copy middle section replacing old */
for (; arr[i + 1] <= new && i < n - 1; i++)
arr[i] = arr[i + 1];
/* Place new */
arr[i] = new;
} else { /* new < old */
int64_t i = 0;
/* Find old, must be found */
for (; arr[i] < old; i++)
;
/* Shift right to replace old */
for (; arr[i - 1] > new && i > 0; i--)
arr[i] = arr[i - 1];
/* Invariant: Either i == 0 or arr[i] <= new
* and the element arr[i] must be gone */
/* Place new */
arr[i] = new;
}
}
/** Called when an input channel changes its value */ /** Called when an input channel changes its value */
static int static int
sort_cb_input(struct chan *in_chan, void *ptr) sort_cb_input(struct chan *in_chan, void *ptr)
@ -32,27 +84,47 @@ sort_cb_input(struct chan *in_chan, void *ptr)
return -1; return -1;
} }
int64_t vcur = 0; int64_t new = 0;
if (cur.type == VALUE_INT64) if (cur.type == VALUE_INT64)
vcur = cur.i; new = cur.i;
int64_t index = input->index; int64_t index = input->index;
int64_t last = sort->values[index]; int64_t old = sort->values[index];
/* Nothing to do if no change */ /* Nothing to do if no change */
if (last == vcur) if (old == new)
return 0; return 0;
dbg("sort input %s changed", in_chan->name); dbg("sort input %s changed", in_chan->name);
/* Otherwise recompute the outputs */ /* Otherwise recompute the outputs */
sort->values[index] = vcur; sort->values[index] = new;
memcpy(sort->sorted, sort->values, sort->n * sizeof(int64_t));
qsort(sort->sorted, sort->n, sizeof(int64_t), cmp_int64); if (likely(sort->copied)) {
sort_replace(sort->sorted, sort->n, old, new);
} else {
memcpy(sort->sorted, sort->values, sort->n * sizeof(int64_t));
qsort(sort->sorted, sort->n, sizeof(int64_t), cmp_int64);
sort->copied = 1;
}
for (int64_t i = 0; i < sort->n; i++) { for (int64_t i = 0; i < sort->n; i++) {
struct value val = value_int64(sort->sorted[i]); struct value val = value_int64(sort->sorted[i]);
if (chan_set(sort->outputs[i], val) != 0) { struct value last;
if (chan_read(&sort->outputs[i], &last) != 0) {
err("chan_read failed");
return -1;
}
if (value_is_equal(&last, &val))
continue;
char buf[128];
dbg("writting value %s into channel %s",
value_str(val, buf),
sort->outputs[i].name);
if (chan_set(&sort->outputs[i], val) != 0) {
err("chan_set failed"); err("chan_set failed");
return -1; return -1;
} }
@ -62,7 +134,7 @@ sort_cb_input(struct chan *in_chan, void *ptr)
} }
int int
sort_init(struct sort *sort, struct bay *bay, int64_t n) sort_init(struct sort *sort, struct bay *bay, int64_t n, const char *name)
{ {
memset(sort, 0, sizeof(struct sort)); memset(sort, 0, sizeof(struct sort));
sort->bay = bay; sort->bay = bay;
@ -72,7 +144,7 @@ sort_init(struct sort *sort, struct bay *bay, int64_t n)
err("calloc failed:"); err("calloc failed:");
return -1; return -1;
} }
sort->outputs = calloc(n, sizeof(struct chan *)); sort->outputs = calloc(n, sizeof(struct chan));
if (sort->outputs == NULL) { if (sort->outputs == NULL) {
err("calloc failed:"); err("calloc failed:");
return -1; return -1;
@ -88,6 +160,25 @@ sort_init(struct sort *sort, struct bay *bay, int64_t n)
return -1; return -1;
} }
/* Init and register outputs */
for (int64_t i = 0; i < n; i++) {
struct chan *out = &sort->outputs[i];
chan_init(out, CHAN_SINGLE, "%s.out%ld", name, i);
/* The sort module may write multiple times to the same
* channel if we update more than one input. */
chan_prop_set(out, CHAN_DIRTY_WRITE, 1);
/* No duplicate checks are done by sort module, so we
* simply allow them */
chan_prop_set(out, CHAN_DUPLICATES, 1);
if (bay_register(bay, out) != 0) {
err("bay_register out%ld failed", i);
return -1;
}
}
return 0; return 0;
} }
@ -113,22 +204,8 @@ sort_set_input(struct sort *sort, int64_t index, struct chan *chan)
return 0; return 0;
} }
int struct chan *
sort_set_output(struct sort *sort, int64_t index, struct chan *chan) sort_get_output(struct sort *sort, int64_t index)
{ {
if (sort->outputs[index] != NULL) { return &sort->outputs[index];
err("output %d already has a channel", index);
return -1;
}
sort->outputs[index] = chan;
/* The sort module may write multiple times to the same channel if we
* update more than one input. */
chan_prop_set(chan, CHAN_DIRTY_WRITE, 1);
/* No duplicate checks are done by sort module, so we simply allow them */
chan_prop_set(chan, CHAN_DUPLICATES, 1);
return 0;
} }

View File

@ -18,15 +18,17 @@ struct sort_input {
struct sort { struct sort {
int64_t n; int64_t n;
struct sort_input *inputs; struct sort_input *inputs;
struct chan **outputs; struct chan *outputs;
int64_t *values; int64_t *values;
int64_t *sorted; int64_t *sorted;
int copied;
struct bay *bay; struct bay *bay;
}; };
USE_RET int sort_init(struct sort *sort, struct bay *bay, int64_t n); USE_RET int sort_init(struct sort *sort, struct bay *bay, int64_t n, const char *name);
void sort_replace(int64_t *arr, int64_t n, int64_t old, int64_t new);
USE_RET int sort_set_input(struct sort *sort, int64_t index, struct chan *input); USE_RET int sort_set_input(struct sort *sort, int64_t index, struct chan *input);
USE_RET int sort_set_output(struct sort *sort, int64_t index, struct chan *output); USE_RET struct chan *sort_get_output(struct sort *sort, int64_t index);
USE_RET int sort_register(struct sort *sort, struct bay *bay); USE_RET int sort_register(struct sort *sort, struct bay *bay);
#endif /* SORT_H */ #endif /* SORT_H */

View File

@ -16,3 +16,4 @@ unit_test(value.c)
unit_test(version.c) unit_test(version.c)
unit_test(path.c) unit_test(path.c)
unit_test(sort.c) unit_test(sort.c)
unit_test(sort_replace.c)

View File

@ -32,46 +32,39 @@ test_sort(void)
bay_init(&bay); bay_init(&bay);
struct chan inputs[N]; struct chan inputs[N];
struct chan outputs[N];
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++)
chan_init(&inputs[i], CHAN_SINGLE, "input.%d", i); chan_init(&inputs[i], CHAN_SINGLE, "input.%d", i);
chan_init(&outputs[i], CHAN_SINGLE, "output.%d", i);
}
/* Register all channels in the bay */ /* Register all channels in the bay */
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++)
OK(bay_register(&bay, &inputs[i])); OK(bay_register(&bay, &inputs[i]));
OK(bay_register(&bay, &outputs[i]));
}
/* Setup channel values */ /* Setup channel values */
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++)
OK(chan_set(&inputs[i], value_int64(0))); OK(chan_set(&inputs[i], value_int64(0)));
}
OK(bay_propagate(&bay)); OK(bay_propagate(&bay));
struct sort sort; struct sort sort;
OK(sort_init(&sort, &bay, N)); OK(sort_init(&sort, &bay, N, "sort0"));
for (int i = 0; i < N; i++) { for (int i = 0; i < N; i++)
OK(sort_set_input(&sort, i, &inputs[i])); OK(sort_set_input(&sort, i, &inputs[i]));
OK(sort_set_output(&sort, i, &outputs[i]));
}
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++)
OK(chan_set(&inputs[i], value_int64(1))); OK(chan_set(&inputs[i], value_int64(1)));
}
OK(bay_propagate(&bay)); OK(bay_propagate(&bay));
/* Check the outputs */ /* Check the outputs */
for (int i = 0; i < N - 2; i++) { for (int i = 0; i < N - 2; i++) {
check_output(&outputs[i], value_int64(0)); struct chan *out = sort_get_output(&sort, i);
check_output(out, value_int64(0));
} }
for (int i = N - 2; i < N; i++) { for (int i = N - 2; i < N; i++) {
check_output(&outputs[i], value_int64(1)); struct chan *out = sort_get_output(&sort, i);
check_output(out, value_int64(1));
} }
} }

106
test/unit/sort_replace.c Normal file
View File

@ -0,0 +1,106 @@
/* Copyright (c) 2021-2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "emu/sort.h"
#include "common.h"
#include "unittest.h"
static int64_t
randint(void)
{
return rand() % 1000;
}
static int
cmp_int64(const void *a, const void *b)
{
int64_t aa = *(const int64_t *) a;
int64_t bb = *(const int64_t *) b;
if (aa < bb)
return -1;
else if (aa > bb)
return +1;
else
return 0;
}
static void
test_case(int64_t n, int64_t run)
{
srand(run);
int64_t *arr = calloc(n, sizeof(int64_t));
if (arr == NULL)
die("calloc failed:");
for (int64_t i = 0; i < n; i++)
arr[i] = randint();
qsort(arr, n, sizeof(int64_t), cmp_int64);
int64_t *copy = calloc(n, sizeof(int64_t));
if (copy == NULL)
die("calloc failed:");
memcpy(copy, arr, n * sizeof(int64_t));
int64_t iold = rand() % n;
int64_t old = arr[iold];
int64_t new = randint();
/* Ensure old != new */
while (old == new)
new = randint();
dbg("-- CASE run=%ld n=%ld iold=%ld old=%ld new=%ld\n",
run, n, iold, old, new);
dbg("Contents before sort: ");
for (int64_t i = 0; i < n; i++) {
dbg("i=%ld, arr[i]=%ld, copy[i]=%ld\n",
i, arr[i], copy[i]);
}
sort_replace(arr, n, old, new);
copy[iold] = new;
qsort(copy, n, sizeof(int64_t), cmp_int64);
dbg("Contents after sort: ");
for (int64_t i = 0; i < n; i++) {
dbg("i=%ld, arr[i]=%ld, copy[i]=%ld\n",
i, arr[i], copy[i]);
}
if (memcmp(arr, copy, n * sizeof(int64_t)) == 0)
return;
die("mismatch");
}
static void
test_sort_replace(void)
{
int64_t nmin = 2;
int64_t nmax = 300;
int64_t nrun = 500;
srand(123);
for (int64_t n = nmin; n <= nmax; n++) {
for (int64_t run = 0; run < nrun; run++)
test_case(n, run);
err("n = %ld OK", n);
}
}
int
main(void)
{
test_sort_replace();
err("OK\n");
return 0;
}