Check ring buffer order after injecting events

Ensure that we are not reading garbage.
This commit is contained in:
Rodrigo Arias 2024-09-06 14:39:44 +02:00
parent b2d91391b3
commit 49149e452c

View File

@ -74,6 +74,21 @@ ring_add(struct ring *r, struct ovni_ev *ev)
r->head = 0; r->head = 0;
} }
static void
ring_check(struct ring *r, long long from)
{
uint64_t last_clock = 0;
long long start = from != -1 ? from : r->head;
for (long long i = start; i != r->tail; i = i + 1 >= r->size ? 0 : i + 1) {
uint64_t clock = r->ev[i]->header.clock;
if (clock < last_clock) {
die("ring not sorted at i=%lld, last_clock=%"PRIu64" clock=%"PRIu64 ,
i, last_clock, clock);
}
last_clock = clock;
}
}
static ssize_t static ssize_t
find_destination(struct ring *r, uint64_t clock) find_destination(struct ring *r, uint64_t clock)
{ {
@ -289,6 +304,7 @@ execute_sort_plan(struct sortplan *sp)
/* Set the pointer to the first event that may be affected */ /* Set the pointer to the first event that may be affected */
struct ovni_ev *first = sp->r->ev[i0]; struct ovni_ev *first = sp->r->ev[i0];
long long dirty = i0;
/* Allocate a working buffer */ /* Allocate a working buffer */
uintptr_t bufsize = (uintptr_t) sp->next - (uintptr_t) first; uintptr_t bufsize = (uintptr_t) sp->next - (uintptr_t) first;
@ -306,6 +322,10 @@ execute_sort_plan(struct sortplan *sp)
free(buf); free(buf);
/* Invariant: The ring buffer is always sorted here. Check from the
* dirty position onwards, so we avoid scanning all events. */
ring_check(sp->r, dirty);
return 0; return 0;
} }
@ -366,6 +386,12 @@ stream_winsort(struct stream *stream, struct ring *r)
} }
} }
/* FIXME: After executing a sort plan, we may have pointers that
* now point to garbage, as the events have moved. We can test
* this by first sorting a given region so that it displaces the
* offsets of the end marker, and then injecting events that are
* inside the first unsorted region. Hopefully, trying to access
* the clock of the end marker will now read garbage. */
ring_add(r, ev); ring_add(r, ev);
} }