MPI与并行计算04:对等模式

Posted by Ling on 2023-07-20

一、对等模式

本部分内容将分为两章介绍两种基本的并行程序设计模式,对等模式和主从模式。顾名思义,对等模式即在设计算法时将各进程的地位视为相等,主从模式则把各进程的地位视作不相等,有主进程和从进程之分。

在对等模式中,各个进程的功能基本相同,因此用SPMD(单程序多数据)程序可以更好的表达。在讲解对等模式时,常用Jacobi迭代作为范例。Jacobi迭代在数值计算上是比较常用的算法,本文只考虑其算法本身,不考虑该方法的其他意义。因此后文只介绍其算法的计算规则,具体算法使用背景可自行检索。

由于时间原因,本文的伪代码是使用Fortran写的,之后会补充上C的版本。本文尽量使用最常规的语法规则去写,因此不论更擅长哪门编程语言,应该都能在伪代码中看到其并行算法的思想。如果你更擅长使用C语言,可以尝试参考结尾的Fortran代码用C语言进行改写。

二、Jacobi迭代算法介绍

通俗的讲,Jacobi迭代就是用上下左右周围的四个点取平均值来得到新的点,即

\[A(i,j)=0.25\ast(A(i-1,j)+A(i+1,j)+A(i,j-1)+A(i,j+1))\]

每一轮迭代计算所用的数值都是上一轮的结果,即还需要额外声明一个变量来储存当前这一轮迭代的结果,然后在迭代完成后刷新矩阵A的值。以下为Jacobi迭代的串行实现的Fortran伪代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
program Jacobi
    integer,parameter :: N=15
    real :: A(N+1,N+1),B(N+1,N+1) !假设计算规模是16x16的矩阵
    integer :: step !step为迭代的次数
    ! 略去赋值部分
    do k=1,step
        do i=2,N
            do j=2,N
                B(i,j)=0.25*(A(i-1,j)+A(i+1,j)+A(i,j+1)+A(i,j-1))
            end do
        end do
        do i=2,N
            do j=2,N
                A(i,j)=B(i,j)
            end do
        end do
    end do
    end program

上述的伪代码可以看出,在同一轮迭代中,计算任意一点之间都是相互独立的,代码的局部性很好,在这种情况下,就很适合把代码改成并行来提速。下文将一步步从串行代码改写为并行代码。

三、MPI编写Jacobi迭代算法

假设我们的A矩阵是16x16的,可以考虑使用4个进程来完成该Jacobi迭代。因为所迭代的矩阵A是一个二维数据,可以看做一个面板,而面板上的每一点计算都是独立的。所以很自然的就可以想到将这个面板划分成四块,每个进程计算相应部分的点,即如下图所示。

而这样又产生了新的问题,即算边界点时,需要相邻进程进程的边界值。比如计算进程0最右侧一列的点时,需要进程1最左侧一列点的值。从图中我们可以看出,进程1和进程2所执行的操作是一样的,都需要从左右两进程接收数据,也需要向左右两侧发送数据。而进程0和进程3则是只用往一侧发送和接收数据。但因为我们所编写的是SPMD程序,所有进程执行的都是同一个代码文件。为了保持一致性和代码的可读性,在每一块的左右边界各加上一列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
! 进程0-2向右侧的邻居接收数据 
if(myid<3) then 
    call MPI_RECV(A(1,mysize+2),totalsize,MPI_REAL,myid+1,10,MPI_COMM_WORLD,status,ierr) 
end if 
! 进程1-3向左侧的邻居发送数据 
if(myid>0) then 
    call MPI_SEND(A(1,2),totalsize,MPI_REAL,myid-1,10,MPI_COMM_WORLD,ierr) 
end if 
! 进程0-2向右侧的邻居发送数据 
if(myid<3) then 
    call MPI_SEND(A(1,mysize+1),totalsize,MPI_REAL,myid+1,10,MPI_COMM_WORLD,ierr) 
end if 
! 进程1-3向左侧的邻居接收数据 
if(myid>0) then 
    call MPI_RECV(A(1,1),totalsize,MPI_REAL,myid-1,10,MPI_COMM_WORLD,status,ierr) 
end if 

其中,MPI_RECV和MPI_SEND的第一个参数是所接收或传递的数组的起始位置,而第二个参数是数组的长度。Fortran对数组的存储是按列存储,所以如果以A(1,1)为起始位置,长度为totalsize,而每一进程中的A矩阵是16×6的,因此所传递的就是A(1:16,1),即最左侧的一列。其他都是同理。

