Example: LU factorization
Scalar algorithm
The algorithm we are considering is called LU factorization (or LU decomposition) without pivoting. Let \(A\) be a square matrix. An LU factorization refers to the factorization of \(A\) into a lower triangular matrix \(L\) and an upper triangular matrix \(U\):
For example, consider the following example from Wikipedia:
If we want to solve a system of linear equations
for \(x\) and have already computed the LU factorization
then we can solve \(A x = b\) as follows:
Solve the equation \(L y = b\) for \(y\).
Solve the equation \(U x = y\) for \(x\).
The factorization is typically normalized such that the matrix \(L\) is unit lower triangular, i.e., the diagonal entries are all ones. This allows us to store the factor matrices on top of the original matrix:
In case of the earlier example, we get
Scalar implementation
The scalar sequential version of the algorithm is rather simple:
1void simple_lu(int n, int ld, double *A)
2{
3 for (int i = 0; i < n; i++) {
4 for (int j = i+1; j < n; j++) {
5 A[i*ld+j] /= A[i*ld+i];
6
7 for (int k = i+1; k < n; k++)
8 A[k*ld+j] -= A[i*ld+j] * A[k*ld+i];
9 }
10 }
11}
The LU factorization of the \(n \times n\) matrix \(A\) (column-major order with leading dimension ld
) is stored on top of the original matrix \(A\).
Test program
We can validate the result with the following test program:
1#include <stdio.h>
2#include <stdlib.h>
3#include <time.h>
4#include <omp.h>
5
6extern void dtrmm_(char const *, char const *, char const *, char const *,
7 int const *, int const *, double const *, double const *, int const *,
8 double *, int const *);
9
10extern double dlange_(char const *, int const *, int const *, double const *,
11 int const *, double *);
12
13void simple_lu(int n, int ld, double *A)
14{
15 for (int i = 0; i < n; i++) {
16 for (int j = i+1; j < n; j++) {
17 A[i*ld+j] /= A[i*ld+i];
18
19 for (int k = i+1; k < n; k++)
20 A[k*ld+j] -= A[i*ld+j] * A[k*ld+i];
21 }
22 }
23}
24
25// returns the ceil of a / b
26int DIVCEIL(int a, int b)
27{
28 return (a+b-1)/b;
29}
30
31// computes C <- L * U
32void mul_lu(int n, int lda, int ldb, double const *A, double *B)
33{
34 double one = 1.0;
35
36 // B <- U(A) = U
37 for (int i = 0; i < n; i++) {
38 for (int j = 0; j < i+1; j++)
39 B[i*ldb+j] = A[i*lda+j];
40 for (int j = i+1; j < n; j++)
41 B[i*ldb+j] = 0.0;
42 }
43
44 // B <- L1(A) * B = L * U
45 dtrmm_("Left", "Lower", "No Transpose", "Unit triangular",
46 &n, &n, &one, A, &lda, B, &ldb);
47}
48
49int main(int argc, char **argv)
50{
51 //
52 // check arguments
53 //
54
55 if (argc != 2) {
56 fprintf(stderr,
57 "[error] Incorrect arguments. Use %s (n)\n", argv[0]);
58 return EXIT_FAILURE;
59 }
60
61 int n = atoi(argv[1]);
62 if (n < 1) {
63 fprintf(stderr, "[error] Invalid matrix dimension.\n");
64 return EXIT_FAILURE;
65 }
66
67 //
68 // Initialize matrix A and store a duplicate to matrix B. Matrix C is for
69 // validation.
70 //
71
72 srand(time(NULL));
73
74 int ldA, ldB, ldC;
75 ldA = ldB = ldC = DIVCEIL(n, 8)*8; // align to 64 bytes
76 double *A = (double *) aligned_alloc(8, n*ldA*sizeof(double));
77 double *B = (double *) aligned_alloc(8, n*ldB*sizeof(double));
78 double *C = (double *) aligned_alloc(8, n*ldC*sizeof(double));
79
80 if (A == NULL || B == NULL || C == NULL) {
81 fprintf(stderr, "[error] Failed to allocate memory.\n");
82 return EXIT_FAILURE;
83 }
84
85 // A <- random diagonally dominant matrix
86 for (int i = 0; i < n; i++) {
87 for (int j = 0; j < n; j++)
88 A[i*ldA+j] = B[i*ldB+j] = 2.0*rand()/RAND_MAX - 1.0;
89 A[i*ldA+i] = B[i*ldB+i] = 1.0*rand()/RAND_MAX + n;
90 }
91
92 //
93 // compute
94 //
95
96 struct timespec ts_start;
97 clock_gettime(CLOCK_MONOTONIC, &ts_start);
98
99 // A <- (L,U)
100 simple_lu(n, ldA, A);
101
102 struct timespec ts_stop;
103 clock_gettime(CLOCK_MONOTONIC, &ts_stop);
104
105 printf("Time = %f s\n",
106 ts_stop.tv_sec - ts_start.tv_sec +
107 1.0E-9*(ts_stop.tv_nsec - ts_start.tv_nsec));
108
109 // C <- L * U
110 mul_lu(n, ldA, ldC, A, C);
111
112 //
113 // validate
114 //
115
116 // C <- L * U - B
117 for (int i = 0; i < n; i++)
118 for (int j = 0; j < n; j++)
119 C[i*ldC+j] -= B[i*ldB+j];
120
121 // compute || C ||_F / || B ||_F = || L * U - B ||_F / || B ||_F
122 double residual = dlange_("Frobenius", &n, &n, C, &ldC, NULL) /
123 dlange_("Frobenius", &n, &n, B, &ldB, NULL);
124
125 printf("Residual = %E\n", residual);
126
127 int ret = EXIT_SUCCESS;
128 if (1.0E-12 < residual) {
129 fprintf(stderr, "The residual is too large.\n");
130 ret = EXIT_FAILURE;
131 }
132
133 //
134 // cleanup
135 //
136
137 free(A);
138 free(B);
139 free(C);
140
141 return ret;
142}
We can compile and test the algorithm:
$ gcc -o scalar scalar.c -Wall ${LIBLAPACK} ${LIBBLAS}
$ OMP_NUM_THREADS=1 ./scalar 3000
Time = 120.412646 s
Residual = 1.780586E-15
Above, ${LIBLAPACK}
and ${LIBBLAS}
are the LAPACK and BLAS libraries, respectively.
Challenge
Coarsely-blocked algorithm
The above scalar algorithm cannot be parallelized efficiently because we cannot collapse the loops and the innermost loop is too short for effective parallelization.
Coarsely-blocked implementation
We can solve some of the parallelization problems by writing the algorithm in a blocked form:
1void blocked_lu(int block_size, int n, int ldA, double *A)
2{
3 // allocate and fill an array that stores the block pointers
4 int block_count = DIVCEIL(n, block_size);
5 double ***blocks = (double ***) malloc(block_count*sizeof(double**));
6 for (int i = 0; i < block_count; i++) {
7 blocks[i] = (double **) malloc(block_count*sizeof(double*));
8
9 for (int j = 0; j < block_count; j++)
10 blocks[i][j] = A+(j*ldA+i)*block_size;
11 }
12
13 // iterate through the diagonal blocks
14 for (int i = 0; i < block_count; i++) {
15
16 // calculate diagonal block size
17 int dsize = min(block_size, n-i*block_size);
18
19 // calculate trailing matrix size
20 int tsize = n-(i+1)*block_size;
21
22 // compute the LU decomposition of the diagonal block
23 simple_lu(dsize, ldA, blocks[i][i]);
24
25 if (0 < tsize) {
26
27 // blocks[i][i+1:] <- L1(blocks[i][i]) \ blocks[i][i+1:]
28 dtrsm_("Left", "Lower", "No transpose", "Unit triangular",
29 &dsize, &tsize, &one, blocks[i][i], &ldA, blocks[i][i+1], &ldA);
30
31 // blocks[i+1:][i] <- U(blocks[i][i]) / blocks[i+1:][i]
32 dtrsm_("Right", "Upper", "No Transpose", "Not unit triangular",
33 &tsize, &dsize, &one, blocks[i][i], &ldA, blocks[i+1][i], &ldA);
34
35 // blocks[i+1:][i+1:] <- blocks[i+1:][i+1:] -
36 // blocks[i+1:][i] * blocks[i][i+1:]
37 dgemm_("No Transpose", "No Transpose",
38 &tsize, &tsize, &dsize, &minus_one, blocks[i+1][i],
39 &ldA, blocks[i][i+1], &ldA, &one, blocks[i+1][i+1], &ldA);
40 }
41 }
42
43 // free allocated resources
44 for (int i = 0; i < block_count; i++)
45 free(blocks[i]);
46 free(blocks);
47}
We have divided the matrix \(A\) into square blocks (possibly excluding the last block row and column).
The two-dimensional array blocks
contains the addresses of the upper-left corners of the blocks.
That is, blocks[i-1][j-1]
is the block on the i
’th row and j
’th column of the matrix.
We then loop over the diagonal blocks as follows:
We factorize the diagonal blocks using the scalar algorithm (
simple_lu
).The sub-factor matrices are used to update the block row and the block column by solving two matrix equations (
dtrsm_
).The trailing matrix is updated by computing a matrix-matrix multiplication (
dgemm_
).
The exact details are not that relevant as we are only interested in the data dependencies.
Test program
1#include <stdio.h>
2#include <stdlib.h>
3#include <time.h>
4#include <omp.h>
5
6extern double dnrm2_(int const *, double const *, int const *);
7
8extern void dtrmm_(char const *, char const *, char const *, char const *,
9 int const *, int const *, double const *, double const *, int const *,
10 double *, int const *);
11
12extern void dlacpy_(char const *, int const *, int const *, double const *,
13 int const *, double *, int const *);
14
15extern double dlange_(char const *, int const *, int const *, double const *,
16 int const *, double *);
17
18extern void dtrsm_(char const *, char const *, char const *, char const *,
19 int const *, int const *, double const *, double const *, int const *,
20 double *, int const *);
21
22extern void dgemm_(char const *, char const *, int const *, int const *,
23 int const *, double const *, double const *, int const *, double const *,
24 int const *, double const *, double *, int const *);
25
26double one = 1.0;
27double minus_one = -1.0;
28
29// returns the ceil of a / b
30int DIVCEIL(int a, int b)
31{
32 return (a+b-1)/b;
33}
34
35// returns the minimum of a and b
36int MIN(int a, int b)
37{
38 return a < b ? a : b;
39}
40
41// returns the maxinum of a and b
42int MAX(int a, int b)
43{
44 return a > b ? a : b;
45}
46
47void simple_lu(int n, int ldA, double *A)
48{
49 for (int i = 0; i < n; i++) {
50 for (int j = i+1; j < n; j++) {
51 A[i*ldA+j] /= A[i*ldA+i];
52
53 for (int k = i+1; k < n; k++)
54 A[k*ldA+j] -= A[i*ldA+j] * A[k*ldA+i];
55 }
56 }
57}
58
59////////////////////////////////////////////////////////////////////////////
60////////////////////////////////////////////////////////////////////////////
61////////////////////////////////////////////////////////////////////////////
62
63void blocked_lu(int block_size, int n, int ldA, double *A)
64{
65 // allocate and fill an array that stores the block pointers
66 int block_count = DIVCEIL(n, block_size);
67 double ***blocks = (double ***) malloc(block_count*sizeof(double**));
68 for (int i = 0; i < block_count; i++) {
69 blocks[i] = (double **) malloc(block_count*sizeof(double*));
70
71 for (int j = 0; j < block_count; j++)
72 blocks[i][j] = A+(j*ldA+i)*block_size;
73 }
74
75 // iterate through the diagonal blocks
76 for (int i = 0; i < block_count; i++) {
77
78 // calculate diagonal block size
79 int dsize = MIN(block_size, n-i*block_size);
80
81 // calculate trailing matrix size
82 int tsize = n-(i+1)*block_size;
83
84 // compute the LU decomposition of the diagonal block
85 simple_lu(dsize, ldA, blocks[i][i]);
86
87 if (0 < tsize) {
88
89 // blocks[i][i+1:] <- L1(blocks[i][i]) \ blocks[i][i+1:]
90 dtrsm_("Left", "Lower", "No transpose", "Unit triangular",
91 &dsize, &tsize, &one, blocks[i][i], &ldA, blocks[i][i+1], &ldA);
92
93 // blocks[i+1:][i] <- U(blocks[i][i]) / blocks[i+1:][i]
94 dtrsm_("Right", "Upper", "No Transpose", "Not unit triangular",
95 &tsize, &dsize, &one, blocks[i][i], &ldA, blocks[i+1][i], &ldA);
96
97 // blocks[i+1:][i+1:] <- blocks[i+1:][i+1:] -
98 // blocks[i+1:][i] * blocks[i][i+1:]
99 dgemm_("No Transpose", "No Transpose",
100 &tsize, &tsize, &dsize, &minus_one, blocks[i+1][i],
101 &ldA, blocks[i][i+1], &ldA, &one, blocks[i+1][i+1], &ldA);
102 }
103 }
104
105 // free allocated resources
106 for (int i = 0; i < block_count; i++)
107 free(blocks[i]);
108 free(blocks);
109}
110
111////////////////////////////////////////////////////////////////////////////
112////////////////////////////////////////////////////////////////////////////
113////////////////////////////////////////////////////////////////////////////
114
115// computes C <- L * U
116void mul_lu(int n, int lda, int ldb, double const *A, double *B)
117{
118 // B <- U(A) = U
119 for (int i = 0; i < n; i++) {
120 for (int j = 0; j < i+1; j++)
121 B[i*ldb+j] = A[i*lda+j];
122 for (int j = i+1; j < n; j++)
123 B[i*ldb+j] = 0.0;
124 }
125
126 // B <- L1(A) * B = L * U
127 dtrmm_("Left", "Lower", "No Transpose", "Unit triangular",
128 &n, &n, &one, A, &lda, B, &ldb);
129}
130
131int main(int argc, char **argv)
132{
133 //
134 // check arguments
135 //
136
137 if (argc != 3) {
138 fprintf(stderr,
139 "[error] Incorrect arguments. Use %s (n) (block size)\n", argv[0]);
140 return EXIT_FAILURE;
141 }
142
143 int n = atoi(argv[1]);
144 if (n < 1) {
145 fprintf(stderr, "[error] Invalid matrix dimension.\n");
146 return EXIT_FAILURE;
147 }
148
149 int block_size = atoi(argv[2]);
150 if (block_size < 2) {
151 fprintf(stderr, "[error] Invalid block size.\n");
152 return EXIT_FAILURE;
153 }
154
155 //
156 // Initialize matrix A and store a duplicate to matrix B. Matrix C is for
157 // validation.
158 //
159
160 srand(time(NULL));
161
162 int ldA, ldB, ldC;
163 ldA = ldB = ldC = DIVCEIL(n, 8)*8; // align to 64 bytes
164 double *A = (double *) aligned_alloc(8, n*ldA*sizeof(double));
165 double *B = (double *) aligned_alloc(8, n*ldB*sizeof(double));
166 double *C = (double *) aligned_alloc(8, n*ldC*sizeof(double));
167
168 if (A == NULL || B == NULL || C == NULL) {
169 fprintf(stderr, "[error] Failed to allocate memory.\n");
170 return EXIT_FAILURE;
171 }
172
173 // A <- random diagonally dominant matrix
174 for (int i = 0; i < n; i++) {
175 for (int j = 0; j < n; j++)
176 A[i*ldA+j] = B[i*ldB+j] = 2.0*rand()/RAND_MAX - 1.0;
177 A[i*ldA+i] = B[i*ldB+i] = 1.0*rand()/RAND_MAX + n;
178 }
179
180 //
181 // compute
182 //
183
184 struct timespec ts_start;
185 clock_gettime(CLOCK_MONOTONIC, &ts_start);
186
187 // A <- (L,U)
188 blocked_lu(block_size, n, ldA, A);
189
190 struct timespec ts_stop;
191 clock_gettime(CLOCK_MONOTONIC, &ts_stop);
192
193 printf("Time = %f s\n",
194 ts_stop.tv_sec - ts_start.tv_sec +
195 1.0E-9*(ts_stop.tv_nsec - ts_start.tv_nsec));
196
197 // C <- L * U
198 mul_lu(n, ldA, ldC, A, C);
199
200 //
201 // validate
202 //
203
204 // C <- L * U - B
205 for (int i = 0; i < n; i++)
206 for (int j = 0; j < n; j++)
207 C[i*ldC+j] -= B[i*ldB+j];
208
209 // compute || C ||_F / || B ||_F = || L * U - B ||_F / || B ||_F
210 double residual = dlange_("Frobenius", &n, &n, C, &ldC, NULL) /
211 dlange_("Frobenius", &n, &n, B, &ldB, NULL);
212
213 printf("Residual = %E\n", residual);
214
215 int ret = EXIT_SUCCESS;
216 if (1.0E-12 < residual) {
217 fprintf(stderr, "The residual is too large.\n");
218 ret = EXIT_FAILURE;
219 }
220
221 //
222 // cleanup
223 //
224
225 free(A);
226 free(B);
227 free(C);
228
229 return ret;
230}
We can compile and test the algorithm:
$ gcc -o coarse-blocked coarse-blocked.c -Wall ${LIBLAPACK} ${LIBBLAS}
$ OMP_NUM_THREADS=1 ./coarse-blocked 3000 128
Time = 0.480420 s
Residual = 3.917427E-16
The second argument is the block size. We can see that the blocked variant is significantly faster even before it is parallelized.
Challenge
Finely-blocked algorithm
If we want to reach a reasonable level of parallelism, we must make the task granularity finer:
Finely-blocked implementation
1void blocked_lu(int block_size, int n, int ldA, double *A)
2{
3 // allocate and fill an array that stores the block pointers
4 int block_count = DIVCEIL(n, block_size);
5 double ***blocks = (double ***) malloc(block_count*sizeof(double**));
6 for (int i = 0; i < block_count; i++) {
7 blocks[i] = (double **) malloc(block_count*sizeof(double*));
8
9 for (int j = 0; j < block_count; j++)
10 blocks[i][j] = A+(j*ldA+i)*block_size;
11 }
12
13 // iterate through the diagonal blocks
14 for (int i = 0; i < block_count; i++) {
15
16 // calculate diagonal block size
17 int dsize = MIN(block_size, n-i*block_size);
18
19 // process the current diagonal block
20 //
21 // +--+--+--+--+
22 // | | | | |
23 // +--+--+--+--+ ## - process (read-write)
24 // | |##| | |
25 // +--+--+--+--+
26 // | | | | |
27 // +--+--+--+--+
28 // | | | | |
29 // +--+--+--+--+
30 //
31 simple_lu(dsize, ldA, blocks[i][i]);
32
33 // process the blocks to the right of the current diagonal block
34 for (int j = i+1; j < block_count; j++) {
35 int width = MIN(block_size, n-j*block_size);
36
37 // blocks[i][j] <- L1(blocks[i][i]) \ blocks[i][j]
38 //
39 // +--+--+--+--+
40 // | | | | |
41 // +--+--+--+--+ ## - process (read-write), current iteration
42 // | |rr|##|xx| xx - process (read-write)
43 // +--+--+--+--+ rr - read
44 // | | | | |
45 // +--+--+--+--+
46 // | | | | |
47 // +--+--+--+--+
48 //
49 dtrsm_("Left", "Lower", "No transpose", "Unit triangular",
50 &dsize, &width, &one, blocks[i][i], &ldA, blocks[i][j], &ldA);
51 }
52
53 // process the blocks below the current diagonal block
54 for (int j = i+1; j < block_count; j++) {
55 int height = MIN(block_size, n-j*block_size);
56
57 // blocks[j][i] <- U(blocks[i][i]) / blocks[j][i]
58 //
59 // +--+--+--+--+
60 // | | | | |
61 // +--+--+--+--+ ## - process (read-write), current iteration
62 // | |rr| | | xx - process (read-write)
63 // +--+--+--+--+ rr - read
64 // | |##| | |
65 // +--+--+--+--+
66 // | |xx| | |
67 // +--+--+--+--+
68 //
69 dtrsm_("Right", "Upper", "No Transpose", "Not unit triangular",
70 &height, &dsize, &one, blocks[i][i], &ldA, blocks[j][i], &ldA);
71 }
72
73 // process the trailing matrix
74
75 for (int ii = i+1; ii < block_count; ii++) {
76 for (int jj = i+1; jj < block_count; jj++) {
77 int width = MIN(block_size, n-jj*block_size);
78 int height = MIN(block_size, n-ii*block_size);
79
80 // blocks[ii][jj] <-
81 // blocks[ii][jj] - blocks[ii][i] * blocks[i][jj]
82 //
83 // +--+--+--+--+
84 // | | | | |
85 // +--+--+--+--+ ## - process (read-write), current iteration
86 // | | |rr|..| xx - process (read-write)
87 // +--+--+--+--+ rr - read, current iteration
88 // | |rr|##|xx| .. - read
89 // +--+--+--+--+
90 // | |..|xx|xx|
91 // +--+--+--+--+
92 //
93 dgemm_("No Transpose", "No Transpose",
94 &height, &width, &dsize, &minus_one, blocks[ii][i],
95 &ldA, blocks[i][jj], &ldA, &one, blocks[ii][jj], &ldA);
96 }
97 }
98
99 }
100
101 // free allocated resources
102 for (int i = 0; i < block_count; i++)
103 free(blocks[i]);
104 free(blocks);
105}
In particular, the above code divides the trailing matrix update into numerous sub-tasks.
Test program
1#include <stdio.h>
2#include <stdlib.h>
3#include <time.h>
4#include <omp.h>
5
6extern double dnrm2_(int const *, double const *, int const *);
7
8extern void dtrmm_(char const *, char const *, char const *, char const *,
9 int const *, int const *, double const *, double const *, int const *,
10 double *, int const *);
11
12extern void dlacpy_(char const *, int const *, int const *, double const *,
13 int const *, double *, int const *);
14
15extern double dlange_(char const *, int const *, int const *, double const *,
16 int const *, double *);
17
18extern void dtrsm_(char const *, char const *, char const *, char const *,
19 int const *, int const *, double const *, double const *, int const *,
20 double *, int const *);
21
22extern void dgemm_(char const *, char const *, int const *, int const *,
23 int const *, double const *, double const *, int const *, double const *,
24 int const *, double const *, double *, int const *);
25
26double one = 1.0;
27double minus_one = -1.0;
28
29// returns the ceil of a / b
30int DIVCEIL(int a, int b)
31{
32 return (a+b-1)/b;
33}
34
35// returns the minimum of a and b
36int MIN(int a, int b)
37{
38 return a < b ? a : b;
39}
40
41// returns the maxinum of a and b
42int MAX(int a, int b)
43{
44 return a > b ? a : b;
45}
46
47void simple_lu(int n, int ldA, double *A)
48{
49 for (int i = 0; i < n; i++) {
50 for (int j = i+1; j < n; j++) {
51 A[i*ldA+j] /= A[i*ldA+i];
52
53 for (int k = i+1; k < n; k++)
54 A[k*ldA+j] -= A[i*ldA+j] * A[k*ldA+i];
55 }
56 }
57}
58
59////////////////////////////////////////////////////////////////////////////
60////////////////////////////////////////////////////////////////////////////
61////////////////////////////////////////////////////////////////////////////
62
63void blocked_lu(int block_size, int n, int ldA, double *A)
64{
65 // allocate and fill an array that stores the block pointers
66 int block_count = DIVCEIL(n, block_size);
67 double ***blocks = (double ***) malloc(block_count*sizeof(double**));
68 for (int i = 0; i < block_count; i++) {
69 blocks[i] = (double **) malloc(block_count*sizeof(double*));
70
71 for (int j = 0; j < block_count; j++)
72 blocks[i][j] = A+(j*ldA+i)*block_size;
73 }
74
75 // iterate through the diagonal blocks
76 for (int i = 0; i < block_count; i++) {
77
78 // calculate diagonal block size
79 int dsize = MIN(block_size, n-i*block_size);
80
81 // process the current diagonal block
82 //
83 // +--+--+--+--+
84 // | | | | |
85 // +--+--+--+--+ ## - process (read-write)
86 // | |##| | |
87 // +--+--+--+--+
88 // | | | | |
89 // +--+--+--+--+
90 // | | | | |
91 // +--+--+--+--+
92 //
93 simple_lu(dsize, ldA, blocks[i][i]);
94
95 // process the blocks to the right of the current diagonal block
96 for (int j = i+1; j < block_count; j++) {
97 int width = MIN(block_size, n-j*block_size);
98
99 // blocks[i][j] <- L1(blocks[i][i]) \ blocks[i][j]
100 //
101 // +--+--+--+--+
102 // | | | | |
103 // +--+--+--+--+ ## - process (read-write), current iteration
104 // | |rr|##|xx| xx - process (read-write)
105 // +--+--+--+--+ rr - read
106 // | | | | |
107 // +--+--+--+--+
108 // | | | | |
109 // +--+--+--+--+
110 //
111 dtrsm_("Left", "Lower", "No transpose", "Unit triangular",
112 &dsize, &width, &one, blocks[i][i], &ldA, blocks[i][j], &ldA);
113 }
114
115 // process the blocks below the current diagonal block
116 for (int j = i+1; j < block_count; j++) {
117 int height = MIN(block_size, n-j*block_size);
118
119 // blocks[j][i] <- U(blocks[i][i]) / blocks[j][i]
120 //
121 // +--+--+--+--+
122 // | | | | |
123 // +--+--+--+--+ ## - process (read-write), current iteration
124 // | |rr| | | xx - process (read-write)
125 // +--+--+--+--+ rr - read
126 // | |##| | |
127 // +--+--+--+--+
128 // | |xx| | |
129 // +--+--+--+--+
130 //
131 dtrsm_("Right", "Upper", "No Transpose", "Not unit triangular",
132 &height, &dsize, &one, blocks[i][i], &ldA, blocks[j][i], &ldA);
133 }
134
135 // process the trailing matrix
136
137 for (int ii = i+1; ii < block_count; ii++) {
138 for (int jj = i+1; jj < block_count; jj++) {
139 int width = MIN(block_size, n-jj*block_size);
140 int height = MIN(block_size, n-ii*block_size);
141
142 // blocks[ii][jj] <-
143 // blocks[ii][jj] - blocks[ii][i] * blocks[i][jj]
144 //
145 // +--+--+--+--+
146 // | | | | |
147 // +--+--+--+--+ ## - process (read-write), current iteration
148 // | | |rr|..| xx - process (read-write)
149 // +--+--+--+--+ rr - read, current iteration
150 // | |rr|##|xx| .. - read
151 // +--+--+--+--+
152 // | |..|xx|xx|
153 // +--+--+--+--+
154 //
155 dgemm_("No Transpose", "No Transpose",
156 &height, &width, &dsize, &minus_one, blocks[ii][i],
157 &ldA, blocks[i][jj], &ldA, &one, blocks[ii][jj], &ldA);
158 }
159 }
160
161 }
162
163 // free allocated resources
164 for (int i = 0; i < block_count; i++)
165 free(blocks[i]);
166 free(blocks);
167}
168
169////////////////////////////////////////////////////////////////////////////
170////////////////////////////////////////////////////////////////////////////
171////////////////////////////////////////////////////////////////////////////
172
173// computes C <- L * U
174void mul_lu(int n, int lda, int ldb, double const *A, double *B)
175{
176 // B <- U(A) = U
177 for (int i = 0; i < n; i++) {
178 for (int j = 0; j < i+1; j++)
179 B[i*ldb+j] = A[i*lda+j];
180 for (int j = i+1; j < n; j++)
181 B[i*ldb+j] = 0.0;
182 }
183
184 // B <- L1(A) * B = L * U
185 dtrmm_("Left", "Lower", "No Transpose", "Unit triangular",
186 &n, &n, &one, A, &lda, B, &ldb);
187}
188
189int main(int argc, char **argv)
190{
191 //
192 // check arguments
193 //
194
195 if (argc != 3) {
196 fprintf(stderr,
197 "[error] Incorrect arguments. Use %s (n) (block size)\n", argv[0]);
198 return EXIT_FAILURE;
199 }
200
201 int n = atoi(argv[1]);
202 if (n < 1) {
203 fprintf(stderr, "[error] Invalid matrix dimension.\n");
204 return EXIT_FAILURE;
205 }
206
207 int block_size = atoi(argv[2]);
208 if (block_size < 2) {
209 fprintf(stderr, "[error] Invalid block size.\n");
210 return EXIT_FAILURE;
211 }
212
213 //
214 // Initialize matrix A and store a duplicate to matrix B. Matrix C is for
215 // validation.
216 //
217
218 srand(time(NULL));
219
220 int ldA, ldB, ldC;
221 ldA = ldB = ldC = DIVCEIL(n, 8)*8; // align to 64 bytes
222 double *A = (double *) aligned_alloc(8, n*ldA*sizeof(double));
223 double *B = (double *) aligned_alloc(8, n*ldB*sizeof(double));
224 double *C = (double *) aligned_alloc(8, n*ldC*sizeof(double));
225
226 if (A == NULL || B == NULL || C == NULL) {
227 fprintf(stderr, "[error] Failed to allocate memory.\n");
228 return EXIT_FAILURE;
229 }
230
231 // A <- random diagonally dominant matrix
232 for (int i = 0; i < n; i++) {
233 for (int j = 0; j < n; j++)
234 A[i*ldA+j] = B[i*ldB+j] = 2.0*rand()/RAND_MAX - 1.0;
235 A[i*ldA+i] = B[i*ldB+i] = 1.0*rand()/RAND_MAX + n;
236 }
237
238 //
239 // compute
240 //
241
242 struct timespec ts_start;
243 clock_gettime(CLOCK_MONOTONIC, &ts_start);
244
245 // A <- (L,U)
246 blocked_lu(block_size, n, ldA, A);
247
248 struct timespec ts_stop;
249 clock_gettime(CLOCK_MONOTONIC, &ts_stop);
250
251 printf("Time = %f s\n",
252 ts_stop.tv_sec - ts_start.tv_sec +
253 1.0E-9*(ts_stop.tv_nsec - ts_start.tv_nsec));
254
255 // C <- L * U
256 mul_lu(n, ldA, ldC, A, C);
257
258 //
259 // validate
260 //
261
262 // C <- L * U - B
263 for (int i = 0; i < n; i++)
264 for (int j = 0; j < n; j++)
265 C[i*ldC+j] -= B[i*ldB+j];
266
267 // compute || C ||_F / || B ||_F = || L * U - B ||_F / || B ||_F
268 double residual = dlange_("Frobenius", &n, &n, C, &ldC, NULL) /
269 dlange_("Frobenius", &n, &n, B, &ldB, NULL);
270
271 printf("Residual = %E\n", residual);
272
273 int ret = EXIT_SUCCESS;
274 if (1.0E-12 < residual) {
275 fprintf(stderr, "The residual is too large.\n");
276 ret = EXIT_FAILURE;
277 }
278
279 //
280 // cleanup
281 //
282
283 free(A);
284 free(B);
285 free(C);
286
287 return ret;
288}
$ gcc -o finely-blocked finely-blocked.c -Wall ${LIBLAPACK} ${LIBBLAS}
$ OMP_NUM_THREADS=1 ./finely-blocked 3000 128
Time = 0.587407 s
Residual = 3.958161E-16
Challenge
Priorities
We can prioritize the critical path by using the following offsets:
Above, the priority is given by omp_get_max_task_priority() - offset
.