Fix ovnisort with flush events

Sort the events in the sorting region before injecting them back in the
stream. This solves the problem with flush events OF[ and OF].
This commit is contained in:
Rodrigo Arias 2023-06-27 19:23:43 +02:00
parent 929af74d3f
commit 5a9086e6d1

View File

@ -7,8 +7,9 @@
* we go back until we find a suitable position and start injecting the events * we go back until we find a suitable position and start injecting the events
* in order. * in order.
* *
* The events inside a unsorted region must be ordered. And the number of events * The events inside a unsorted region may not be ordered, they will be sorted
* that we will look back is limited by N. * by qsort() first. The number of events that we will look back is limited by
* N.
*/ */
#include <fcntl.h> #include <fcntl.h>
@ -78,8 +79,6 @@ find_destination(struct ring *r, uint64_t clock)
{ {
ssize_t nback = 0; ssize_t nback = 0;
UNUSED(nback);
ssize_t start = r->tail - 1 >= 0 ? r->tail - 1 : r->size - 1; ssize_t start = r->tail - 1 >= 0 ? r->tail - 1 : r->size - 1;
ssize_t end = r->head - 1 >= 0 ? r->head - 1 : r->size - 1; ssize_t end = r->head - 1 >= 0 ? r->head - 1 : r->size - 1;
uint64_t last_clock = 0; uint64_t last_clock = 0;
@ -125,50 +124,130 @@ ends_unsorted_region(struct ovni_ev *ev)
return ev->header.model == 'O' && ev->header.category == 'U' && ev->header.value == ']'; return ev->header.model == 'O' && ev->header.category == 'U' && ev->header.value == ']';
} }
static void static uint64_t
sort_buf(uint8_t *src, uint8_t *buf, int64_t bufsize, find_min_clock(uint8_t *src, uint8_t *end)
uint8_t *srcbad, uint8_t *srcnext)
{ {
int64_t injected = 0;
UNUSED(injected);
uint8_t *p = src; uint8_t *p = src;
uint8_t *q = srcbad; struct ovni_ev *ev0 = (struct ovni_ev *) p;
uint64_t min_clock = ev0->header.clock;
while (1) { while (1) {
struct ovni_ev *ep = (struct ovni_ev *) p; if (p >= end)
struct ovni_ev *eq = (struct ovni_ev *) q;
struct ovni_ev *ev = NULL;
int64_t evsize = 0;
if (p < srcbad && ep->header.clock < eq->header.clock) {
ev = ep;
evsize = ovni_ev_size(ev);
p += evsize;
} else {
ev = eq;
evsize = ovni_ev_size(ev);
q += evsize;
}
if ((uint8_t *) ev == srcnext)
break; break;
if ((uint8_t *) ev > srcnext) struct ovni_ev *ev = (struct ovni_ev *) p;
die("exceeded srcnext while sorting"); if (ev->header.clock < min_clock)
min_clock = ev->header.clock;
if (bufsize < evsize) p += ovni_ev_size(ev);
die("no space left in the sort buffer");
memcpy(buf, ev, evsize);
buf += evsize;
bufsize -= evsize;
injected++;
} }
dbg("injected %ld events in the past", injected); return min_clock;
}
static long
count_events(uint8_t *src, uint8_t *end)
{
uint8_t *p = src;
long n = 0;
while (1) {
if (p >= end)
break;
struct ovni_ev *ev = (struct ovni_ev *) p;
p += ovni_ev_size(ev);
n++;
}
return n;
}
static void
index_events(struct ovni_ev **table, long n, uint8_t *buf)
{
uint8_t *p = buf;
for (long i = 0; i < n; i++) {
table[i] = (struct ovni_ev *) p;
p += ovni_ev_size(table[i]);
}
}
static void
write_events(struct ovni_ev **table, long n, uint8_t *buf)
{
for (long i = 0; i < n; i++) {
struct ovni_ev *ev = table[i];
size_t size = ovni_ev_size(ev);
memcpy(buf, ev, size);
buf += size;
dbg("injected event %c%c%c at %ld",
ev->header.model,
ev->header.category,
ev->header.value,
ev->header.clock);
}
}
static int
cmp_ev(const void *a, const void *b)
{
struct ovni_ev **pev1 = (struct ovni_ev **) a;
struct ovni_ev **pev2 = (struct ovni_ev **) b;
struct ovni_ev *ev1 = *pev1;
struct ovni_ev *ev2 = *pev2;
int64_t clock1 = ev1->header.clock;
int64_t clock2 = ev2->header.clock;
if (clock1 < clock2)
return -1;
if (clock1 > clock2)
return +1;
else
return 0;
}
static void
sort_buf(uint8_t *src, uint8_t *buf, int64_t bufsize)
{
struct ovni_ev *ev = (struct ovni_ev *) src;
dbg("first event before sorting %c%c%c at %ld",
ev->header.model,
ev->header.category,
ev->header.value,
ev->header.clock);
/* Create a copy of the array */
uint8_t *buf2 = malloc(bufsize);
if (buf2 == NULL)
die("malloc failed:");
memcpy(buf2, src, bufsize);
long n = count_events(buf2, buf2 + bufsize);
struct ovni_ev **table = calloc(n, sizeof(struct ovni_ev *));
if (table == NULL)
die("calloc failed:");
index_events(table, n, buf2);
qsort(table, n, sizeof(struct ovni_ev *), cmp_ev);
write_events(table, n, buf);
dbg("first event after sorting %c%c%c at %ld",
ev->header.model,
ev->header.category,
ev->header.value,
ev->header.clock);
free(table);
free(buf2);
dbg("sorted %ld events", n);
} }
static void static void
@ -190,14 +269,21 @@ write_stream(int fd, void *base, void *dst, const void *src, size_t size)
static int static int
execute_sort_plan(struct sortplan *sp) execute_sort_plan(struct sortplan *sp)
{ {
uint64_t clock0 = sp->bad0->header.clock;
dbg("attempt to sort: start clock %ld", sp->bad0->header.clock); dbg("attempt to sort: start clock %ld", sp->bad0->header.clock);
/* Cannot sort in one pass; just fail for now */ uint64_t min_clock = find_min_clock((void *) sp->bad0, (void *) sp->next);
int64_t i0 = find_destination(sp->r, sp->bad0->header.clock);
if (i0 < 0) {
err("cannot find destination for region starting at clock %ld",
sp->bad0->header.clock);
if (min_clock < clock0) {
clock0 = min_clock;
dbg("region not sorted, using min clock=%ld", clock0);
}
/* Cannot sort in one pass; just fail for now */
int64_t i0 = find_destination(sp->r, clock0);
if (i0 < 0) {
err("cannot find destination for region starting at clock %ld", clock0);
err("consider increasing the look back size with -n");
return -1; return -1;
} }
@ -214,11 +300,8 @@ execute_sort_plan(struct sortplan *sp)
if (!buf) if (!buf)
die("malloc failed:"); die("malloc failed:");
sort_buf((uint8_t *) first, buf, bufsize, sort_buf((uint8_t *) first, buf, bufsize);
(uint8_t *) sp->bad0, (uint8_t *) sp->next);
/* Copy the sorted events back into the stream buffer */
memcpy(first, buf, bufsize);
write_stream(sp->fd, sp->base, first, buf, bufsize); write_stream(sp->fd, sp->base, first, buf, bufsize);
free(buf); free(buf);
@ -448,6 +531,9 @@ main(int argc, char *argv[])
{ {
progname_set("ovnisort"); progname_set("ovnisort");
if (getenv("OVNI_DEBUG") != NULL)
enable_debug();
parse_args(argc, argv); parse_args(argc, argv);
struct trace *trace = calloc(1, sizeof(struct trace)); struct trace *trace = calloc(1, sizeof(struct trace));