一、主从模式
上一章中,可以发现,各进程执行程序的速度是不确定的。用多个进程输出和打印结果,顺序是无法保证的。本文将介绍另一种并行程序的设计模式——主从模式。因此,我们可以在逻辑上规定一个主进程,用于将数据发送给各个进程,再收集各个进程所计算的结果。
在介绍主从模式时,矩阵相乘是一个既简单又具有典型性的一个例子。所以本文就以矩阵相乘这一典型的例子来做介绍。其中,A设置为100×100的矩阵,b设定为100×1的矩阵,来计算矩阵A和矩阵b相乘。在学线性代数时,我们知道按矩阵相乘的计算法则,每一个元素的计算都是独立的,因此可以分别独立的计算A矩阵的任一行和b矩阵这一列相乘并求和的过程,因此很自然的可以将这些计算分配给不同进程的中,再将结果进行汇总即可。
矩阵相乘是数值计算中最基本的模块,学习该例子主要是为了理解主从模式的并行程序的设计逻辑而非相乘本身,我们手写的矩阵乘法计算速度基本是不会快过诸如Petsc或Numpy等这种成熟的科学计算库,这些成熟的计算库都在各个层面对计算进行了优化。
接下来,将分别讲解主进程和从进程执行的过程。
二、主进程
首先,在主进程中,定义好需要计算的矩阵a和b。
1
2
3
4
5
6
do i=1,cols
b(i)=1
do j=1,rows
a(i,j)=i
end do
end do
接下来要做的是数据分发工作,将A矩阵的不同行和b传递给从进程。为了程序的可扩展性,考虑到所用的进程数可能比矩阵A的行数要小,因此还需要定义numsent这一变量来计算已发送的行数。
如下可以看到,主进程在发送数据时,针对矩阵A的不同行和矩阵b采用了不同的API,针对矩阵A采用了之前提到过的MPI_SEND,而针对矩阵b,使用了MPI_BCAST。这是因为每个从进程都接收相同的矩阵b,如果一对一的去发送矩阵b会多此一举,不仅程序变得复杂,运行也会更慢。这时我们可以考虑使用广播这一操作,即主进程将矩阵b向通信域内所有进程广播一下矩阵b,然后从进程就可以都接收到矩阵b这一变量了。
1
2
3
4
5
6
7
8
9
//MPI广播矩阵B
call MPI_BCAST(b,cols,MPI_DOUBLE_PRECISION,master,MPI_COMM_WORLD,ierr)
do i=1,min(numprocs-1,rows)
do j=1,cols
buffer(j)=a(i,j)
end do
call MPI_SEND(buffer,cols,MPI_DOUBLE_PRECISION,i,i,MPI_COMM_WORLD,ierr)
numsent=numsent+1
end do
在执行完发送步骤后,需要将计算结果收回。从进程计算的结果用ans存储,在发送时,所标注的tag和矩阵的行标是相同的,因此直接用c(anstype)=ans来在对应位置存储结果。sender用于记录已经将结果发送回主进程的从进程号,因其已经发送回主进程,即可代表该从进程已经处于空闲状态。在之后的发送中,就向空闲的进程继续发送计算任务。在每次循环中,都判断一次numsent和rows的关系,用于判断是否每一行都发送完成。当都发送完之后,向从进程发送一个空信息,从进程接收到空信息时,即执行MPI_FINALIZE来结束。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
do i=1,row
call MPI_RECV(ans,1,MPI_DOUBLE_PRECISION,MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,status,ierr)
sender=status(MPI_SOURCE)
anstype=status(MPI_TAG)
c(anstype)=ans
if(numsent<rows) then
do j=1,cols
buffer(j)=a(numsent+1,j)
end do
call MPI_SEND(buffer,cols,MPI_DOUBLE_PRECISION,sender,numsent+1,MPI_COMM_WORLD,ierr)
numsent=numsent+1
else
call MPI_SEND(1.0,0,MPI_DOUBLE_PRECISION,sender,0,MPI_COMM_WORLD,ierr)
end if
end do
因此,主进程主要干了三件事,定义数据、发送数据和接收计算结果,分别对应上述三块代码。
三、从进程
从进程首先需要接收主进程广播的矩阵b。
1
call MPI_BCAST(b,cols,MPI_DOUBLE_PRECISION,master,MPI_COMM_WORLD,ierr)
从进程的计算模块放入一个循环中,直到矩阵A的所有行都计算完成后,主进程会发送一个tag为0的空消息,当收到这个空tag时,跳出循环,即完成了计算任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
do while(1)
call MPI_RECV(buffer,cols,MPI_DOUBLE_PRECISION,master,MPI_ANY_TAG,MPI_COMM_WORLD,status,ierr)
if(status(MPI_TAG/=0)) then
row=status(MPI_TAG)
ans=0.0
do i=1,cols
ans=ans+buffer(i)*b(i)
end do
call MPI_SEND(ans,1,MPI_DOUBLE_PRECISION,master,row,MPI_COMM_WORLD,ierr)
else
exit
end if
end do
四、总结
综上,主从模式的并行程序框架基本可以写成如下范式。
1
2
3
4
5
6
7
8
9
10
11
12
13
program main
use mpi
implicit none
...
call MPI_INIT(ierr)
call MPI_COMM_RANK(MPI_COMM_WORLD,myid,ierr)
if (myid==0) then
call master() !主进程的程序代码
else
call slave() !从进程的程序代码
end if
call MPI_FINALIZE(rc)
end program
理解了这两种并行程序的范式,结合这二者的思想,通常已经足以满足大部分计算需求了。前面所讲的基本的API理论上足够表达各式各样的编程逻辑,但在有些计算场景会及其繁琐。比如本文所用到的广播函数,虽然MPI_SEND也能实现相同功能,但BCAST极大的简化了程序编写的复杂程度。MPI中还有很多诸如此类的API,在理解了基本函数的API后,只需稍微阅读手册即可理解那些复杂的API,并应用在你的计算场景之中。
C++完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#include <mpi.h>
#include <iostream>
#include <iomanip>
#include <algorithm>
// 全局变量初始化
const int rows = 100;
const int cols = 100;
double a[rows][cols] = { 0 }; //矩阵 [rows,cols]
double b[cols] = { 0 }; // 列矩阵
double c[rows] = { 0 }; // 存储结果矩阵
double buffer[cols] = { 0 }; //缓冲变量
int numsent = 0; // 已发送行数
int numprocs = 0; //设置进程数
const int m_id = 0; // 主进程 id
const int end_tag = 0; // 标志发送完成的 tag
MPI_Status status; // MPI 状态
// 主进程主要干三件事: 定义数据, 发送数据和接收计算结果
void master()
{
// 1准备数据
for (int i = 0; i < cols; i++)
{
b[i] = i;
for (int j = 0; j < rows; j++)
{
a[i][j] = i + 1;
}
};
// 2 每个从进程都接收相同的矩阵b,
// 考虑使用广播操作, 即主进程将矩阵b向通信域内所有进程广播一下矩阵b,
// 然后从进程就可以都接收到矩阵b这一变量了.
MPI_Bcast(
b, // void* buffer
cols, // int count
MPI_DOUBLE, // MPI_Datatype datatype
m_id, // int root,
MPI_COMM_WORLD // MPI_Comm comm
);
// 矩阵A采用了之前提到过的 MPI_SEND, 发送每行的数据
// 实际的子进程数是 numprocs - 1 个
for (int i = 0; i < std::min(numprocs - 1, rows); i++)
{
for (int j = 0; j < cols; j++)
{
buffer[j] = a[i][j];
}
//发送矩阵A 的行数据, 使用矩阵行数作为 tag MPI_DOUBLE,
MPI_Send(
buffer, // const void* buf,
cols, // int count,
MPI_DOUBLE, // MPI_Datatype datatype,
i + 1, // int dest, 0 列发给 rank 1, 以此类推
i + 1, // int tag, row number +1
MPI_COMM_WORLD // MPI_Comm comm
);
numsent = numsent + 1; // 记录已发送的行数
};
// 3 在执行完发送步骤后, 需要将计算结果收回
double ans = 0.0; // 存储结果的元素
for (int i = 0; i < rows; i++)
{
MPI_Recv(
&ans, // void* buf,
1, // int count,
MPI_DOUBLE, // MPI_Datatype datatype,
MPI_ANY_SOURCE, // int source,
MPI_ANY_TAG, // int tag,
MPI_COMM_WORLD, // MPI_Comm comm,
&status // MPI_Status * status
);
// sender 用于记录已经将结果发送回主进程的从进程号
int sender = status.MPI_SOURCE;
//在发送时, 所标注的 tag = 矩阵的行号+1,
int rtag = status.MPI_TAG - 1;
c[rtag] = ans; //用 c(rtag)=ans来在对应位置存储结果
// numsent 是已发送行, 用于判断是否发送完所有行
// 因其已经发送回主进程, 即可代表该从进程已经处于空闲状态
// 在之后的发送中, 就向空闲的进程继续发送计算任务
if (numsent < rows)
{
// 获取下一列
for (int j = 0; j < cols; j++)
{
buffer[j] = a[numsent][j];
}
MPI_Send(
buffer, cols, MPI_DOUBLE,
sender, numsent + 1, MPI_COMM_WORLD);
numsent = numsent + 1;
}
//当都发送完之后, 向从进程发送一个空信息,
//从进程接收到空信息时, 即执行MPI_FINALIZE来结束.
else
{
int tmp = 1.0;
MPI_Send(
&tmp, 0, MPI_DOUBLE,
sender, end_tag, MPI_COMM_WORLD);
}
};
}
// 子进程
void slave()
{
//从进程首先需要接收主进程广播的矩阵b
MPI_Bcast(b, cols, MPI_DOUBLE, m_id, MPI_COMM_WORLD);
while (1)
{
MPI_Recv(
buffer, cols, MPI_DOUBLE,
m_id, MPI_ANY_TAG, MPI_COMM_WORLD,
&status);
//直到矩阵A的所有行都计算完成后, 主进程会发送 tag 为 end_tag 的空消息,
if (status.MPI_TAG != end_tag)
{
int row = status.MPI_TAG;
double ans = 0.0;
for (int i = 0; i < cols; i++)
{
ans = ans + buffer[i] * b[i];
}
MPI_Send(
&ans, 1, MPI_DOUBLE,
m_id, row, MPI_COMM_WORLD);
}
else {
break;
}
}
}
int main(int argc, char* argv[])
{
//----------------------------------
// MPI 初始化
MPI_Init(&argc, &argv);
//获取进程总数
MPI_Comm_size(
MPI_COMM_WORLD, // MPI_Comm comm
&numprocs // int* size
);
// 获取rank
int myid = 0; // rank number
MPI_Comm_rank(
MPI_COMM_WORLD, // MPI_Comm comm,
&myid // int* size
);
// 打印进程信息
std::cout << "Process " << myid << " of " << numprocs << " is alive!" << std::endl;
//----------------------------------
if (myid == m_id)
{
master(); //主进程的程序代码
}
else
{
slave(); //从进程的程序代码
}
// 打印结果
if (myid == m_id) {
for (int i = 0; i < rows; i++)
{
std::cout << "the ele (" << i << "): "
<< std::setiosflags(std::ios_base::right)
<< std::setw(15) << c[i]
<< std::resetiosflags(std::ios_base::right)
<< std::endl;
}
}
// MPI 收尾
MPI_Finalize();
}
参考资料
1.都志辉. 高性能计算并行编程技术:MPI并行程序设计[M]. 清华大学出版社, 2001.