MPI and Parallel Computing 06: Advanced Topics

Posted by Ling on 2023-07-30

1. Jacobi Code Refactoring

To scale to higher core counts and to exercise the communication more thoroughly, the domain decomposition is changed from horizontal strips to a two-dimensional block partition, so each process now exchanges halo data with up to four neighbors (left, right, up, down) instead of two.
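
To make the neighbor bookkeeping concrete, here is a small standalone sketch (the same arithmetic as in the full program below) that prints each rank's four neighbors on a hypothetical 3x3 process grid; MPI.PROC_NULL marks a missing neighbor at a physical boundary:

from mpi4py import MPI

block = 3  # 3x3 process grid, ranks laid out row-major:
           #   0 1 2
           #   3 4 5
           #   6 7 8
for my_id in range(block * block):
    left  = my_id - 1     if my_id %  block > 0         else MPI.PROC_NULL
    right = my_id + 1     if my_id %  block < block - 1 else MPI.PROC_NULL
    up    = my_id - block if my_id // block > 0         else MPI.PROC_NULL
    down  = my_id + block if my_id // block < block - 1 else MPI.PROC_NULL
    print(my_id, left, right, up, down)

For example, rank 4 (the center) has neighbors 3, 5, 1, 7, while rank 0 (a corner) gets MPI.PROC_NULL for both left and up.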

Along with the decomposition change, the following features were added:

1. A wall-clock timer built on the time module

2. Rendering with Matplotlib

3. Gathering and plotting the global field every 1000 steps (see the reassembly sketch after this list)

4. Support for serial (single-process) runs

5. A configurable n*n process grid

6. Configurable grid resolution
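
Feature 3 relies on MPI Gather delivering each rank's flattened block concatenated in rank order; the snapshot code then has to tile those blocks back into the global grid. A minimal NumPy-only sketch of that reassembly, using a hypothetical 2x2 process grid with 2x2 local blocks:

import numpy as np

block, mysize = 2, 2   # 2x2 process grid, 2x2 local block per rank
num_procs = block * block
# What Gather would deliver on rank 0: each rank's flattened block,
# concatenated in rank order (rank r's block is filled with the value r here).
C = np.concatenate([np.full(mysize * mysize, r, dtype=float)
                    for r in range(num_procs)])
# Reassembly: one (mysize, mysize) block per rank, tiled row by row.
sub_arrays = C.reshape(num_procs, mysize, mysize)
rows = [np.hstack(sub_arrays[i:i + block]) for i in range(0, num_procs, block)]
D = np.vstack(rows)
print(D)
# [[0. 0. 1. 1.]
#  [0. 0. 1. 1.]
#  [2. 2. 3. 3.]
#  [2. 2. 3. 3.]]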

The complete Python code follows:

import os
import time
from mpi4py import MPI
import numpy as np
import math
import matplotlib.pyplot as plt

comm = MPI.COMM_WORLD
num_procs = comm.Get_size()
my_id = comm.Get_rank()

if my_id == 0:
    start_time = time.time()  # start the wall-clock timer

print(f"process {my_id} of {num_procs} is running!")

steps = 20000
block = int(math.sqrt(num_procs))  # processes per side of the square process grid
mysize = 180 // block              # local grid size per process (global grid: 180 x 180)
num_procs = block * block          # use only a perfect square of processes

# Surplus ranks beyond the square process grid drop out. The collectives
# below (Gather) must then run on a communicator that contains only the
# active ranks, so split COMM_WORLD before those ranks exit.
active = my_id < num_procs
comm = comm.Split(0 if active else MPI.UNDEFINED, my_id)
if not active:
    exit(0)

# Neighbor ranks in each direction; MPI.PROC_NULL marks a physical boundary (no neighbor).
left = my_id - 1 if my_id % block > 0 else MPI.PROC_NULL
right = my_id + 1 if my_id % block < block-1 else MPI.PROC_NULL
up = my_id - block if my_id // block > 0 else MPI.PROC_NULL
down = my_id + block if my_id // block < block-1 else MPI.PROC_NULL

tag1, tag2, tag3, tag4 = 1, 2, 3, 4

begin_col, end_col = 1, mysize  # interior update range (narrowed at physical boundaries below)
begin_row, end_row = 1, mysize

A = np.zeros((mysize + 2, mysize + 2), dtype=float)  # local block plus one-cell halo ring
B = np.zeros((mysize + 2, mysize + 2), dtype=float)  # scratch array for the Jacobi update
recv_data = np.empty(mysize, dtype=float)            # receive buffer for halo exchange

# Fixed boundary conditions: the outer edge of the global domain is held at
# 8.0, and the local update range is narrowed so those cells are never overwritten.
if my_id % block == 0:          # left edge of the process grid
    begin_col = 2
    A[0:mysize+2, 1] = 8.0
if my_id % block == block-1:    # right edge
    end_col = mysize-1
    A[0:mysize+2, mysize] = 8.0
if my_id // block == 0:         # top edge
    begin_row = 2
    A[1, 0:mysize+2] = 8.0
if my_id // block == block-1:   # bottom edge
    end_row = mysize-1
    A[mysize, 0:mysize+2] = 8.0

if my_id == 0:
    read_time = time.time()  # end of setup
    execution_time = read_time - start_time
    print("Setup time:", execution_time, "seconds")

for n in range(0, steps):
    # Halo exchange with the four neighbors. The paired Sendrecv calls avoid
    # deadlock, and PROC_NULL endpoints turn a transfer into a no-op.
    comm.Sendrecv(np.ascontiguousarray(A[1:mysize+1, mysize]), dest=right, sendtag=tag1,
                  recvbuf=recv_data, source=left, recvtag=tag1)
    A[1:mysize+1, 0] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[1:mysize+1, 1]), dest=left, sendtag=tag2,
                  recvbuf=recv_data, source=right, recvtag=tag2)
    A[1:mysize+1, mysize+1] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[mysize, 1:mysize+1]), dest=down, sendtag=tag3,
                  recvbuf=recv_data, source=up, recvtag=tag3)
    A[0, 1:mysize+1] = recv_data

    comm.Sendrecv(np.ascontiguousarray(A[1, 1:mysize+1]), dest=up, sendtag=tag4,
                  recvbuf=recv_data, source=down, recvtag=tag4)
    A[mysize+1, 1:mysize+1] = recv_data

    # Jacobi update: each interior point becomes the average of its four neighbors.
    for j in range(begin_col, end_col + 1):
        for i in range(begin_row, end_row + 1):
            B[i, j] = (A[i, j + 1] + A[i, j - 1] + A[i + 1, j] + A[i - 1, j]) * 0.25

    # Copy the updated interior back into A for the next iteration.
    A[begin_row:end_row + 1, begin_col:end_col + 1] = B[begin_row:end_row + 1, begin_col:end_col + 1]

    if n % 1000 == 0:
        # Periodic snapshot: gather all blocks on rank 0 and render the global field.
        C, D = None, None
        if my_id == 0:
            C = np.zeros(mysize*block*mysize*block, dtype=float)

        comm.Gather(np.ascontiguousarray(A[1:mysize + 1, 1:mysize + 1]), C, root=0)

        if my_id == 0:
            # Gather concatenates each rank's flattened block in rank order;
            # reshape the blocks and tile them back into the global grid.
            sub_arrays = C.reshape(num_procs, mysize, mysize)
            rows = [np.hstack(sub_arrays[i:i + block]) for i in range(0, num_procs, block)]
            D = np.vstack(rows)

            plt.imshow(D, cmap='jet', vmin=0, vmax=8, origin='lower', extent=[1, mysize*block, 1, mysize*block])
            plt.title('Process All')
            plt.xlabel('Columns')
            plt.ylabel('Rows')
            if n == 0:
                plt.colorbar()  # attach the colorbar once; the figure is reused afterwards
            os.makedirs(f"Jacobi2D_Process{num_procs}_size{mysize*block}", exist_ok=True)
            plt.savefig(f"Jacobi2D_Process{num_procs}_size{mysize*block}/Jacobi2D{num_procs}_Process{num_procs}_size{mysize*block}_{n}_.png", dpi=300)

    print(f"Process {my_id} Step Index {n+1}")

#print(f"Process {my_id} Final Result:")
#print(A[1:mysize + 1, 1:mysize + 1])

if my_id == 0:
    compute_time = time.time()  # end of the iteration loop
    execution_time = compute_time - read_time
    print("Compute time:", execution_time, "seconds")
    total_time = execution_time * num_procs
    print("Total core-seconds:", total_time)

# Final gather: collect all blocks on rank 0 and rebuild the global field.
C, D = None, None
if my_id == 0:
    C = np.zeros(mysize*block*mysize*block, dtype=float)

comm.Gather(np.ascontiguousarray(A[1:mysize + 1, 1:mysize + 1]), C, root=0)

if my_id == 0:
    sub_arrays = C.reshape(num_procs, mysize, mysize)
    rows = [np.hstack(sub_arrays[i:i + block]) for i in range(0, num_procs, block)]
    D = np.vstack(rows)

    comm_time = time.time()  # end of the final gather
    execution_time = comm_time - compute_time
    print("Communication time:", execution_time, "seconds")

    #print(D)
    plt.imshow(D, cmap='jet', vmin=0, vmax=8, origin='lower', extent=[1, mysize*block, 1, mysize*block])
    plt.title('Process All')
    plt.xlabel('Columns')
    plt.ylabel('Rows')
    # The colorbar was already attached during the step-0 snapshot above.
    os.makedirs(f"Jacobi2D_Process{num_procs}_size{mysize*block}", exist_ok=True)
    plt.savefig(f"Jacobi2D_Process{num_procs}_size{mysize*block}/Jacobi2D{num_procs}_Process{num_procs}_size{mysize*block}_{steps}_{total_time}.png", dpi=300)
    plt.show()
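
Assuming the script is saved as jacobi2d.py (the file name is my choice, not from the original), it can be launched serially or in parallel:

python jacobi2d.py
mpiexec -n 9 python jacobi2d.py

The process count should be a perfect square; surplus ranks drop out immediately after the communicator split.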

2. Rendering Results

With the global grid size set to 180 and steps = 2000, Matplotlib produces the heat-map renderings saved by the code above.

3. Efficiency

With all 16 logical cores of the laptop fully loaded, CPU utilization is sustained at 100%.

Runtimes were also compared across different process counts, as scripted in the sketch below.

The laptop has 8 physical cores exposed as 16 hardware threads via simultaneous multithreading; those threads are not independent cores, so at full load the machine cannot maintain a good parallel speedup beyond 8 processes.
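
That comparison can be scripted. Below is a minimal sketch (the file name jacobi2d.py and the chosen process counts are my assumptions) that runs the solver at several square process counts and prints the wall-clock time of each run:

import subprocess
import time

# Hypothetical benchmark driver: run the Jacobi solver at several
# perfect-square process counts and report each run's wall-clock time.
for n in (1, 4, 9, 16):
    t0 = time.time()
    subprocess.run(["mpiexec", "-n", str(n), "python", "jacobi2d.py"],
                   check=True, stdout=subprocess.DEVNULL)
    print(f"{n:2d} processes: {time.time() - t0:.1f} s")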
