Implement drift detection and add plot script

This commit is contained in:
Rodrigo Arias 2021-08-19 20:34:09 +02:00
parent cf829f3630
commit 67a9b8f319
3 changed files with 396 additions and 95 deletions

View File

@ -29,7 +29,8 @@ libovni.so: ovni.o parson.o
ovni2prv: ovni2prv.c ovni.o parson.o ovni2prv: ovni2prv.c ovni.o parson.o
ovnisync: ovnisync.c ovnisync: ovnisync.c
mpicc -lm $^ -o $@ #OMPI_CC=clang mpicc -fsanitize=memory -fsanitize-recover=memory -fno-omit-frame-pointer -g -O2 -lm $^ -o $@
mpicc -g -O0 -lm $^ -o $@
clean: clean:
rm -f *.o $(BIN) rm -f *.o $(BIN)

View File

@ -5,20 +5,64 @@
#include <stdlib.h> #include <stdlib.h>
#include <math.h> #include <math.h>
#include <unistd.h> #include <unistd.h>
#include <assert.h>
#include "ovni.h" #include "ovni.h"
const char progname[] = "ovnisync";
struct offset {
/* All in nanoseconds */
double delta_mean;
double delta_median;
double delta_var;
double delta_std;
/* The value selected for the offset */
int64_t offset;
/* In seconds */
double wall_t0;
double wall_t1;
char hostname[OVNI_MAX_HOSTNAME];
int rank;
int nsamples;
/* Flexible array */
double clock_sample[];
};
struct offset_table {
int nprocs;
struct offset *_offset;
struct offset **offset;
};
struct options {
int nsamples;
int ndrift_samples;
int drift_wait; /* in seconds */
int verbose;
};
static double static double
get_time() get_time(clockid_t clock, int use_ns)
{ {
struct timespec tv; struct timespec tv;
if(clock_gettime(CLOCK_MONOTONIC, &tv) != 0) if(clock_gettime(clock, &tv) != 0)
{ {
perror("clock_gettime failed"); perror("clock_gettime failed");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
return (double)(tv.tv_sec) * 1.0e-9 +
(double)tv.tv_nsec; if(use_ns)
return (double)(tv.tv_sec) * 1.0e9 +
(double)tv.tv_nsec;
return (double)(tv.tv_sec) +
(double)tv.tv_nsec * 1.0e-9;
} }
static int static int
@ -37,123 +81,339 @@ cmp_double(const void *pa, const void *pb)
return 0; return 0;
} }
/* Called by rank 0 */ void
static void usage(int argc, char *argv[])
get_offset(double *timetable, char (*hosttable)[OVNI_MAX_HOSTNAME], int nproc, int nsamples)
{ {
int i, j; fprintf(stderr, "%s: clock synchronization utility\n", progname);
double median, mean, var, std; fprintf(stderr, "\n");
double *offset; fprintf(stderr, "Usage: %s [-d ndrift_samples] [-v] [-n nsamples] [-w drift_delay]\n",
double *delta; progname);
exit(EXIT_FAILURE);
offset = malloc(nproc * sizeof(double));
delta = malloc(nsamples * sizeof(double));
if(!offset || !delta)
{
perror("malloc");
exit(EXIT_FAILURE);
}
/* We use as ref the clock in rank 0 */
printf("%-10s %-20s %-20s %-20s\n", "rank", "hostname", "offset_median", "offset_std");
for(i=0; i<nproc; i++)
{
for(j=0; j<nsamples; j++)
delta[j] = timetable[j] - timetable[i*nsamples + j];
mean = 0.0;
var = 0.0;
qsort(delta, nsamples, sizeof(*delta), cmp_double);
median = delta[nsamples / 2];
for(j=0; j<nsamples; j++)
{
//printf("%f\n", delta[j]);
mean += delta[j];
}
mean /= nsamples;
for(j=0; j<nsamples; j++)
{
var += (delta[j] - mean) * (delta[j] - mean);
}
var /= (double) (nsamples - 1);
std = sqrt(var);
offset[i] = mean;
printf("%-10d %-20s %-20.0f %-20f\n", i, hosttable[i], median, std);
}
free(offset);
free(delta);
} }
int void
main(int argc, char *argv[]) parse_options(struct options *options, int argc, char *argv[])
{ {
double *t; int opt;
double *timetable;
int i, rank, nprocs, nsamples;
char (*hosttable)[OVNI_MAX_HOSTNAME];
MPI_Init(&argc, &argv); /* Default options */
options->ndrift_samples = 1;
options->nsamples = 100;
options->verbose = 0;
options->drift_wait = 5;
if(!argv[1]) while ((opt = getopt(argc, argv, "d:vn:w:")) != -1) {
nsamples = 100; switch (opt) {
else case 'd':
nsamples = atoi(argv[1]); options->ndrift_samples = atoi(optarg);
break;
case 'w':
options->drift_wait = atoi(optarg);
break;
case 'v':
options->verbose = 1;
break;
case 'n':
options->nsamples = atoi(optarg);
break;
default: /* '?' */
usage(argc, argv);
}
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank); if (optind < argc)
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
timetable = calloc(nsamples * nprocs, sizeof(double));
hosttable = calloc(nprocs, sizeof(*hosttable));
t = calloc(nsamples, sizeof(double));
if(!timetable || !t || !hosttable)
{ {
perror("calloc"); fprintf(stderr, "error: unexpected extra arguments\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
}
/* Warm up iterations */ void
for(i=0; i<20; i++) get_clock_samples(struct offset *offset, int nsamples)
{ {
MPI_Barrier(MPI_COMM_WORLD); int i;
get_time();
} /* Keep the wall time as well */
offset->wall_t0 = get_time(CLOCK_REALTIME, 0);
offset->nsamples = nsamples;
for(i=0; i<nsamples; i++) for(i=0; i<nsamples; i++)
{ {
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
t[i] = get_time(); offset->clock_sample[i] = get_time(CLOCK_MONOTONIC, 1);
} }
if(gethostname(hosttable[rank], OVNI_MAX_HOSTNAME) != 0) offset->wall_t1 = get_time(CLOCK_REALTIME, 0);
}
void
fill_offset(struct offset *offset, int nsamples)
{
int warmup_nsamples;
/* Identify the rank */
MPI_Comm_rank(MPI_COMM_WORLD, &offset->rank);
/* Fill the host name */
if(gethostname(offset->hostname, OVNI_MAX_HOSTNAME) != 0)
{ {
perror("gethostname"); perror("gethostname");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void *sendbuff = (rank > 0) ? hosttable[rank] : MPI_IN_PLACE; //printf("rank=%d hostname=%s\n", offset->rank, offset->hostname);
MPI_Gather(sendbuff, sizeof(*hosttable), MPI_CHAR, /* Warm up iterations */
hosttable[0], sizeof(*hosttable), MPI_CHAR, warmup_nsamples = nsamples >= 20 ? 20 : nsamples;
0, MPI_COMM_WORLD); get_clock_samples(offset, warmup_nsamples);
MPI_Gather(t, nsamples, MPI_DOUBLE, get_clock_samples(offset, nsamples);
timetable, nsamples, MPI_DOUBLE, }
0, MPI_COMM_WORLD);
void
offset_compute_delta(struct offset *ref, struct offset *cur, int nsamples)
{
int i;
double *delta;
delta = malloc(sizeof(double) * nsamples);
if(delta == NULL)
{
perror("malloc");
exit(EXIT_FAILURE);
}
for(i=0; i<nsamples; i++)
{
delta[i] = ref->clock_sample[i] - cur->clock_sample[i];
//printf("rank=%d sample=%d delta=%f ref=%f cur=%f\n",
// cur->rank, i, delta[i], ref->clock_sample[i], cur->clock_sample[i]);
}
qsort(delta, nsamples, sizeof(double), cmp_double);
cur->delta_median = delta[nsamples / 2];
for(cur->delta_mean=0, i=0; i<nsamples; i++)
cur->delta_mean += delta[i];
cur->delta_mean /= nsamples;
for(cur->delta_var=0, i=0; i<nsamples; i++)
cur->delta_var += (delta[i] - cur->delta_mean) *
(delta[i] - cur->delta_mean);
cur->delta_var /= (double) (nsamples - 1);
cur->delta_std = sqrt(cur->delta_var);
/* The median is the selected metric for the offset */
cur->offset = (int64_t) cur->delta_median;
free(delta);
}
size_t
offset_size(int nsamples)
{
return sizeof(struct offset) + sizeof(double) * nsamples;
}
struct offset *
table_get_offset(struct offset_table *table, int i, int nsamples)
{
char *p;
p = (char *) table->_offset;
p += i * offset_size(nsamples);
return (struct offset *) p;
}
struct offset_table *
build_offset_table(int nsamples, int rank)
{
int i;
struct offset_table *table = NULL;
struct offset *offset = NULL;
void *sendbuf;
/* The rank 0 must build the table */
if(rank == 0) if(rank == 0)
get_offset(timetable, hosttable, nprocs, nsamples); {
table = malloc(sizeof(*table));
if(table == NULL)
{
perror("malloc");
exit(EXIT_FAILURE);
}
MPI_Comm_size(MPI_COMM_WORLD, &table->nprocs);
table->_offset = calloc(table->nprocs, offset_size(nsamples));
if(table->_offset == NULL)
{
perror("malloc");
exit(EXIT_FAILURE);
}
table->offset = malloc(sizeof(struct offset *) *
table->nprocs);
if(table->offset == NULL)
{
perror("malloc");
exit(EXIT_FAILURE);
}
for(i=0; i<table->nprocs; i++)
table->offset[i] = table_get_offset(table, i, nsamples);
offset = table->offset[0];
}
else
{
/* Otherwise just allocate one offset */
offset = calloc(1, offset_size(nsamples));
if(offset == NULL)
{
perror("malloc");
exit(EXIT_FAILURE);
}
table = NULL;
}
/* Each rank fills its own offset */
fill_offset(offset, nsamples);
sendbuf = rank == 0 ? MPI_IN_PLACE : offset;
/* Then collect all the offsets into the rank 0 */
MPI_Gather(sendbuf, offset_size(nsamples), MPI_CHAR,
offset, offset_size(nsamples), MPI_CHAR,
0, MPI_COMM_WORLD);
/* Finish the offsets by computing the deltas on rank 0 */
if(rank == 0)
{
for(i=0; i<table->nprocs; i++)
{
offset_compute_delta(offset, table->offset[i],
nsamples);
}
}
/* Cleanup for non-zero ranks */
if(rank != 0)
free(offset);
return table;
}
void
print_drift_header(struct offset_table *table)
{
int i;
//char buf[64];
printf("%-20s", "wallclock");
for(i=0; i<table->nprocs; i++)
{
//sprintf(buf, "rank%d", i);
printf(" %-20s", table->offset[i]->hostname);
}
printf("\n");
}
void
print_drift_row(struct offset_table *table)
{
int i;
char buf[64];
printf("%-20f", table->offset[0]->wall_t1);
for(i=0; i<table->nprocs; i++)
printf(" %-20ld", table->offset[i]->offset);
printf("\n");
}
void
print_table_detailed(struct offset_table *table)
{
int i;
struct offset *offset;
printf("%-10s %-20s %-20s %-20s %-20s\n", "rank", "hostname", "offset_median", "offset_mean", "offset_std");
for(i=0; i<table->nprocs; i++)
{
offset = table->offset[i];
printf("%-10d %-20s %-20ld %-20f %-20f\n",
i, offset->hostname, offset->offset,
offset->delta_mean, offset->delta_std);
}
}
void
do_work(struct options *options, int rank)
{
int drift_mode;
int i;
struct offset_table *table;
drift_mode = options->ndrift_samples > 1 ? 1 : 0;
for(i=0; i<options->ndrift_samples; i++)
{
table = build_offset_table(options->nsamples, rank);
if(rank == 0)
{
if(drift_mode)
{
if(i == 0)
print_drift_header(table);
print_drift_row(table);
}
else
{
print_table_detailed(table);
}
free(table->_offset);
free(table->offset);
free(table);
}
if(drift_mode)
usleep(options->drift_wait * 1000 * 1000);
}
}
int
main(int argc, char *argv[])
{
int rank;
struct options options;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
parse_options(&options, argc, argv);
do_work(&options, rank);
free(hosttable);
free(timetable);
free(t);
MPI_Finalize(); MPI_Finalize();
return 0; return 0;
} }

40
plot-drift.py Normal file
View File

@ -0,0 +1,40 @@
import numpy as np
import matplotlib.pyplot as plt
import sys
if len(sys.argv) != 2:
print("missing drift file")
exit(1)
fn = sys.argv[1]
data = np.genfromtxt(fn, skip_header=1)
with open(fn, 'r') as f:
lines = [n for n in f.readline().strip().split(" ") if n != '']
node_names = lines[1:]
nnodes=len(node_names)
t = data[:,0]
t -= t[0]
plt.figure(figsize=(10,6))
for i in range(nnodes):
delta = data[:,i+1]
delta -= delta[0]
delta /= 1000
plt.plot(t, delta, label="rank%d (%s)" % (i, node_names[i]))
plt.title('Clock drift using %d nodes' % nnodes)
plt.xlabel('Relative wall clock time (s)')
plt.ylabel('Relative time delta (us)')
plt.legend()
plt.grid()
#plt.show()
plt.savefig("drift.png")