此文紧接上文协程和并发课程【部分一~二】。
第三部分:协程和事件分发
协程还能用于处理事件流,很强大吧!这里作者举的例子是处理 XML 文件,采用的方法是 SAX(Simple API for XML)。下边先简介下 SAX 及其用法。
SAX 简介
SAX(Simple API for XML)跟 DOM 一样也是用于访问 XML 文档的接口,原先只是 Java 上的 API,后来也在其他语言中有实现版本,如 Python 版的 SAX。详细介绍可参考 IBM 的一篇博文使用 SAX 处理 XML 文档。
SAX 是一个事件驱动的接口,这使得它能够增量处理大容量 XML 文件而不用担心内存不足,其模型如下:
一个简单的例子为:
1 | # basicsax.py |
这里我把它改进了下,让输出更友好点:
1 | # basicsax2.py |
但作者认为这种底层事件处理有点过于底层和笨拙,所以就引入协程。下边我们来看看怎么用协程来处理 XML 文件。
协程 + SAX
方案
协程加 SAX 的方案如下:
代码如下:
1 | # cosax.py |
过滤版本
过滤版本的太强大了:
1 | # buses.py |
用 Expat 解析 XML
Expat 相当于 sax.xml.parse 版本,但速度更快,而且用法一样:
1 | # coexpat.py |
作者测试了当输入的 XML 文件大小为 30MB 时,SAX 版本的所需时间是 8.37s,而 Expat 版本的则只需要 4.51s,节省了接近一半。
自定义 XML 解析器
作者很有心,还自己定制了一个 C 语言版 XML 解析器,源代码 cxml/cxmlparse.c:
1 |
|
要让该模块可用,需要编译并安装,具体步骤如下(cxml 目录下):
- 如果没有 expat.h 文件,安装 Expat:yum install expat-devel.x86_64
- 编译并安装:python setup.py build_ext –inplace
- 运行测试文件 test:python test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70import cxmlparse
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.next()
return cr
return start
def buses_to_dicts(target):
while True:
event, value = (yield)
if event == 'start' and value[0] == 'bus':
busdict = {}
fragments = []
# Capture inner elements of <bus> elements
while True:
event, value = (yield)
if event == 'start':
fragments = []
elif event == 'text':
fragments.append(value)
elif event == 'end':
if value != 'bus':
busdict[value] = "".join(fragments)
else:
target.send(busdict)
break
def filter_on_field(fieldname, value, target):
while True:
d = (yield)
if d.get(fieldname) == value:
target.send(d)
def bus_locations():
while True:
bus = (yield)
print "%(route)s,%(id)s,\"%(direction)s\"," \
"%(latitude)s,%(longitude)s" % bus
def printer():
while True:
item = (yield)
print item
cxmlparse.parse(
"../allroutes.xml",
buses_to_dicts(
filter_on_field(
"route", "22",
filter_on_field(
"direction", "North Bound",
bus_locations()
)
)
)
)
用作者定制版的,处理 30MB 的 XML 文件就只需 2.95s 左右,更加省时。
插曲:用 ElementTree 解析 XML
用 ElementTree 来解析 XML 相对要简单一点(缺点在于处理大容量 XML 文件时容易导致内存不足):
1 | # iterbus.py |
第四部分:数据处理及并发编程
我们可以把协程包装进线程和子进程中,如下图:
多线程版本
1 | # cothread.py |
该例子概览图如下:
作者测试,用多线程后,速度慢了 50%。
子进程版本
1 | # coprocess.py |
1 | # busproc.py |
该例子概览图如下:
这里需要注意的是,用多线程和多进程不一定就能够加快速度,需要仔细测试。
隐藏的威胁
向正在执行的协程发送数据
调用 send() 向正在执行的协程发送数据会导致程序崩溃
,如用多线程同时向一个协程发送数据:
1 | # cocrash.py |
协程之间形成环
如下图,在协程之间形成环也会不行的: