diff --git a/src/sort.c b/src/sort.c index c1cdc58..78801ee 100644 --- a/src/sort.c +++ b/src/sort.c @@ -87,15 +87,16 @@ static ssize_t find_destination(struct ring *r, uint64_t clock) { ssize_t nback = 0; - ssize_t i = 0; UNUSED(nback); ssize_t start = r->tail - 1 >= 0 ? r->tail - 1 : r->size - 1; ssize_t end = r->head - 1 >= 0 ? r->head - 1 : r->size - 1; + uint64_t last_clock = 0; - for (i = start; i != end; i = i - 1 < 0 ? r->size - 1 : i - 1) { - if (r->ev[i]->header.clock < clock) { + for (ssize_t i = start; i != end; i = i - 1 < 0 ? r->size - 1 : i - 1) { + last_clock = r->ev[i]->header.clock; + if (last_clock < clock) { dbg("found suitable position %ld events backwards\n", nback); return i; @@ -103,8 +104,19 @@ find_destination(struct ring *r, uint64_t clock) nback++; } + /* If there is no event with a lower clock and we haven't fill the ring + * yet, then we are at the beginning and no other event has be emitted + * before the sort window. So simply return the first marker. */ + if (nback < (ssize_t) max_look_back) { + if (r->head != 0) + die("ring head expected to be 0\n"); + if (r->tail >= r->size - 1) + die("ring tail=%d expected to be less than %d\n", r->tail, r->size - 1); + return r->head; + } + err("cannot find a event previous to clock %lu\n", clock); - err("nback = %ld, last clock=%lu\n", nback, r->ev[i]->header.clock); + err("nback=%ld, last_clock=%lu\n", nback, last_clock); return -1; }