通过观察可以看出,本代码的实现难度主要在通信部分。因为需要处理好SEND和RECV的依赖关系,而依赖关系的错误编写会使程序产生死锁或者内存溢出等致命BUG。 通俗的讲,如果两个进程都在等待对方发送消息,这样程序执行到这一步时就会卡住。如果两个进程都在给对方进程发送,而此时两个进程都没有接收指令,也同样会出现问题。

四、死锁和内存溢出

MPI_SEND和MPI_RECV所采用的是MPI的标准通信模式,即是否对发送的数据进行缓存不是由程序员决定,而是由MPI决定。通信分为阻塞通信和非阻塞通信,通俗的讲,阻塞通信即为通信时只能做通信这一件事,而非阻塞通信则为该进开始发送消息时,不必等消息发送完成,即可继续执行下一步指令。

在阻塞通信时,若MPI_SEND和MPI_RECV顺序不当则会发生死锁的现象,例如进程0和进程1要相互发送消息。而进程0和进程1都先进行接收操作,则两个进程都会等待消息的发送而不会执行下一步的发送操作,此时程序就卡住不动,也就是死锁现象。

1
2
3
4
5
6
7
8
if(myid==0) then
    call MPI_RECV(...,...,...,1,tag,...,...,...)
    call MPI_SEND(...,...,...,1,tag,...,...)
end if
if(myid==1) then
    call MPI_RECV(...,...,...,0,tag,...,...,...)
    call MPI_SEND(...,...,...,0,tag,...,...)
end if

而如果在进程0和进程1中,都先执行了SEND操作时,也会出现错误。因进程0和进程1都会像系统缓冲区发送数据。而当系统缓冲区空间不足时,则会出现溢出的现象,有很大的危险性。

因此,在使用SEND和RECV的时候,为了保证程序的安全性,需要匹配SEND和RECV。将上述代码修改成如下方式即可顺利运行。

1
2
3
4
5
6
7
8
if(myid==0) then
    call MPI_SEND(...,...,...,1,tag,...,...)
    call MPI_RECV(...,...,...,1,tag,...,...,...)
end if
if(myid==1) then
    call MPI_RECV(...,...,...,0,tag,...,...,...)
    call MPI_SEND(...,...,...,0,tag,...,...)
end if

五、捆绑发送接收

在代码规模大且程序结构复杂的情况下,匹配SEND和RECV需要额外花费较多精力,有没有一种方法能更简便的方式来编写呢?在该算法的应用场景下,发送和接收操作是成对出现的,因此可以将发送和接收操作捆绑起来。接下来将介绍MPI_SEDNRECV和MPI_SENDRECV_REPLACE函数。

进程0和进程3只有一侧边界需要传输数据,因此处理起来会和进程1和进程2不同。若不考虑进程0和进程3的情况,该部分通信可写成如下形式。

1
2
3
4
5
6
do i=1,steps 
! 从左向右传递数据 
    call MPI_SENDRECV(A(1,mysize+1),totalsize,MPI_REAL,myid+1,10,A(1,1),totalsize,MPI_REAL,myid-1,10,MPI_COMM_WORLD,status,ierr) 
! 从右向左传递数据 
    call MPI_SENDRECV(A(1,2),totalsize,MPI_REAL,myid-1,10,A(1,mysize+2),totalsize,MPI_REAL,myid+1,10,MPI_COMM_WORLD,status,ierr) 
end do 

能看出来代码的简洁程度立刻就提升了,然而我们还需要处理进程0和进程3这两个特殊情况。有两种思路,一种是使用if语句将进程0和进程3单独编写,还有一种思路是引入进程拓扑和虚拟进程,使进程0和进程3与进程1和进程2在形式上保持一致。用这种方式可以保证代码更加简洁可读。

六、虚拟进程

MPI_PROC_NULL是一个假想的进程,其存在有助于编写时的方便。当一个真实进程向虚拟进程发送和接收数据时,会立刻执行一个空操作。该进程的引入可以很好的简化边界的代码。不仅可以使代码编写变得简单,也使代码的可读性大大提高。

为了使用虚拟进程,需要在进程0和进程3额外进行一下标记。引入right和left来对每一个进程左右两边进行记录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
if(myid > 0) then 
    left=myid-1 
else 
    left=MPI_PROC_NULL 
end if 
if(myid < 3) then 
    right=myid+1 
