StarPU runtime system (part 2)
Data handles and interfaces
A task implementation should not modify its task arguments because such changes are not propagated to the other tasks. Furthermore, task arguments do not induce any task dependencies. They are therefore suitable only for passing static arguments to tasks.
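For instance, a static scalar argument is typically packed into the task with STARPU_VALUE and unpacked inside the implementation with starpu_codelet_unpack_args (a minimal sketch; codelet and func are assumed to be defined elsewhere):

double alpha = 2.0;
starpu_task_insert(&codelet, STARPU_VALUE, &alpha, sizeof(alpha), 0);

void func(void *buffers[], void *cl_arg)
{
    // unpack the static argument that was copied into the task
    double alpha;
    starpu_codelet_unpack_args(cl_arg, &alpha);
}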
Data handles are much more flexible: any modification made in one task is propagated to the other tasks, and these modifications also induce task dependencies. A data handle (starpu_data_handle_t) can encapsulate any conceivable data type. However, the built-in data interfaces for scalars, vectors and matrices are adequate for many use cases:
void starpu_variable_data_register(starpu_data_handle_t *handle,
                                   int home_node,
                                   uintptr_t ptr,
                                   size_t size)
#define STARPU_VARIABLE_GET_PTR(interface)
#define STARPU_VARIABLE_GET_ELEMSIZE(interface)
Above, home_node is the memory node where the variable is initially stored. In most cases, the variable is initially stored in the main memory (STARPU_MAIN_RAM). The argument ptr is a pointer to the variable (in the main memory) and the argument size is the size of the variable.
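For instance, a single integer could be registered as follows (a minimal sketch):

int x = 42;
starpu_data_handle_t x_handle;
starpu_variable_data_register(&x_handle, STARPU_MAIN_RAM,
    (uintptr_t)&x, sizeof(x));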
void starpu_vector_data_register(starpu_data_handle_t *handle,
                                 int home_node,
                                 uintptr_t ptr,
                                 uint32_t nx,
                                 size_t elemsize)
#define STARPU_VECTOR_GET_PTR(interface)
#define STARPU_VECTOR_GET_NX(interface)
#define STARPU_VECTOR_GET_ELEMSIZE(interface)
Above, the argument nx is the length of the vector and the argument elemsize is the size of a vector element.
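For instance, a vector of 100 doubles could be registered as follows (a minimal sketch):

double *vector = malloc(100 * sizeof(double));
starpu_data_handle_t vector_handle;
starpu_vector_data_register(&vector_handle, STARPU_MAIN_RAM,
    (uintptr_t)vector, 100, sizeof(double));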
void starpu_matrix_data_register(starpu_data_handle_t *handle,
                                 int home_node,
                                 uintptr_t ptr,
                                 uint32_t ld,
                                 uint32_t nx,
                                 uint32_t ny,
                                 size_t elemsize)
#define STARPU_MATRIX_GET_PTR(interface)
#define STARPU_MATRIX_GET_NX(interface)
#define STARPU_MATRIX_GET_NY(interface)
#define STARPU_MATRIX_GET_LD(interface)
#define STARPU_MATRIX_GET_ELEMSIZE(interface)
Above, the argument ld is the leading dimension of the matrix, i.e. the number of allocated elements between the beginnings of two consecutive lines, the argument nx is the number of contiguously stored elements in each line and the argument ny is the number of lines. Since the interface only distinguishes the contiguous dimension (nx) from the strided dimension (ny), a column-major matrix can be registered by passing its height as nx and its width as ny.
For example, the following code allocates a matrix and registers a data handle that encapsulates it:
double *matrix = malloc(width * ld * sizeof(double));
starpu_data_handle_t handle;
starpu_matrix_data_register(&handle, STARPU_MAIN_RAM,
    (uintptr_t)matrix, ld, height, width, sizeof(double));
The above example assumes that the matrix is stored in column-major order.
Each data handle must be unregistered before the main thread can access it again:
starpu_data_unregister(handle);
This blocks the main thread until all related tasks have been executed.
The easiest way to pass a data handle to a task is to declare it in the related codelet:
struct starpu_codelet
{
    uint32_t where;
    int (*can_execute)(unsigned workerid, struct starpu_task *task, unsigned nimpl);
    enum starpu_codelet_type type;
    int max_parallelism;
    starpu_cpu_func_t cpu_func STARPU_DEPRECATED;
    starpu_cuda_func_t cuda_func STARPU_DEPRECATED;
    starpu_opencl_func_t opencl_func STARPU_DEPRECATED;
    starpu_cpu_func_t cpu_funcs[STARPU_MAXIMPLEMENTATIONS];
    starpu_cuda_func_t cuda_funcs[STARPU_MAXIMPLEMENTATIONS];
    char cuda_flags[STARPU_MAXIMPLEMENTATIONS];
    starpu_opencl_func_t opencl_funcs[STARPU_MAXIMPLEMENTATIONS];
    char opencl_flags[STARPU_MAXIMPLEMENTATIONS];
    starpu_mic_func_t mic_funcs[STARPU_MAXIMPLEMENTATIONS];
    starpu_mpi_ms_func_t mpi_ms_funcs[STARPU_MAXIMPLEMENTATIONS];
    const char *cpu_funcs_name[STARPU_MAXIMPLEMENTATIONS];
    int nbuffers;
    enum starpu_data_access_mode modes[STARPU_NMAXBUFS];
    enum starpu_data_access_mode *dyn_modes;
    unsigned specific_nodes;
    int nodes[STARPU_NMAXBUFS];
    int *dyn_nodes;
    struct starpu_perfmodel *model;
    struct starpu_perfmodel *energy_model;
    unsigned long per_worker_stats[STARPU_NMAXWORKERS];
    const char *name;
    unsigned color;
    int flags;
    int checked;
};
The nbuffers field stores the total number of data handles the task accepts and the modes field tabulates an access mode for each data handle. The access mode can be one of the following:
- STARPU_NONE:
Not documented (presumably an unset/sentinel value that is not meant to be used in codelets).
- STARPU_R:
Read-only mode.
- STARPU_W:
Write-only mode.
- STARPU_RW:
Read-write mode. Equivalent to STARPU_R | STARPU_W.
- STARPU_SCRATCH:
Scratch buffer (one per device).
- STARPU_REDUX:
The data handle is used in a reduction-type operation.
- STARPU_COMMUTE:
Tasks can access this variable in an arbitrary order.
- STARPU_SSEND:
The data has to be sent using a synchronous and non-blocking mode (StarPU-MPI).
- STARPU_LOCALITY:
Tells the scheduler that the data handle is sensitive to data locality.
- STARPU_ACCESS_MODE_MAX:
Not documented (the upper bound of the access mode enumeration; not meant to be used in codelets).
Note that the modes field limits the number of data handles passed to a task to STARPU_NMAXBUFS. Furthermore, all tasks of a particular type must accept the same number of data handles. A codelet can also accept an arbitrary number of data handles through the dyn_modes field, but this feature is not covered during this course.
For example, the following code defines a codelet that accepts a single read-write data handle:
struct starpu_codelet codelet =
{
    .cpu_funcs = { func },
    .nbuffers = 1,
    .modes = { STARPU_RW }
};
The data handles are passed to the starpu_task_insert function:
starpu_task_insert(
    &codelet,
    STARPU_RW, handle,
    0);
Finally, the task implementation extracts a matching data interface from the implementation arguments:
void func(void *buffers[], void *args)
{
    struct starpu_matrix_interface *interface =
        (struct starpu_matrix_interface *)buffers[0];

    double *ptr = (double *) STARPU_MATRIX_GET_PTR(interface);
    int height = STARPU_MATRIX_GET_NX(interface);
    int width = STARPU_MATRIX_GET_NY(interface);
    int ld = STARPU_MATRIX_GET_LD(interface);

    process(height, width, ld, ptr);
}
The runtime system guarantees that data resides in the device memory when a worker thread starts executing the task. If necessary, StarPU copies the data from one memory space to another. The scalar and vector data handles have their own interfaces: starpu_variable_interface and starpu_vector_interface.
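For instance, a task implementation that receives a vector data handle extracts the interface in an analogous manner (a sketch; process_vector is a hypothetical helper):

void vector_func(void *buffers[], void *args)
{
    struct starpu_vector_interface *interface =
        (struct starpu_vector_interface *)buffers[0];

    double *ptr = (double *) STARPU_VECTOR_GET_PTR(interface);
    int nx = STARPU_VECTOR_GET_NX(interface);

    process_vector(nx, ptr);
}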
If two tasks are given the same data handle in their argument lists, then an implicit data dependency may be induced between the tasks.
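For instance, under the default sequential consistency the second task below cannot start before the first one has finished, because the first task modifies the shared data handle (a sketch; write_cl and read_cl are hypothetical codelets):

starpu_task_insert(&write_cl, STARPU_RW, handle, 0); // modifies the data
starpu_task_insert(&read_cl, STARPU_R, handle, 0);   // must wait for the first task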
Distributed memory
StarPU supports distributed memory through MPI in three different ways:
- Without StarPU-MPI:
A programmer must manually transfer the data between StarPU data handles and MPI. Not generally recommended but might be a good stopgap solution.
- With StarPU-MPI:
A programmer replaces the MPI_Recv() and MPI_Send() calls with starpu_mpi_irecv_detached() and starpu_mpi_isend_detached() calls. These functions act directly on the StarPU data handles (a sketch follows below).
- With the MPI Insert Task Utility:
A programmer replaces the starpu_task_insert() calls with starpu_mpi_task_insert() calls. In addition, one must use the starpu_mpi_data_register() function to tell which MPI process owns each data handle.
The second and third approaches allocate one CPU core for MPI communications.
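For instance, in the second approach a point-to-point transfer of a data handle might look like the following sketch (rank and tag are illustrative variables; the NULL arguments mean that no callback is invoked when the detached communication completes):

if (rank == 0)
    starpu_mpi_isend_detached(handle, 1, tag, MPI_COMM_WORLD, NULL, NULL);
else if (rank == 1)
    starpu_mpi_irecv_detached(handle, 0, tag, MPI_COMM_WORLD, NULL, NULL);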
In the third approach, the starpu_mpi_task_insert() function takes into account the task dependencies and the data distribution, and generates the necessary communication pattern automatically. Each MPI process has a copy of the entire task graph and two things can happen:
- If the MPI process is going to execute a task, it receives any missing data handles from the MPI processes that own them.
- If the MPI process is not going to execute a task, it sends the data handles it owns to the MPI process that is going to execute the task.
This all happens automatically and asynchronously, and the task implementations do not require any modifications! StarPU-MPI also implements an MPI cache that stores received data handles so that they are not transferred again as long as they remain unmodified.
Consider the following example where each MPI process writes its rank to a data handle and then passes it to the neighbouring MPI process:
#include <stdio.h>
#include <starpu.h>
#include <starpu_mpi.h>

// a codelet that initializes a data handle with an MPI process's world rank

void write_number_cpu(void *buffers[], void *cl_arg)
{
    int world_rank = starpu_mpi_world_rank();
    int *value = (int *) STARPU_VARIABLE_GET_PTR(buffers[0]);

    *value = world_rank;
    printf("Rank %d writes value %d.\n", world_rank, *value);
}

struct starpu_codelet write_number_cl = {
    .cpu_funcs = { write_number_cpu },
    .nbuffers = 1,
    .modes = { STARPU_W }
};

// a codelet that prints the contents of a data handle

void read_number_cpu(void *buffers[], void *cl_arg)
{
    int world_rank = starpu_mpi_world_rank();
    int value = *((int *) STARPU_VARIABLE_GET_PTR(buffers[0]));
    printf("Rank %d reads value %d.\n", world_rank, value);
}

struct starpu_codelet read_number_cl = {
    .cpu_funcs = { read_number_cpu },
    .nbuffers = 1,
    .modes = { STARPU_R }
};

int main(int argc, char **argv) {

    // initialize MPI
    int thread_support;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_support);

    // initialize StarPU
    if (starpu_init(NULL) != 0)
        printf("Failed to initialize StarPU.\n");

    // initialize StarPU-MPI
    if (starpu_mpi_init(&argc, &argv, 0) != 0)
        printf("Failed to initialize StarPU-MPI.\n");

    // query world communicator's size
    int world_size = starpu_mpi_world_size();

    // initialize all data handles
    starpu_data_handle_t handles[world_size];
    for (int i = 0; i < world_size; i++) {

        // register a data handle that is going to be initialized later
        starpu_variable_data_register(
            &handles[i], -1, (uintptr_t) NULL, sizeof(int));

        // register the data handle's owner and tag
        starpu_mpi_data_register(handles[i], i, i);
    }

    // insert tasks that initialize the data handles
    for (int i = 0; i < world_size; i++)
        starpu_mpi_task_insert(
            MPI_COMM_WORLD, &write_number_cl,
            STARPU_EXECUTE_ON_DATA, handles[i], // the data handle's owner executes
            STARPU_W, handles[i], 0);

    // insert tasks that print the data handles
    for (int i = 0; i < world_size; i++)
        starpu_mpi_task_insert(
            MPI_COMM_WORLD, &read_number_cl,
            STARPU_EXECUTE_ON_NODE, i, // rank i executes
            STARPU_R, handles[(i+1)%world_size], 0);

    // unregister all data handles
    for (int i = 0; i < world_size; i++)
        starpu_data_unregister(handles[i]);

    // de-initialize everything
    starpu_mpi_shutdown();
    starpu_shutdown();
    MPI_Finalize();

    return 0;
}
We are going to launch four MPI processes and allocate two CPU cores for each process:
$ gcc -o my_program my_program.c -lstarpu-1.3 -lstarpumpi-1.3 -lmpi -Wall
$ STARPU_WORKERS_NOBIND=1 mpirun -n 4 --map-by :PE=2 ./my_program
Rank 1 writes value 1.
Rank 0 writes value 0.
Rank 2 writes value 2.
Rank 3 reads value 0.
Rank 3 writes value 3.
Rank 0 reads value 1.
Rank 1 reads value 2.
Rank 2 reads value 3.
Accelerators
As you may remember, a StarPU codelet includes fields for CUDA implementations:
struct starpu_codelet
{
    ...
    starpu_cuda_func_t cuda_funcs[STARPU_MAXIMPLEMENTATIONS];
    char cuda_flags[STARPU_MAXIMPLEMENTATIONS];
    ...
};
Unfortunately we do not have time to cover all the complexities related to GPU offloading. Instead, we will simply consider the following example:
#include <stdio.h>
#include <starpu.h>

void hello_world_cpu(void *buffers[], void *cl_arg)
{
    printf("The host says, Hello world!\n");
}

__global__ void say_hello()
{
    printf("A device says, Hello world!\n");
}

void hello_world_cuda(void *buffers[], void *cl_arg)
{
    cudaStream_t stream = starpu_cuda_get_local_stream();
    say_hello<<<1, 1, 0, stream>>>();
    cudaError_t err = cudaStreamSynchronize(stream);
    if (err != cudaSuccess)
        STARPU_CUDA_REPORT_ERROR(err);
}

struct starpu_codelet hello_world_cl = {
    .cpu_funcs = { hello_world_cpu },
    .cuda_funcs = { hello_world_cuda }
};

int main()
{
    if (starpu_init(NULL) != 0)
        printf("Failed to initialize StarPU.\n");

    starpu_task_insert(&hello_world_cl, 0);

    starpu_task_wait_for_all();
    starpu_shutdown();

    return 0;
}
That is, we must add a task implementation (hello_world_cuda) that launches a CUDA kernel (say_hello) on the provided local CUDA stream (stream).
Note how the naive eager scheduler prefers to use the CPU implementation, whereas the GPU-aware dm scheduler prefers the GPU:
$ nvcc -o my_program my_program.cu -lstarpu-1.3 -Xcompiler="-Wall"
$ STARPU_SCHED=eager ./my_program
The host says, Hello world!
$ STARPU_SCHED=dm ./my_program
A device says, Hello world!
If we want to obtain reasonable performance using GPUs, we must define a performance model for each codelet:
struct starpu_codelet
{
    ...
    struct starpu_perfmodel *model;
    struct starpu_perfmodel *energy_model;
    ...
};
In the simplest case, the model can use the run-time history to predict the execution times:
struct starpu_perfmodel model = {
    .type = STARPU_HISTORY_BASED,
    .symbol = "my_codelet" // the symbol names the model in the calibration database
};
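The model is attached to a codelet through the model field (a sketch continuing the read-write codelet defined earlier):

struct starpu_codelet codelet =
{
    .cpu_funcs = { func },
    .nbuffers = 1,
    .modes = { STARPU_RW },
    .model = &model
};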
StarPU also supports regression-based performance models (STARPU_REGRESSION_BASED, STARPU_NL_REGRESSION_BASED, STARPU_MULTIPLE_REGRESSION_BASED):

struct starpu_perfmodel model = {
    .type = STARPU_REGRESSION_BASED,
    .symbol = "my_codelet"
};
By default, StarPU calculates the model input (the size base) from the amount of memory required to store all involved data handles. However, a programmer may provide a custom function for this.
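For instance, if the execution time of a task depends only on its first buffer, the prediction could be based on the size of that buffer alone (a sketch; first_buffer_size is a hypothetical helper and the size_base field is assumed to follow the StarPU 1.3 API):

size_t first_buffer_size(struct starpu_task *task, unsigned nimpl)
{
    // base the prediction on the first data handle only
    return starpu_data_get_size(STARPU_TASK_GET_HANDLE(task, 0));
}

struct starpu_perfmodel model = {
    .type = STARPU_HISTORY_BASED,
    .symbol = "my_codelet",
    .size_base = first_buffer_size
};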