5.1 并行任务

5.1.1 Python中的并行计算

Python对并行计算提供的原生支持远非完美。CPython实现了GIL(Global Interpreter Lock),GIL是必需的,因为CPython的内存管理不是线程安全的。但它带来了另一个很严重的问题,即它会阻止多线程的CPython程序运行于多核之上。

放弃掉GIL,NumPy中的一些线性代数函数可运行于多核上,但仅限于通过特定的库编译的(如ATLAS、MKL等)NumPy。否则,一般会用多进程而非多线程实现并行计算。由于进程间并不共享内存空间,进程间通信是个问题,比如使用Python的multithreading模块。一个更强大也更复杂的选择是使用Message Passing Interface(MPI)。

IPython对于上述两种方案都表现良好,它提供了一个并行计算的通用框架。借助于负载均衡,独立的任务很容易分发至不同的核或机器。数据也可以在不同的引擎间传送,从而可实现复杂的分布式计算。

5.1.2 多核并行

IPython提供的并行计算功能很强大,也是高度自定义的,不过这里仅介绍其最基本者。实现多核并行需要如下几步:

  • 加载几个IPython引擎(每核一个)
  • 为每个引擎创建一个Client对象作为proxy
  • 使用client执行任务,获得执行结果

这些任务可按同步或异步的方式执行。

启动引擎

# conda install ipyparallel

# start one engine per core
ipcluster start

# or specify the number
ipcluster start -n 2

如果首次使用,先安装ipyparallel,除了ipcluster命令,还可以使用Notebook的Clusters。

创建Client实例

from ipyparallel import Client
rc = Client()

rc.ids
# [0, 1, 2, 3]

开始并行魔法

最简单的方法是使用%px魔法命令,它在引擎上执行一条Python命令。

%px sum(range(101))
Out[0:8]: 5050
Out[1:8]: 5050
Out[2:8]: 5050
Out[3:8]: 5050

# specify engine
%pxconfig --targets 1
Out[1:9]: 5050

%%px命令提供的类似功能。值得注意的是,最开始这些命令是不可用的,在我们创建了一个Client实例后才变得可用。更多魔法命令的信息可参考Parallel Magic Commands

In [1]: %%px --targets :-1
   ...: import os
   ...: print(os.getpid())
   ...:
[stdout:0] 11368
[stdout:1] 3516
[stdout:2] 2044

默认情况下,%%px是blocking模式,如果需要非blocking,可通过--noblock选项指定:

In [11]: %%px --noblock
   ....: import os
   ....: import time
   ....: time.sleep(1)
   ....: os.getpid()
   ....:
Out[11]: <AsyncResult: execute>

此时各任务异步执行,%pxresult命令阻塞解释器,直至诸任务完成,返回结果:

In [12]: %pxresult
Out[0:11]: 11368
Out[1:12]: 3516
Out[2:11]: 2044
Out[3:10]: 1204

并行执行的map函数

Python内置的map函数,对一个序列中的元素逐一应用指定的函数,IPython的并行版map函数语义与此相同,但很容易将任务分发至不同的引擎。

# create a view
v = rc[:]

# import packages on the engines
with v.sync_imports():
    import time

# create a function for tasks
def f(x):
    time.sleep(1)
    return x * x

# map synchronously
v.map_sync(f, range(10))
# %timeit -n 1 -r 1 v.map_sync(f, range(10))

# map asynchronously
%timeit -n 1 -r 1 v.map(f, range(10))

# no parallel
%timeit -n 1 -r 1 map(f, range(10))
r = v.map(f, range(10))

r.ready(), r.elapsed
(False, 2.014)

r.ready(), r.elapsed
(True, 6.028)

r.get()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

r.elapsed, r.serial_time
(6.028, 10.025)

5.1.3 并行任务实例:Monte Carlo模拟

TODO

5.1.4 使用MPI

MPI(Message Passing Interface,消息传递接口)在并行计算领域是极为高效的。要在Python中使用MPI,可考虑Open-MPI + mpi4py

TODO

5.1.5 高级并行计算特性

  • 动态负载均衡
  • 在引擎间push和pull对象
  • 在不同机器上运行引擎
  • 在Amazon EC2上通过StarCluster运行IPython
  • 将所有request和result保存在数据库中
  • 通过有向无环图(DAG)管理任务依赖关系