else 
    right=MPI_PROC_NULL 
end if 
do i=1,steps 
    call MPI_SENDRECV(A(1,mysize+1),totalsize,MPI_REAL,right,tag1,A(1,1),totalsize,MPI_REAL,left,tag1,MPI_COMM_WORLD,status,ierr) 
    call MPI_SENDRECV(A(1,2),totalsize,MPI_REAL,left,tag2,A(1,mysize+2),totalsize,MPI_REAL,right,tag2,MPI_COMM_WORLD,status,ierr) 
end do 

七、总结

本文介绍了对等模式的基本概念,并用对等模式的方式改写了串行的Jacobi迭代算法。介绍了MPI_SENDRECV函数,并介绍了虚拟进程的概念。初学者可以试着结合上述的代码,试着把其完整的代码编写出来。该部分的完整版Fortran代码见文末附录,稍后会更新C代码。

通过实践,我们会发现下面的代码在输出结果时,各进程输出各自的矩阵A,在时间顺序上是不确定的,打印出来的矩阵A是乱序的。因为各进程计算的速度是不确定的,先计算完的进程就先执行输出语句。若想按顺序完整的输出最终的矩阵A,需要将各个进程中的结果汇总到一个进程中,由一个进程负责输出,这就涉及到了主从进程的思想,下一章将用矩阵相乘这一简单的例子来讲解主从模式。

Fortran完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
program main 
    use mpi 
    implicit none 
    integer,parameter :: steps = 10 
    integer,parameter :: totalsize = 16 
    integer,parameter :: mysize = 4 
    integer :: n,myid,numprocs,i,j,rc 
    integer :: left,right,tag1,tag2 
    real :: A(totalsize,mysize+2),B(totalsize,mysize+2) 
    integer :: begin_col,end_col,ierr 
    integer :: status(MPI_STATUS_SIZE) 
    call MPI_INIT(ierr) 
    call MPI_COMM_RANK(MPI_COMM_WORLD,myid,ierr) 
    call MPI_COMM_SIZE(MPI_COMM_WORLD,numprocs,ierr) 
    print *, "Process ", myid,"of ",numprocs,"is alive!" 
    do j=1,mysize+2 
        do i=1,totalsize 
            A(i,j)=0.0 
        end do 
    end do 
    if(myid==0) then 
        do i=1,totalsize 
            A(i,2)=8.0 
        end do 
    end if 
    if(myid==3) then 
        do i=1,totalsize 
            A(i,mysize+1)=8.0 
        end do 
    end if 
    do i=1,mysize+2 
        A(1,i)=8.0 
        A(totalsize,i)=8.0 
    end do 
    if(myid > 0) then 
        left=myid-1 
    else 
        left=MPI_PROC_NULL 
    end if 
    if(myid < 3) then 
        right=myid+1 
    else 
        right=MPI_PROC_NULL 
    end if 
    tag1=3 
    tag2=4 
    do n=1,steps 
        call MPI_SENDRECV(A(1,mysize+1),totalsize,MPI_REAL,right,tag1,& 
                    A(1,1),totalsize,MPI_REAL,left,tag1,MPI_COMM_WORLD,status,ierr) 
        call MPI_SENDRECV(A(1,2),totalsize,MPI_REAL,left,tag2,& 
                    A(1,mysize+2),totalsize,MPI_REAL,right,tag2,MPI_COMM_WORLD,status,ierr) 
        begin_col=2 
        end_col=mysize+1 
        if(myid==0) then 
            begin_col=3 
        end if 
        if(myid==3) then 
            end_col=mysize 
        end if 
        do j=begin_col,end_col 
            do i=2,totalsize-1 
                B(i,j)=(A(i,j+1)+A(i,j-1)+A(i+1,j)+A(i-1,j))*0.25 
            end do 
        end do 
        do j=begin_col,end_col 
            do i=2,totalsize-1 
                A(i,j)=B(i,j) 
            end do 
        end do 
    end do 
    do i=2,totalsize-1 
        print *, myid,(a(i,j),j=begin_col,end_col) 
    end do 
    call MPI_FINALIZE(rc) 
end program

C++完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#include "mpi.h"
#include <iostream>
#include <iomanip>

