I. Refactoring the Jacobi Code
To scale to larger core counts and to test communication behavior, the domain decomposition was changed from a 1D (row-only) partition to a 2D row-and-column partition (a minimal sketch of the resulting rank layout follows the list below).
The following features were also added:
1. Wall-clock timing with time
2. Rendering of the result with Matplotlib
3. A snapshot of the global field every 1000 steps
4. Support for serial execution
5. Configurable n*n process grid
6. Configurable grid resolution
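As a minimal sketch of the 2D layout, assuming the same rank-numbering convention as the full code below (ranks laid out row by row in a block x block grid), each rank's grid position and its four exchange partners follow directly from its rank id; MPI.PROC_NULL stands in for a missing neighbor at the domain edge:

from mpi4py import MPI
import math

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
block = int(math.sqrt(comm.Get_size()))  # process grid is block x block
row, col = divmod(rank, block)           # this rank's position in the grid

# neighbors; MPI.PROC_NULL is a "virtual process": exchanges with it are no-ops
left = rank - 1 if col > 0 else MPI.PROC_NULL
right = rank + 1 if col < block - 1 else MPI.PROC_NULL
up = rank - block if row > 0 else MPI.PROC_NULL
down = rank + block if row < block - 1 else MPI.PROC_NULL
print(f"rank {rank} at ({row}, {col}): left={left} right={right} up={up} down={down}")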
The complete Python code is as follows:
import os
import time
import math
import numpy as np
import matplotlib.pyplot as plt
from mpi4py import MPI

comm = MPI.COMM_WORLD
num_procs = comm.Get_size()
my_id = comm.Get_rank()
if my_id == 0:
    start_time = time.time()  # start timing
print(f"process {my_id} of {num_procs} is running!")

steps = 20000
block = int(math.sqrt(num_procs))  # the process grid is block x block
mysize = 180 // block              # local grid size per process (grid resolution)
# keep only the first block*block ranks; splitting off a sub-communicator (rather
# than letting surplus ranks simply exit) keeps the collective Gather below from
# deadlocking when the launched process count is not a perfect square
sub_comm = comm.Split(0 if my_id < block * block else MPI.UNDEFINED, my_id)
if sub_comm == MPI.COMM_NULL:
    exit(0)  # surplus rank outside the square grid
comm = sub_comm
num_procs = block * block

# neighbor ranks; MPI.PROC_NULL is a "virtual process", so exchanges at the domain edge are no-ops
left = my_id - 1 if my_id % block > 0 else MPI.PROC_NULL
right = my_id + 1 if my_id % block < block - 1 else MPI.PROC_NULL
up = my_id - block if my_id // block > 0 else MPI.PROC_NULL
down = my_id + block if my_id // block < block - 1 else MPI.PROC_NULL

tag1, tag2, tag3, tag4 = 1, 2, 3, 4
begin_col, end_col = 1, mysize
begin_row, end_row = 1, mysize
A = np.zeros((mysize + 2, mysize + 2), dtype=float)  # local grid with a one-cell halo
B = np.zeros((mysize + 2, mysize + 2), dtype=float)
recv_data = np.empty(mysize, dtype=float)
# fixed boundary value 8.0 on the edges of the global domain;
# ranks on a global edge shrink their update range by one cell
if my_id % block == 0:
    begin_col = 2
    A[0:mysize + 2, 1] = 8.0
if my_id % block == block - 1:
    end_col = mysize - 1
    A[0:mysize + 2, mysize] = 8.0
if my_id // block == 0:
    begin_row = 2
    A[1, 0:mysize + 2] = 8.0
if my_id // block == block - 1:
    end_row = mysize - 1
    A[mysize, 0:mysize + 2] = 8.0

if my_id == 0:
    read_time = time.time()  # end of setup
    execution_time = read_time - start_time
    print("Setup time:", execution_time, "seconds")
for n in range(0, steps):
    # halo exchange with the four neighbors (Sendrecv avoids send/recv deadlock)
    comm.Sendrecv(np.ascontiguousarray(A[1:mysize + 1, mysize]), dest=right, sendtag=tag1,
                  recvbuf=recv_data, source=left, recvtag=tag1)
    A[1:mysize + 1, 0] = recv_data
    comm.Sendrecv(np.ascontiguousarray(A[1:mysize + 1, 1]), dest=left, sendtag=tag2,
                  recvbuf=recv_data, source=right, recvtag=tag2)
    A[1:mysize + 1, mysize + 1] = recv_data
    comm.Sendrecv(np.ascontiguousarray(A[mysize, 1:mysize + 1]), dest=down, sendtag=tag3,
                  recvbuf=recv_data, source=up, recvtag=tag3)
    A[0, 1:mysize + 1] = recv_data
    comm.Sendrecv(np.ascontiguousarray(A[1, 1:mysize + 1]), dest=up, sendtag=tag4,
                  recvbuf=recv_data, source=down, recvtag=tag4)
    A[mysize + 1, 1:mysize + 1] = recv_data

    # Jacobi update: each interior point becomes the average of its four neighbors
    for j in range(begin_col, end_col + 1):
        for i in range(begin_row, end_row + 1):
            B[i, j] = (A[i, j + 1] + A[i, j - 1] + A[i + 1, j] + A[i - 1, j]) * 0.25
    for j in range(begin_col, end_col + 1):
        for i in range(begin_row, end_row + 1):
            A[i, j] = B[i, j]
    if n % 1000 == 0:
        # periodic snapshot: gather all subdomains to rank 0 and render the global grid
        C, D = None, None
        if my_id == 0:
            C = np.zeros(mysize * block * mysize * block, dtype=float)
            D = np.zeros((mysize * block, mysize * block), dtype=float)
        comm.Gather(np.ascontiguousarray(A[1:mysize + 1, 1:mysize + 1]), C, root=0)
        if my_id == 0:
            # reassemble the flat Gather buffer into the global 2D grid
            # (reshape instead of resize: array_split returns views, which cannot be resized)
            sub_arrays = [s.reshape(mysize, mysize) for s in np.array_split(C, num_procs)]
            rows = []
            for i in range(0, num_procs, block):
                rows.append(np.hstack(sub_arrays[i:i + block]))
            D = np.vstack(rows)
            plt.imshow(D, cmap='jet', vmin=0, vmax=8, origin='lower',
                       extent=[1, mysize * block, 1, mysize * block])
            plt.title('Process All')
            plt.xlabel('Columns')
            plt.ylabel('Rows')
            if n == 0:
                plt.colorbar()
            os.makedirs(f"Jacobi2D_Process{num_procs}_size{mysize*block}", exist_ok=True)
            plt.savefig(f"Jacobi2D_Process{num_procs}_size{mysize*block}/"
                        f"Jacobi2D{num_procs}_Process{num_procs}_size{mysize*block}_{n}_.png", dpi=300)
            print(f"Process {my_id} Step Index {n+1}")
            # print(f"Process {my_id} Final Result:")
            # print(A[1:mysize + 1, 1:mysize + 1])
if my_id == 0:
    compute_time = time.time()  # end of the iteration phase
    execution_time = compute_time - read_time
    print("Compute time:", execution_time, "seconds")
    total_time = execution_time * num_procs
    print("Total core time:", total_time, "seconds")

# final gather and rendering of the finished field
C, D = None, None
if my_id == 0:
    C = np.zeros(mysize * block * mysize * block, dtype=float)
    D = np.zeros((mysize * block, mysize * block), dtype=float)
comm.Gather(np.ascontiguousarray(A[1:mysize + 1, 1:mysize + 1]), C, root=0)
if my_id == 0:
    sub_arrays = [s.reshape(mysize, mysize) for s in np.array_split(C, num_procs)]
    rows = []
    for i in range(0, num_procs, block):
        rows.append(np.hstack(sub_arrays[i:i + block]))
    D = np.vstack(rows)
    comm_time = time.time()  # end of the final gather
    execution_time = comm_time - compute_time
    print("Communication time:", execution_time, "seconds")
    plt.imshow(D, cmap='jet', vmin=0, vmax=8, origin='lower',
               extent=[1, mysize * block, 1, mysize * block])
    plt.title('Process All')
    plt.xlabel('Columns')
    plt.ylabel('Rows')
    os.makedirs(f"Jacobi2D_Process{num_procs}_size{mysize*block}", exist_ok=True)
    plt.savefig(f"Jacobi2D_Process{num_procs}_size{mysize*block}/"
                f"Jacobi2D{num_procs}_Process{num_procs}_size{mysize*block}_{steps}_{total_time}.png", dpi=300)
    plt.show()
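Assuming the script is saved as jacobi2d.py (a hypothetical file name, not fixed by the code above), it can be launched on an n*n process grid with mpiexec; with a single process it degenerates to a serial run (block = 1, mysize = 180):

mpiexec -n 16 python jacobi2d.py    # 4 x 4 process grid
python jacobi2d.py                  # serial run on one process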
II. Visualization Results
With the global grid size (mysize * block) set to 180 and steps = 2000, Matplotlib produces the renderings saved by the code above.
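The global image is stitched together on rank 0 from the flat buffer filled by Gather. That reassembly step (the array_split / hstack / vstack logic in the code above) can be checked in isolation; a minimal sketch on a 2 x 2 process grid with made-up data:

import numpy as np

# tiny stand-in for the Gather buffer: 4 subdomains of 2x2 values each
mysize, block = 2, 2
num_procs = block * block
C = np.arange(num_procs * mysize * mysize, dtype=float)

sub_arrays = [s.reshape(mysize, mysize) for s in np.array_split(C, num_procs)]
rows = [np.hstack(sub_arrays[i:i + block]) for i in range(0, num_procs, block)]
D = np.vstack(rows)
print(D)  # 4x4 global grid; each 2x2 quadrant is one rank's subdomain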
III. Efficiency
With all 16 logical cores of the laptop in use, CPU load is sustained at 100%.
Parallel computation times were also compared across different core counts.
Because the laptop has 8 physical cores exposed as 16 threads through hyper-threading, it cannot maintain a good parallel speedup at full load.
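The speedup behind this observation can be quantified from the "Compute time" values the script prints. A minimal sketch, with hypothetical placeholder timings rather than measured results:

# t1 and tp are hypothetical placeholders; substitute the compute times
# printed by the script for 1 process and for p processes
t1 = 100.0  # compute time with 1 process (placeholder)
tp = 10.0   # compute time with p processes (placeholder)
p = 16

speedup = t1 / tp          # S_p = T_1 / T_p
efficiency = speedup / p   # E_p = S_p / p; SMT "cores" typically drag this below 1
print(f"speedup = {speedup:.2f}, efficiency = {efficiency:.2%}")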
References
1. 都志辉. 高性能计算并行编程技术——MPI并行程序设计. 清华大学出版社, 2001. (Du Zhihui, Parallel Programming Techniques for High-Performance Computing: MPI Parallel Program Design, Tsinghua University Press, 2001.)