Rodrigo Arias Mallo
121030537d
The loom name "loom.xyz.123" will produce the hostname "xyz", so Nanos6 programs can use the ovnisync tool too. Also, sort the loom array directly, so we avoid potential problems by sorting the loom and the hostnames individually (when there are repeated entries).
743 lines
14 KiB
C
743 lines
14 KiB
C
/*
|
|
* Copyright (c) 2021-2022 Barcelona Supercomputing Center (BSC)
|
|
*
|
|
* This program is free software: you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation, either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "trace.h"
|
|
|
|
#define _GNU_SOURCE
|
|
#include <unistd.h>
|
|
|
|
#include <stdio.h>
|
|
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
#include <time.h>
|
|
#include <string.h>
|
|
#include <linux/limits.h>
|
|
#include <errno.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/mman.h>
|
|
#include <stdatomic.h>
|
|
#include <unistd.h>
|
|
#include <sys/stat.h>
|
|
#include <fcntl.h>
|
|
#include <dirent.h>
|
|
|
|
static int
|
|
find_dir_prefix_str(const char *dirname, const char *prefix, const char **str)
|
|
{
|
|
const char *p;
|
|
|
|
p = dirname;
|
|
|
|
/* Check the prefix */
|
|
if(strncmp(p, prefix, strlen(prefix)) != 0)
|
|
return -1;
|
|
|
|
p += strlen(prefix);
|
|
|
|
/* Find the dot */
|
|
if(*p != '.')
|
|
return -1;
|
|
|
|
p++;
|
|
|
|
if(str)
|
|
*str = p;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
find_dir_prefix_int(const char *dirname, const char *prefix, int *num)
|
|
{
|
|
const char *p;
|
|
|
|
if(find_dir_prefix_str(dirname, prefix, &p) != 0)
|
|
return -1;
|
|
|
|
/* Convert the suffix string to a number */
|
|
*num = atoi(p);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static size_t
|
|
count_dir_prefix(DIR *dir, const char *prefix)
|
|
{
|
|
struct dirent *dirent;
|
|
size_t n = 0;
|
|
|
|
while((dirent = readdir(dir)) != NULL)
|
|
{
|
|
if(find_dir_prefix_str(dirent->d_name, prefix, NULL) != 0)
|
|
continue;
|
|
|
|
n++;
|
|
}
|
|
|
|
return n;
|
|
}
|
|
|
|
static int
|
|
load_thread(struct ovni_ethread *thread, struct ovni_eproc *proc, int index, int tid, char *filepath)
|
|
{
|
|
static int total_threads = 0;
|
|
|
|
thread->tid = tid;
|
|
thread->index = index;
|
|
thread->gindex = total_threads++;
|
|
thread->state = TH_ST_UNKNOWN;
|
|
thread->proc = proc;
|
|
|
|
if(strlen(filepath) >= PATH_MAX)
|
|
{
|
|
err("filepath too large: %s\n", filepath);
|
|
return -1;
|
|
}
|
|
|
|
strcpy(thread->tracefile, filepath);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
load_proc_metadata(struct ovni_eproc *proc, int *rank_enabled)
|
|
{
|
|
JSON_Object *meta;
|
|
|
|
meta = json_value_get_object(proc->meta);
|
|
if(meta == NULL)
|
|
die("load_proc_metadata: json_value_get_object() failed\n");
|
|
|
|
JSON_Value *appid_val = json_object_get_value(meta, "app_id");
|
|
if(appid_val == NULL)
|
|
die("process %d is missing app_id in metadata\n", proc->pid);
|
|
|
|
proc->appid = (int) json_number(appid_val);
|
|
|
|
JSON_Value *rank_val = json_object_get_value(meta, "rank");
|
|
|
|
if(rank_val != NULL)
|
|
{
|
|
proc->rank = (int) json_number(rank_val);
|
|
*rank_enabled = 1;
|
|
}
|
|
else
|
|
{
|
|
proc->rank = -1;
|
|
}
|
|
}
|
|
|
|
static void
|
|
check_metadata_version(struct ovni_eproc *proc)
|
|
{
|
|
JSON_Object *meta = json_value_get_object(proc->meta);
|
|
if(meta == NULL)
|
|
die("check_metadata_version: json_value_get_object() failed\n");
|
|
|
|
JSON_Value *version_val = json_object_get_value(meta, "version");
|
|
if(version_val == NULL)
|
|
{
|
|
die("process %d is missing attribute \"version\" in metadata\n",
|
|
proc->pid);
|
|
}
|
|
|
|
int version = (int) json_number(version_val);
|
|
|
|
if(version != OVNI_METADATA_VERSION)
|
|
{
|
|
die("pid %d: metadata version mismatch %d (expected %d)\n",
|
|
proc->pid, version,
|
|
OVNI_METADATA_VERSION);
|
|
}
|
|
|
|
JSON_Value *mversion_val = json_object_get_value(meta, "model_version");
|
|
if(mversion_val == NULL)
|
|
{
|
|
die("process %d is missing attribute \"model_version\" in metadata\n",
|
|
proc->pid);
|
|
}
|
|
|
|
const char *mversion = json_string(mversion_val);
|
|
|
|
if(strcmp(mversion, OVNI_MODEL_VERSION) != 0)
|
|
{
|
|
die("pid %d: metadata model version mismatch '%s' (expected '%s')\n",
|
|
proc->pid, mversion,
|
|
OVNI_MODEL_VERSION);
|
|
|
|
}
|
|
}
|
|
|
|
static int
|
|
compare_int(const void *a, const void *b)
|
|
{
|
|
int aa = *(const int *) a;
|
|
int bb = *(const int *) b;
|
|
|
|
if(aa < bb)
|
|
return -1;
|
|
else if(aa > bb)
|
|
return +1;
|
|
else
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
load_proc(struct ovni_eproc *proc, struct ovni_loom *loom, int index, int pid, char *procdir)
|
|
{
|
|
static int total_procs = 0;
|
|
|
|
struct dirent *dirent;
|
|
DIR *dir;
|
|
char path[PATH_MAX];
|
|
struct ovni_ethread *thread;
|
|
|
|
proc->pid = pid;
|
|
proc->index = index;
|
|
proc->gindex = total_procs++;
|
|
proc->loom = loom;
|
|
|
|
if(snprintf(path, PATH_MAX, "%s/%s", procdir, "metadata.json") >=
|
|
PATH_MAX)
|
|
{
|
|
err("snprintf: path too large: %s\n", procdir);
|
|
abort();
|
|
}
|
|
|
|
proc->meta = json_parse_file_with_comments(path);
|
|
if(proc->meta == NULL)
|
|
{
|
|
err("error loading metadata from %s\n", path);
|
|
return -1;
|
|
}
|
|
|
|
check_metadata_version(proc);
|
|
|
|
/* The appid is populated from the metadata */
|
|
load_proc_metadata(proc, &loom->rank_enabled);
|
|
|
|
if((dir = opendir(procdir)) == NULL)
|
|
{
|
|
fprintf(stderr, "opendir %s failed: %s\n",
|
|
procdir, strerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
proc->nthreads = count_dir_prefix(dir, "thread");
|
|
|
|
if(proc->nthreads <= 0)
|
|
{
|
|
err("cannot find any thread for process %d\n",
|
|
proc->pid);
|
|
return -1;
|
|
}
|
|
|
|
proc->thread = calloc(proc->nthreads, sizeof(struct ovni_ethread));
|
|
|
|
if(proc->thread == NULL)
|
|
{
|
|
perror("calloc failed");
|
|
return -1;
|
|
}
|
|
|
|
int *tids;
|
|
|
|
if((tids = calloc(proc->nthreads, sizeof(int))) == NULL)
|
|
{
|
|
perror("calloc failed\n");
|
|
return -1;
|
|
}
|
|
|
|
rewinddir(dir);
|
|
|
|
for(size_t i = 0; i < proc->nthreads; )
|
|
{
|
|
dirent = readdir(dir);
|
|
|
|
if(dirent == NULL)
|
|
{
|
|
err("inconsistent: readdir returned NULL\n");
|
|
return -1;
|
|
}
|
|
|
|
if(find_dir_prefix_int(dirent->d_name, "thread", &tids[i]) != 0)
|
|
continue;
|
|
|
|
i++;
|
|
}
|
|
|
|
closedir(dir);
|
|
|
|
/* Sort threads by ascending TID */
|
|
qsort(tids, proc->nthreads, sizeof(int), compare_int);
|
|
|
|
for(size_t i = 0; i < proc->nthreads; i++)
|
|
{
|
|
int tid = tids[i];
|
|
|
|
if(snprintf(path, PATH_MAX, "%s/thread.%d", procdir, tid) >=
|
|
PATH_MAX)
|
|
{
|
|
err("snprintf: path too large: %s\n", procdir);
|
|
abort();
|
|
}
|
|
|
|
thread = &proc->thread[i];
|
|
|
|
if(load_thread(thread, proc, i, tid, path) != 0)
|
|
return -1;
|
|
}
|
|
|
|
free(tids);
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
load_loom(struct ovni_loom *loom, char *loomdir)
|
|
{
|
|
int pid;
|
|
size_t i;
|
|
char path[PATH_MAX];
|
|
DIR *dir;
|
|
struct dirent *dirent;
|
|
|
|
if((dir = opendir(loomdir)) == NULL)
|
|
{
|
|
fprintf(stderr, "opendir %s failed: %s\n",
|
|
loomdir, strerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
loom->rank_enabled = 0;
|
|
loom->nprocs = count_dir_prefix(dir, "proc");
|
|
|
|
if(loom->nprocs <= 0)
|
|
{
|
|
err("cannot find any process directory in loom %s\n",
|
|
loom->hostname);
|
|
return -1;
|
|
}
|
|
|
|
loom->proc = calloc(loom->nprocs, sizeof(struct ovni_eproc));
|
|
|
|
if(loom->proc == NULL)
|
|
{
|
|
perror("calloc failed");
|
|
return -1;
|
|
}
|
|
|
|
rewinddir(dir);
|
|
|
|
i = 0;
|
|
while((dirent = readdir(dir)) != NULL)
|
|
{
|
|
if(find_dir_prefix_int(dirent->d_name, "proc", &pid) != 0)
|
|
continue;
|
|
|
|
sprintf(path, "%s/%s", loomdir, dirent->d_name);
|
|
|
|
if(i >= loom->nprocs)
|
|
{
|
|
err("more process than expected\n");
|
|
abort();
|
|
}
|
|
|
|
if(load_proc(&loom->proc[i], loom, i, pid, path) != 0)
|
|
return -1;
|
|
|
|
i++;
|
|
|
|
}
|
|
|
|
if(i != loom->nprocs)
|
|
{
|
|
err("unexpected number of processes\n");
|
|
abort();
|
|
}
|
|
|
|
closedir(dir);
|
|
|
|
/* Ensure all process have the rank, if enabled in any */
|
|
if(loom->rank_enabled)
|
|
{
|
|
for(i = 0; i < loom->nprocs; i++)
|
|
{
|
|
struct ovni_eproc *proc = &loom->proc[i];
|
|
if(proc->rank < 0)
|
|
{
|
|
die("process %d is missing the rank\n",
|
|
proc->pid);
|
|
}
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
compare_looms(const void *a, const void *b)
|
|
{
|
|
struct ovni_loom *la = (struct ovni_loom *) a;
|
|
struct ovni_loom *lb = (struct ovni_loom *) b;
|
|
return strcmp(la->dname, lb->dname);
|
|
}
|
|
|
|
static void
|
|
loom_to_host(const char *loom_name, char *host, int n)
|
|
{
|
|
int i;
|
|
|
|
for(i = 0; i < n; i++)
|
|
{
|
|
/* Copy until dot or end */
|
|
if(loom_name[i] != '.' && loom_name[i] != '\0')
|
|
host[i] = loom_name[i];
|
|
else
|
|
break;
|
|
}
|
|
|
|
if(i == n)
|
|
die("loom host name %s too long\n", loom_name);
|
|
|
|
host[i] = '\0';
|
|
}
|
|
|
|
int
|
|
ovni_load_trace(struct ovni_trace *trace, char *tracedir)
|
|
{
|
|
DIR *dir;
|
|
|
|
if((dir = opendir(tracedir)) == NULL)
|
|
{
|
|
err("opendir %s failed: %s\n", tracedir, strerror(errno));
|
|
return -1;
|
|
}
|
|
|
|
trace->nlooms = count_dir_prefix(dir, "loom");
|
|
|
|
if(trace->nlooms == 0)
|
|
{
|
|
err("cannot find any loom in %s\n", tracedir);
|
|
return -1;
|
|
}
|
|
|
|
trace->loom = calloc(trace->nlooms, sizeof(struct ovni_loom));
|
|
|
|
if(trace->loom == NULL)
|
|
{
|
|
perror("calloc failed\n");
|
|
return -1;
|
|
}
|
|
|
|
rewinddir(dir);
|
|
|
|
size_t l = 0;
|
|
struct dirent *dirent;
|
|
|
|
while((dirent = readdir(dir)) != NULL)
|
|
{
|
|
struct ovni_loom *loom = &trace->loom[l];
|
|
const char *loom_name;
|
|
if(find_dir_prefix_str(dirent->d_name, "loom", &loom_name) != 0)
|
|
{
|
|
/* Ignore other files in tracedir */
|
|
continue;
|
|
}
|
|
|
|
if(l >= trace->nlooms)
|
|
{
|
|
err("extra loom detected\n");
|
|
return -1;
|
|
}
|
|
|
|
/* Copy the complete loom directory name to looms */
|
|
if(snprintf(loom->dname, PATH_MAX, "%s", dirent->d_name) >= PATH_MAX)
|
|
{
|
|
err("error: loom name %s too long\n", dirent->d_name);
|
|
return -1;
|
|
}
|
|
|
|
l++;
|
|
}
|
|
|
|
closedir(dir);
|
|
|
|
/* Sort the looms, so we get the hostnames in alphanumeric order */
|
|
qsort(trace->loom, trace->nlooms, sizeof(struct ovni_loom),
|
|
compare_looms);
|
|
|
|
for(size_t i = 0; i < trace->nlooms; i++)
|
|
{
|
|
struct ovni_loom *loom = &trace->loom[i];
|
|
const char *name = NULL;
|
|
|
|
if(find_dir_prefix_str(loom->dname, "loom", &name) != 0)
|
|
{
|
|
err("error: mismatch for loom %s\n", loom->dname);
|
|
return -1;
|
|
}
|
|
|
|
loom_to_host(name, loom->hostname, sizeof(loom->hostname));
|
|
|
|
if(snprintf(loom->path, PATH_MAX, "%s/%s",
|
|
tracedir, loom->dname) >= PATH_MAX)
|
|
{
|
|
err("error: loom path %s/%s too long\n",
|
|
tracedir, loom->dname);
|
|
return -1;
|
|
}
|
|
|
|
if(load_loom(loom, loom->path) != 0)
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
check_stream_header(struct ovni_stream *stream)
|
|
{
|
|
int ret = 0;
|
|
|
|
if(stream->size < sizeof(struct ovni_stream_header))
|
|
{
|
|
err("stream %d: incomplete stream header\n",
|
|
stream->tid);
|
|
return -1;
|
|
}
|
|
|
|
struct ovni_stream_header *h =
|
|
(struct ovni_stream_header *) stream->buf;
|
|
|
|
if(memcmp(h->magic, OVNI_STREAM_MAGIC, 4) != 0)
|
|
{
|
|
char magic[5];
|
|
memcpy(magic, h->magic, 4);
|
|
magic[4] = '\0';
|
|
err("stream %d: wrong stream magic '%s' (expected '%s')\n",
|
|
stream->tid, magic, OVNI_STREAM_MAGIC);
|
|
ret = -1;
|
|
}
|
|
|
|
if(h->version != OVNI_STREAM_VERSION)
|
|
{
|
|
err("stream %d: stream version mismatch %u (expected %u)\n",
|
|
stream->tid, h->version, OVNI_STREAM_VERSION);
|
|
ret = -1;
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
static int
|
|
load_stream_fd(struct ovni_stream *stream, int fd)
|
|
{
|
|
struct stat st;
|
|
if(fstat(fd, &st) < 0)
|
|
{
|
|
perror("fstat failed");
|
|
return -1;
|
|
}
|
|
|
|
/* Error because it doesn't have the header */
|
|
if(st.st_size == 0)
|
|
{
|
|
err("stream %d is empty\n", stream->tid);
|
|
return -1;
|
|
}
|
|
|
|
int prot = PROT_READ | PROT_WRITE;
|
|
stream->buf = mmap(NULL, st.st_size, prot, MAP_PRIVATE, fd, 0);
|
|
|
|
if(stream->buf == MAP_FAILED)
|
|
{
|
|
perror("mmap failed");
|
|
return -1;
|
|
}
|
|
|
|
stream->size = st.st_size;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
load_stream_buf(struct ovni_stream *stream, struct ovni_ethread *thread)
|
|
{
|
|
int fd;
|
|
|
|
if((fd = open(thread->tracefile, O_RDWR)) == -1)
|
|
{
|
|
perror("open failed");
|
|
return -1;
|
|
}
|
|
|
|
if(load_stream_fd(stream, fd) != 0)
|
|
return -1;
|
|
|
|
if(check_stream_header(stream) != 0)
|
|
{
|
|
err("stream %d: bad header\n", stream->tid);
|
|
return -1;
|
|
}
|
|
|
|
stream->offset = sizeof(struct ovni_stream_header);
|
|
|
|
if(stream->offset == stream->size)
|
|
stream->active = 0;
|
|
else
|
|
stream->active = 1;
|
|
|
|
/* No need to keep the fd open */
|
|
if(close(fd))
|
|
{
|
|
perror("close failed");
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* Populates the streams in a single array */
|
|
int
|
|
ovni_load_streams(struct ovni_trace *trace)
|
|
{
|
|
size_t i, j, k, s;
|
|
struct ovni_loom *loom;
|
|
struct ovni_eproc *proc;
|
|
struct ovni_ethread *thread;
|
|
struct ovni_stream *stream;
|
|
|
|
trace->nstreams = 0;
|
|
|
|
for(i=0; i<trace->nlooms; i++)
|
|
{
|
|
loom = &trace->loom[i];
|
|
for(j=0; j<loom->nprocs; j++)
|
|
{
|
|
proc = &loom->proc[j];
|
|
for(k=0; k<proc->nthreads; k++)
|
|
{
|
|
trace->nstreams++;
|
|
}
|
|
}
|
|
}
|
|
|
|
trace->stream = calloc(trace->nstreams, sizeof(struct ovni_stream));
|
|
|
|
if(trace->stream == NULL)
|
|
{
|
|
perror("calloc failed");
|
|
return -1;
|
|
}
|
|
|
|
err("loaded %ld streams\n", trace->nstreams);
|
|
|
|
for(s=0, i=0; i<trace->nlooms; i++)
|
|
{
|
|
loom = &trace->loom[i];
|
|
for(j=0; j<loom->nprocs; j++)
|
|
{
|
|
proc = &loom->proc[j];
|
|
for(k=0; k<proc->nthreads; k++)
|
|
{
|
|
thread = &proc->thread[k];
|
|
stream = &trace->stream[s++];
|
|
|
|
stream->tid = thread->tid;
|
|
stream->thread = thread;
|
|
stream->proc = proc;
|
|
stream->loom = loom;
|
|
stream->lastclock = 0;
|
|
stream->offset = 0;
|
|
stream->cur_ev = NULL;
|
|
|
|
if(load_stream_buf(stream, thread) != 0)
|
|
{
|
|
err("load_stream_buf failed\n");
|
|
return -1;
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
void
|
|
ovni_free_streams(struct ovni_trace *trace)
|
|
{
|
|
for(size_t i = 0; i < trace->nstreams; i++)
|
|
{
|
|
struct ovni_stream *stream = &trace->stream[i];
|
|
if(munmap(stream->buf, stream->size) != 0)
|
|
die("munmap stream failed: %s\n", strerror(errno));
|
|
}
|
|
|
|
free(trace->stream);
|
|
}
|
|
|
|
void
|
|
ovni_free_trace(struct ovni_trace *trace)
|
|
{
|
|
size_t i, j;
|
|
|
|
for(i=0; i<trace->nlooms; i++)
|
|
{
|
|
for(j=0; j<trace->loom[i].nprocs; j++)
|
|
{
|
|
free(trace->loom[i].proc[j].thread);
|
|
}
|
|
|
|
free(trace->loom[i].proc);
|
|
}
|
|
|
|
free(trace->loom);
|
|
}
|
|
|
|
int
|
|
ovni_load_next_event(struct ovni_stream *stream)
|
|
{
|
|
if(stream->active == 0)
|
|
{
|
|
dbg("stream is inactive, cannot load more events\n");
|
|
return -1;
|
|
}
|
|
|
|
/* Only step the offset if we have load an event */
|
|
if(stream->cur_ev != NULL)
|
|
stream->offset += ovni_ev_size(stream->cur_ev);
|
|
|
|
/* It cannot overflow, otherwise we are reading garbage */
|
|
if(stream->offset > stream->size)
|
|
die("ovni_load_next_event: stream offset exceeds size\n");
|
|
|
|
/* We have reached the end */
|
|
if(stream->offset == stream->size)
|
|
{
|
|
stream->active = 0;
|
|
stream->cur_ev = NULL;
|
|
dbg("stream %d runs out of events\n", stream->tid);
|
|
return -1;
|
|
}
|
|
|
|
stream->cur_ev = (struct ovni_ev *) &stream->buf[stream->offset];
|
|
|
|
return 0;
|
|
}
|