int main(int argc, char *argv[])
{
    int steps = 10;                        // 迭代次数
    const int totalsize = 16;              // 矩阵维度
    const int mysize = 4;                  // MPI 分块数目, 4 个 processes
    double A[totalsize][mysize + 2] = {0}; // all proc data matrix
    double B[totalsize][mysize + 2] = {0}; // all proc buffer matrix
    int begin_col = 1;                     // all proc 矩阵的起始列
    int end_col = mysize + 1;              // all proc 矩阵的结束列
    int myid = 0, numprocs = 0;
    int left = 0, right = 0;
    int tag1 = 3, tag2 = 4; // MPI 发送接收 tag
    MPI_Status status;      // MPI 状态
    MPI_Init(&argc, &argv);
    // MPI 初始化
    //  int thread_support = 0;
    // MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_support);
    // 获取rank
    MPI_Comm_rank(
        MPI_COMM_WORLD /*MPI_Comm comm*/,
        &myid /*int* size*/
    );
    //获取进程数
    MPI_Comm_size(
        MPI_COMM_WORLD, /*MPI_Comm comm*/
        &numprocs       /* int* size */
    );
    // 打印进程信息
    std::cout << "Process " << myid << " of " << numprocs << " is alive!" << std::endl;
    // 4 proc 矩阵赋初值, 左右额外两列用于交换
    for (int j = 0; j < mysize + 2; j++)
    {
        for (int i = 0; i < totalsize; i++)
        {
            A[i][j] = 0.0;
        }
    }
    // proc 0 额外初值
    if (myid == 0)
        for (int i = 0; i < totalsize; i++)
        {
            A[i][1] = 8.0;
        }
    // proc 3 额外初值
    if (myid == 3)
        for (int i = 0; i < totalsize; i++)
        {
            A[i][mysize] = 8.0;
        }
    // proc all 赋初值
    for (int i = 0; i < mysize + 2; i++)
    {
        A[0][i] = 8.0;
        A[totalsize - 1][i] = 8.0;
    }
    // 计算相邻 proc 矩阵
    if (myid > 0)
    {
        left = myid - 1;
    }
    else
    {
        left = MPI_PROC_NULL;
    }
    if (myid < 3)
    {
        right = myid + 1;
    }
    else
    {
        right = MPI_PROC_NULL;
    }
    // 雅可比迭代 整体步数
    for (int i = 0; i < steps; i++)
    {
        // 绑定发送接收, proc:第4列  -> rihgt proc: 第0列
        MPI_Sendrecv(
            &A[0][mysize], // onst void *sendbuf; 最右侧数据, 要发送给下一个 proc
            totalsize,     // int sendcount; 数据长度
            MPI_DOUBLE,    // MPI_Datatype  sendtype; 数据类型
            right,         // int dest
            tag1,          // int sendtag,
            //---------------------------------------
            &A[0][0],       // void *recvbuf; 接收数据位置
            totalsize,      // int recvcount; 接收长度
            MPI_DOUBLE,     // MPI_Datatype  recvtype; 数据类型
            left,           // int source; 接收源
            tag1,           // int recvtag; 接收 tag
            MPI_COMM_WORLD, // MPI_Comm comm; 通信域
            &status         // MPI_Status *status; 发送或接收状态;
        );
        // 绑定发送接收, proc:第1列  -> left proc: 第5列
        MPI_Sendrecv(
            &A[0][1],   // onst void *sendbuf
            totalsize,  // int sendcount
            MPI_DOUBLE, // MPI_Datatype sendtype
            left,       // int dest
            tag2,       // int sendtag,
            //---------------------------------------
            &A[0][mysize + 1], // void *recvbuf,
            totalsize,         // int recvcount,
            MPI_DOUBLE,        // MPI_Datatype recvtype,
            right,             // int source,
            tag2,              // int recvtag,
            MPI_COMM_WORLD,    // MPI_Comm comm,
            &status            // MPI_Status *status
        );
        // proc 0
        if (myid == 0)
        {
            begin_col = 2; // 0th 列是缓冲, 1st 列缺少左边, 所以从 2nd 开始
        }
        // proc 3
        if (myid == 3)
        {
            end_col = mysize - 1; //类似上面
        }
        // 在每个 proc matrix 内部, 去除首位列, 迭代内部列
        for (int j = begin_col; j < end_col; j++)
        {
            // in each proc matrix, 去除首位行, 迭代内部行
            for (int i = 1; i < totalsize - 1; i++)
            {
                B[i][j] = (A[i][j + 1] + A[i][j - 1] + A[i + 1][j] + A[i - 1][j]) * 0.25;
            }
        }
        // 将数据从缓冲变量 提取到 原变量
        for (int j = begin_col; j < end_col; j++)
        {
            for (int i = 1; i < totalsize - 1; i++)
            {
                A[i][j] = B[i][j];
            }
        }
    }
    // 打印迭代结果
    for (int row = 1; row < totalsize - 1; row++)
    {
        std::cout << "proc (" << myid << "):  ";
        for (int col = begin_col; col < end_col; col++)
        {
            std::cout << std::setiosflags(std::ios_base::left)
                      << std::setw(15) << A[row][col]
                      << std::resetiosflags(std::ios_base::left);
        }
        std::cout << std::endl;
    }
    // MPI 收尾
    MPI_Finalize();
}

