Python 并发编程之使用多线程和多处理器


在Python编码中我们经常讨论的一个方面就是如何优化模拟执行的性能。尽管在考虑量化代码时NumPy、SciPy和pandas在这方面已然非常有用,但在构建事件驱动系统时我们无法有效地使用这些工具。有没有可以加速我们代码的其他办法?答案是肯定的,但需要留意!

在这篇文章中,我们看一种不同的模型-并发,我们可以将它引入我们Python程序中。这种模型在模拟中工作地特别好,它不需要共享状态。Monte Carlo模拟器可以用来做期权定价以及检验算法交易等类型的各种参数的模拟。

我们将特别考虑Threading库和Multiprocessing库。

推荐阅读:

《Python开发技术详解》.( 周伟,宗杰).[高清PDF扫描版+随书视频+代码]

Python脚本获取Linux系统信息

在Ubuntu下用Python搭建桌面算法交易研究环境

Python并发

当Python初学者探索多线程的代码为了计算密集型优化时,问得最多的问题之一是:”当我用多线程的时候,为什么我的程序变慢了?“

在多核机器上,我们期望多线程的代码使用额外的核,从而提高整体性能。不幸的是,主Python解释器(CPython)的内部并不是真正的多线程,是通过一个全局解释锁(GIL)来进行处理的。

GIL是必须的,因为Python解释器是非线程安全的。这意味着当从线程内尝试安全的访问Python对象的时候将有一个全局的强制锁。在任何时候,仅仅一个单一的线程能够获取Python对象或者C API。每100个字节的Python指令解释器将重新获取锁,这(潜在的)阻塞了I/0操作。因为锁,CPU密集型的代码使用线程库时,不会获得性能的提高,但是当它使用多处理库时,性能可以获得提高。

并行库的实现

现在,我们将使用上面所提到的两个库来实现对一个“小”问题进行并发优化。

线程库

上面我们提到: 运行CPython解释器的Python不会支持通过多线程来实现多核处理。不过,Python确实有一个线程库。那么如果我们(可能)不能使用多个核心进行处理,那么使用这个库能取得什么好处呢?

许多程序,尤其是那些与网络通信或者数据输入/输出(I/O)相关的程序,都经常受到网络性能或者输入/输出(I/O)性能的限制。这样Python解释器就会等待哪些从诸如网络地址或者硬盘等“远端”数据源读写数据的函数调用返回。因此这样的数据访问比从本地内存或者CPU缓冲区读取数据要慢的多。

因此,如果许多数据源都是通过这种方式访问的,那么就有一种方式对这种数据访问进行性能提高,那就是对每个需要访问的数据项都产生一个线程 。

举个例子,假设有一段Python代码,它用来对许多站点的URL进行扒取。再假定下载每个URL所需时间远远超过计算机CPU对它的处理时间,那么仅使用一个线程来实现就会大大地受到输入/输出(I/O)性能限制。

通过给每个下载资源生成一个新的线程,这段代码就会并行地对多个数据源进行下载,在所有下载都结束的时候再对结果进行组合。这就意味着每个后续下载都不会等待前一个网页下载完成。此时,这段代码就受收到客户/服务端带宽的限制。

不过,许多与财务相关的应用都受到CPU性能的限制,这是因为这样的应用都是高度集中式的对数字进行处理。这样的应用都会进行大型线性代数计算或者数值的随机统计,比如进行蒙地卡罗模拟统计。所以只要对这样的应用使用Python和全局解释锁(GIL),此时使用Python线程库就不会有任何性能的提高。

Python实现

下面这段依次添加数字到列表的“玩具”代码,举例说明了多线程的实现。每个线程创建一个新的列表并随机添加一些数字到列表中。这个已选的“玩具”例子对CPU的消耗非常高。

下面的代码概述了线程库的接口,但是他不会比我们用单线程实现的速度更快。当我们对下面的代码用多处理库时,我们将看到它会显著的降低总的运行时间。

让我们检查一下代码是怎样工作的。首先我们导入threading库。然后我们创建一个带有三个参数的函数list_append。第一个参数count定义了创建列表的大小。第二个参数id是“工作”(用于我们输出debug信息到控制台)的ID。第三个参数out_list是追加随机数的列表。

__main__函数创建了一个107的size,并用两个threads执行工作。然后创建了一个jobs列表,用于存储分离的线程。threading.Thread对象将list_append函数作为参数,并将它附加到jobs列表。

最后,jobs分别开始并分别“joined”。join()方法阻塞了调用的线程(例如主Python解释器线程)直到线程终止。在打印完整的信息到控制台之前,确认所有的线程执行完成。

# thread_test.pyimport randomimport threadingdef list_append(count, id, out_list):
 """
 Creates an empty list and then appends a
 random number to the list 'count' number
 of times. A CPU-heavy operation!
 """
 for i in range(count):
  out_list.append(random.random())if __name__ == "__main__":
 size = 10000000  # Number of random numbers to add
 threads = 2  # Number of threads to create

 # Create a list of jobs and then iterate through
 # the number of threads appending each thread to
 # the job list
 jobs = []
 for i in range(0, threads):
  out_list = list()
  thread = threading.Thread(target=list_append(size, i, out_list))
  jobs.append(thread)

 # Start the threads (i.e. calculate the random number lists)
 for j in jobs:
  j.start()

 # Ensure all of the threads have finished
 for j in jobs:
  j.join()

 print "List processing complete."

我们能在控制台中调用如下的命令time这段代码

time python thread_test.py

将产生如下的输出

List processing complete.
real    0m2.003s
user    0m1.838s
sys    0m0.161s

注意user时间和sys时间相加大致等于real时间。这表明我们使用线程库没有获得性能的提升。我们期待real时间显著的降低。在并发编程的这些概念中分别被称为CPU时间和挂钟时间(wall-clock time)

更多详情见请继续阅读下一页的精彩内容:

  • 1
  • 2
  • 下一页

相关内容