Update task model to use bodies

The task model is now integrated with the body model. A normal task can
only have one body, while a parallel task can have more.

It inherits the restriction that a task body cannot be nested over
another one unless it is paused (or the relaxed nest model is enabled).
This commit is contained in:
Rodrigo Arias 2023-10-23 15:24:10 +02:00 committed by Rodrigo Arias
parent a0e7fad83e
commit 27a23f25ca
5 changed files with 191 additions and 137 deletions

View File

@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Changed
- The task model now requires the previous task body to be paused before nesting
another one. A compatibility flag is still available to still allow the old
behavior.
## [1.7.0] - 2024-03-12 ## [1.7.0] - 2024-03-12
### Added ### Added

View File

@ -9,6 +9,12 @@
#include "pv/pcf.h" #include "pv/pcf.h"
#include "utlist.h" #include "utlist.h"
uint32_t
task_get_id(struct task *task)
{
return task->id;
}
struct task * struct task *
task_find(struct task *tasks, uint32_t task_id) task_find(struct task *tasks, uint32_t task_id)
{ {
@ -28,7 +34,7 @@ task_type_find(struct task_type *types, uint32_t type_id)
} }
int int
task_create(struct task_info *info, uint32_t type_id, uint32_t task_id) task_create(struct task_info *info, uint32_t type_id, uint32_t task_id, uint32_t flags)
{ {
/* Ensure the task id is new */ /* Ensure the task id is new */
if (task_find(info->tasks, task_id) != NULL) { if (task_find(info->tasks, task_id) != NULL) {
@ -52,8 +58,7 @@ task_create(struct task_info *info, uint32_t type_id, uint32_t task_id)
task->id = task_id; task->id = task_id;
task->type = type; task->type = type;
task->state = TASK_ST_CREATED; task->flags = flags;
task->thread = NULL;
/* Add the new task to the hash table */ /* Add the new task to the hash table */
HASH_ADD_INT(info->tasks, id, task); HASH_ADD_INT(info->tasks, id, task);
@ -62,175 +67,152 @@ task_create(struct task_info *info, uint32_t type_id, uint32_t task_id)
return 0; return 0;
} }
static struct body *
create_body(struct task *task, uint32_t body_id)
{
if (!task_is_parallel(task) && task->nbodies > 0) {
err("cannot create more than one body for non-parallel task %u",
task->id);
return NULL;
}
int flags = 0;
if (task->flags & TASK_FLAG_RELAX_NESTING)
flags |= BODY_FLAG_RELAX_NESTING;
if (task->flags & TASK_FLAG_RESURRECT)
flags |= BODY_FLAG_RESURRECT;
if (task->flags & TASK_FLAG_PAUSE)
flags |= BODY_FLAG_PAUSE;
struct body *body = body_create(&task->body_info, task, body_id, flags);
if (body == NULL) {
err("body_create failed for task %u", task->id);
return NULL;
}
task->nbodies++;
return body;
}
int int
task_execute(struct task_stack *stack, struct task *task) task_execute(struct task_stack *stack, struct task *task, uint32_t body_id)
{ {
if (task == NULL) { if (task == NULL) {
err("task is NULL"); err("task is NULL");
return -1; return -1;
} }
/* FIXME: To support the taskiter we need to transition from Dead to struct body *body = body_find(&task->body_info, body_id);
* Running again. For now we allow the transition until we have a proper
* task state event. */ /* Create new body if it doesn't exist. */
if (task->state == TASK_ST_DEAD) { if (body == NULL) {
task->thread = NULL; body = create_body(task, body_id);
} else if (task->state != TASK_ST_CREATED) {
err("cannot execute task %u: state is not created or dead", task->id); if (body == NULL) {
err("create_body failed");
return -1;
}
}
if (body_execute(&stack->body_stack, body) != 0) {
err("body_execute failed for task %u", task->id);
return -1; return -1;
} }
if (task->thread != NULL) { dbg("body %u of task %u executes", body_id, task->id);
err("task already has a thread assigned");
return -1;
}
if (stack->thread->state != TH_ST_RUNNING) {
err("thread state is not running");
return -1;
}
if (stack->top == task) {
err("thread already has assigned task %u", task->id);
return -1;
}
if (stack->top && stack->top->state != TASK_ST_RUNNING) {
err("cannot execute a nested task from a non-running task");
return -1;
}
task->state = TASK_ST_RUNNING;
task->thread = stack->thread;
DL_PREPEND(stack->tasks, task);
dbg("task id=%u runs now", task->id);
return 0; return 0;
} }
int int
task_pause(struct task_stack *stack, struct task *task) task_pause(struct task_stack *stack, struct task *task, uint32_t body_id)
{ {
if (task == NULL) { if (task == NULL) {
err("cannot pause: task is NULL"); err("task is NULL");
return -1; return -1;
} }
if (task->state != TASK_ST_RUNNING) { struct body *body = body_find(&task->body_info, body_id);
err("cannot pause: task state is not running");
if (body == NULL) {
err("cannot find body with id %u in task %u",
body_id, task->id);
return -1; return -1;
} }
if (task->thread == NULL) { if (body_pause(&stack->body_stack, body) != 0) {
err("cannot pause: task has no thread assigned"); err("body_pause failed for task %u", task->id);
return -1; return -1;
} }
if (stack->thread->state != TH_ST_RUNNING) { dbg("body %u of task %u pauses", body_id, task->id);
err("cannot pause: thread state is not running");
return -1;
}
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_PAUSED;
dbg("task id=%d pauses", task->id);
return 0; return 0;
} }
int int
task_resume(struct task_stack *stack, struct task *task) task_resume(struct task_stack *stack, struct task *task, uint32_t body_id)
{ {
if (task == NULL) { if (task == NULL) {
err("cannot resume: task is NULL"); err("task is NULL");
return -1; return -1;
} }
if (task->state != TASK_ST_PAUSED) { struct body *body = body_find(&task->body_info, body_id);
err("task state is not paused");
if (body == NULL) {
err("cannot find body with id %u in task %u",
body_id, task->id);
return -1; return -1;
} }
if (task->thread == NULL) { if (body_resume(&stack->body_stack, body) != 0) {
err("cannot resume: task has no thread assigned"); err("body_resume failed for task %u", task->id);
return -1; return -1;
} }
if (stack->thread->state != TH_ST_RUNNING) { dbg("body %u of task %u resumes", body_id, task->id);
err("thread is not running");
return -1;
}
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_RUNNING;
dbg("task id=%d resumes", task->id);
return 0; return 0;
} }
int int
task_end(struct task_stack *stack, struct task *task) task_end(struct task_stack *stack, struct task *task, uint32_t body_id)
{ {
if (task == NULL) { if (task == NULL) {
err("cannot end: task is NULL"); err("task is NULL");
return -1; return -1;
} }
if (task->state != TASK_ST_RUNNING) { struct body *body = body_find(&task->body_info, body_id);
err("task state is not running");
if (body == NULL) {
err("cannot find body with id %u in task %u",
body_id, task->id);
return -1; return -1;
} }
if (task->thread == NULL) { if (body_end(&stack->body_stack, body) != 0) {
err("cannot end: task has no thread assigned"); err("body_end failed for task %u", task->id);
return -1; return -1;
} }
if (stack->thread->state != TH_ST_RUNNING) { dbg("body %u of task %u ends", body_id, task->id);
err("cannot end task: thread is not running");
return -1;
}
if (stack->top != task) {
err("thread has assigned a different task");
return -1;
}
if (stack->thread != task->thread) {
err("task is assigned to a different thread");
return -1;
}
task->state = TASK_ST_DEAD;
/* Don't unset the thread from the task, as it will be used
* later to ensure we switch to tasks of the same thread. */
DL_DELETE(stack->tasks, task);
dbg("task id=%d ends", task->id);
return 0; return 0;
} }
int
task_is_parallel(struct task *task)
{
return (task->flags & TASK_FLAG_PARALLEL);
}
uint32_t uint32_t
task_get_type_gid(const char *label) task_get_type_gid(const char *label)
{ {
@ -319,12 +301,14 @@ task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types)
return ret; return ret;
} }
struct task * struct body *
task_get_running(struct task_stack *stack) task_get_running(struct task_stack *stack)
{ {
struct task *task = stack->top; return body_get_running(&stack->body_stack);
if (task && task->state == TASK_ST_RUNNING) }
return task;
struct body *
return NULL; task_get_top(struct task_stack *stack)
{
return body_get_top(&stack->body_stack);
} }

View File

@ -8,12 +8,13 @@
#include "common.h" #include "common.h"
#include "pv/pcf.h" #include "pv/pcf.h"
#include "uthash.h" #include "uthash.h"
#include "body.h"
enum task_state { enum task_flags {
TASK_ST_CREATED, TASK_FLAG_PARALLEL = (1 << 0), /* Can have multiple bodies */
TASK_ST_RUNNING, TASK_FLAG_RESURRECT = (1 << 1), /* Can run again after dead */
TASK_ST_PAUSED, TASK_FLAG_PAUSE = (1 << 2), /* Can pause the bodies */
TASK_ST_DEAD, TASK_FLAG_RELAX_NESTING = (1 << 3), /* Can nest a over a running task */
}; };
struct task_type { struct task_type {
@ -26,12 +27,11 @@ struct task_type {
struct task { struct task {
uint32_t id; uint32_t id;
struct task_type *type; struct task_type *type;
long nbodies;
struct body_info body_info;
uint32_t flags;
/* TODO: Use a pointer to task_stack instead of thread */ /* Hash map in task_info */
/* The thread that has began to execute the task. It cannot
* changed after being set, even if the task ends. */
struct thread *thread;
enum task_state state;
UT_hash_handle hh; UT_hash_handle hh;
/* List handle for nested task support */ /* List handle for nested task support */
@ -46,23 +46,24 @@ struct task_info {
}; };
struct task_stack { struct task_stack {
union { struct body_stack body_stack;
struct task *top; /* Synctactic sugar */
struct task *tasks;
};
struct thread *thread;
}; };
USE_RET uint32_t task_get_id(struct task *task);
USE_RET struct task *task_find(struct task *tasks, uint32_t task_id); USE_RET struct task *task_find(struct task *tasks, uint32_t task_id);
USE_RET int task_create(struct task_info *info, uint32_t type_id, uint32_t task_id); USE_RET int task_create(struct task_info *info, uint32_t type_id, uint32_t task_id, uint32_t flags);
USE_RET int task_execute(struct task_stack *stack, struct task *task);
USE_RET int task_pause(struct task_stack *stack, struct task *task); USE_RET int task_execute(struct task_stack *stack, struct task *task, uint32_t body_id);
USE_RET int task_resume(struct task_stack *stack, struct task *task); USE_RET int task_pause(struct task_stack *stack, struct task *task, uint32_t body_id);
USE_RET int task_end(struct task_stack *stack, struct task *task); USE_RET int task_resume(struct task_stack *stack, struct task *task, uint32_t body_id);
USE_RET int task_end(struct task_stack *stack, struct task *task, uint32_t body_id);
USE_RET struct task_type *task_type_find(struct task_type *types, uint32_t type_id); USE_RET struct task_type *task_type_find(struct task_type *types, uint32_t type_id);
USE_RET int task_type_create(struct task_info *info, uint32_t type_id, const char *label); USE_RET int task_type_create(struct task_info *info, uint32_t type_id, const char *label);
USE_RET uint32_t task_get_type_gid(const char *label); USE_RET uint32_t task_get_type_gid(const char *label);
USE_RET int task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types); USE_RET int task_create_pcf_types(struct pcf_type *pcftype, struct task_type *types);
USE_RET struct task *task_get_running(struct task_stack *stack); USE_RET struct body *task_get_running(struct task_stack *stack);
USE_RET struct body *task_get_top(struct task_stack *stack);
USE_RET int task_is_parallel(struct task *task);
#endif /* TASK_H */ #endif /* TASK_H */

View File

@ -16,6 +16,7 @@ unit_test(loom.c)
unit_test(mux.c) unit_test(mux.c)
unit_test(prv.c) unit_test(prv.c)
unit_test(stream.c) unit_test(stream.c)
unit_test(task.c)
unit_test(thread.c) unit_test(thread.c)
unit_test(value.c) unit_test(value.c)
unit_test(version.c) unit_test(version.c)

62
test/unit/task.c Normal file
View File

@ -0,0 +1,62 @@
/* Copyright (c) 2023 Barcelona Supercomputing Center (BSC)
* SPDX-License-Identifier: GPL-3.0-or-later */
#include "common.h"
#include "emu/task.h"
#include "unittest.h"
static void
test_parallel(void)
{
struct task_info info;
struct task_stack stack;
struct task_stack stack2;
memset(&info, 0, sizeof(info));
memset(&stack, 0, sizeof(stack));
memset(&stack2, 0, sizeof(stack2));
uint32_t type_id = 123;
uint32_t task_id = 456;
OK(task_type_create(&info, type_id, "parallel_task"));
OK(task_create(&info, type_id, task_id, TASK_FLAG_PARALLEL));
struct task *task = task_find(info.tasks, task_id);
if (task == NULL)
err("task_find failed");
/* Attempt to run body with zero id */
ERR(task_execute(&stack, task, 0));
OK(task_execute(&stack, task, 1));
OK(task_execute(&stack2, task, 2));
/* Attempt to run a body which is already running */
ERR(task_execute(&stack2, task, 1));
/* Finish a non existant body */
ERR(task_end(&stack, task, 42));
/* Attempt to pause top without pause flag */
ERR(task_pause(&stack, task, 1));
/* Attempt to resume while running */
ERR(task_resume(&stack, task, 1));
OK(task_end(&stack, task, 1));
OK(task_end(&stack2, task, 2));
/* Try to run one body again without resurrect flag */
ERR(task_execute(&stack, task, 1));
err("ok");
}
int main(void)
{
test_parallel();
return 0;
}