八、4X4进程并行通讯

Fortran代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
program main 
    use mpi 
    implicit none 
    integer,parameter :: steps = 10
    integer,parameter :: mysize = 5
    integer :: n,myid,numprocs,i,j,rc 
    integer :: left,right,up,down,tag1,tag2,tag3,tag4 
    real :: A(mysize+2,mysize+2),B(mysize+2,mysize+2),COL((mysize+2)*(mysize+2))
    integer :: begin_col,end_col,begin_row,end_row,ierr 
    integer :: status(MPI_STATUS_SIZE) 
    call MPI_INIT(ierr) 
    call MPI_COMM_RANK(MPI_COMM_WORLD,myid,ierr) 
    call MPI_COMM_SIZE(MPI_COMM_WORLD,numprocs,ierr) 
    print *, "Process ", myid,"of ",numprocs,"is alive!" 
    do j=1,mysize+2 
        do i=1,mysize+2 
            A(i,j)=0.0 
        end do 
    end do 
    if(myid==0) then 
        do i=2,mysize+2 
            A(i,2)=8.0 
        end do 
        do j=2,mysize+2 
            A(2,j)=8.0 
        end do 
    end if 
    if(myid==3) then 
        do i=2,mysize+2 
            A(i,mysize+1)=8.0 
        end do 
        do j=1,mysize+1 
            A(2,j)=8.0 
        end do 
    end if 
    if(myid==12) then 
        do i=1,mysize+1 
            A(i,2)=8.0 
        end do 
        do j=2,mysize+2 
            A(mysize+1,j)=8.0 
        end do 
    end if 
    if(myid==15) then 
        do i=1,mysize+1 
            A(i,mysize+1)=8.0 
        end do 
        do j=1,mysize+1 
            A(mysize+1,j)=8.0 
        end do 
    end if 
    if(myid > 0 .and. myid < 3) then 
        do j=1,mysize+2
            A(2,j)=8.0 
        end do 
    end if 
    if(myid > 12 .and. myid < 15) then 
        do j=1,mysize+2
            A(mysize+1,j)=8.0 
        end do 
    end if 
    if(myid == 4 .or. myid == 8) then 
        do i=1,mysize+2
            A(i,2)=8.0 
        end do 
    end if 
    if(myid == 7 .or. myid == 11) then 
        do i=1,mysize+2
            A(i,mysize+1)=8.0 
        end do 
    end if 
    if(MOD(myid, 4) > 0) then 
        left=myid-1 
    else 
        left=MPI_PROC_NULL 
    end if 
    if(MOD(myid, 4) < 3) then 
        right=myid+1 
    else 
        right=MPI_PROC_NULL 
    end if 
    if(myid/4 > 0) then 
        up=myid-4
    else 
        up=MPI_PROC_NULL 
    end if 
    if(myid/4 < 3) then 
        down=myid+4
    else 
        down=MPI_PROC_NULL 
    end if 

    tag1=1 
    tag2=2 
    tag3=3 
    tag4=4 
    do n=1,steps 
        call MPI_SENDRECV(A(2,mysize+1),mysize,MPI_REAL,right,tag1,& 
                    A(2,1),mysize,MPI_REAL,left,tag1,MPI_COMM_WORLD,status,ierr) 
        call MPI_SENDRECV(A(2,2),mysize,MPI_REAL,left,tag2,& 
                    A(2,mysize+2),mysize,MPI_REAL,right,tag2,MPI_COMM_WORLD,status,ierr) 
        do i=1,mysize+2
            do j=1,mysize+2
                COL((i-1)*(mysize+2)+j) = A(i,j)
            end do 
        end do 
        call MPI_SENDRECV(COL((mysize+2)*mysize+2),mysize,MPI_REAL,down,tag3,& 
                    COL(2),mysize,MPI_REAL,up,tag3,MPI_COMM_WORLD,status,ierr) 
        call MPI_SENDRECV(COL((mysize+2)*1+2),mysize,MPI_REAL,up,tag4,& 
                    COL((mysize+2)*(mysize+1)+2),mysize,MPI_REAL,down,tag4,MPI_COMM_WORLD,status,ierr) 
    do i=1,mysize+2
            do j=1,mysize+2
                A(i,j) = COL((i-1)*(mysize+2)+j)
            end do 
        end do 

        begin_col=2 
        end_col=mysize+1 
    begin_row=2 
        end_row=mysize+1 
        if(MOD(myid, 4)==0) then 
            begin_col=3 
        end if 
        if(MOD(myid, 4)==3) then 
            end_col=mysize 
        end if 
        if(myid/4==0) then 
            begin_row=3
        end if 
        if(myid/4==3) then 
            end_row=mysize
        end if 
        do j=begin_col,end_col 
            do i=begin_row,end_row 
                B(i,j)=(A(i,j+1)+A(i,j-1)+A(i+1,j)+A(i-1,j))*0.25 
            end do 
        end do 
        do j=begin_col,end_col 
            do i=begin_row,end_row 
                A(i,j)=B(i,j) 
            end do 
        end do 
        if(myid==0) then 
            print *,"Step Index ",n
        end if 
    end do 
    do i=2,mysize+1
        print *, myid,(a(i,j),j=2,mysize+1) 
    end do 
    call MPI_FINALIZE(rc) 
end program

Python代码(附带显示渲染):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
num_procs = comm.Get_size()
my_id = comm.Get_rank()

print(f"process {my_id} of {num_procs} is running!")

steps = 10
mysize = 5

left = my_id - 1 if my_id % 4 > 0 else MPI.PROC_NULL
right = my_id + 1 if my_id % 4 < 3 else MPI.PROC_NULL
up = my_id - 4 if my_id // 4 > 0 else MPI.PROC_NULL
down = my_id + 4 if my_id // 4 < 3 else MPI.PROC_NULL

tag1, tag2, tag3, tag4 = 1, 2, 3, 4

begin_col, end_col = 1, mysize
begin_row, end_row = 1, mysize

A = np.zeros((mysize + 2, mysize + 2), dtype=float)
B = np.zeros((mysize + 2, mysize + 2), dtype=float)
recv_data = np.empty(mysize, dtype=float)

if my_id % 4 == 0:
    begin_col = 2
    A[0:mysize+2, 1] = 8.0
if my_id % 4 == 3:
    end_col = mysize-1
    A[0:mysize+2, mysize] = 8.0
if my_id // 4 == 0:
    begin_row = 2
    A[1, 0:mysize+2] = 8.0
if my_id // 4 == 3:
    end_row = mysize-1
    A[mysize, 0:mysize+2] = 8.0

for n in range(0,steps):
    comm.Sendrecv(np.ascontiguousarray(A[1:mysize+1, mysize]), dest=right, sendtag=tag1,
                  recvbuf=recv_data, source=left, recvtag=tag1)
    A[1:mysize+1, 0] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[1:1+mysize, 1]), dest=left, sendtag=tag2,
                  recvbuf=recv_data, source=right, recvtag=tag2)
    A[1:mysize+1, mysize+1] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[mysize, 1:mysize+1]), dest=down, sendtag=tag3,
                  recvbuf=recv_data, source=up, recvtag=tag3)
    A[0, 1:mysize+1] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[1, 1:mysize+1]), dest=up, sendtag=tag4,
                  recvbuf=recv_data, source=down, recvtag=tag4)
    A[mysize+1, 1:mysize+1] = recv_data

    for j in range(begin_col, end_col + 1):
        for i in range(begin_row, end_row + 1):
            B[i, j] = (A[i, j + 1] + A[i, j - 1] + A[i + 1, j] + A[i - 1, j]) * 0.25

    for j in range(begin_col, end_col + 1):
        for i in range(begin_row, end_row + 1):
            A[i, j] = B[i, j]

    #print(f"Process {my_id} Step Index {n+1}")

print(f"Process {my_id} Final Result:")
print(A[1:mysize + 1, 1:mysize + 1])
参考资料

1.都志辉. 高性能计算并行编程技术:MPI并行程序设计[M]. 清华大学出版社, 2001.

2.两小时入门MPI与并行计算系列 - 知乎 (zhihu.com)