python中的异步IO

协程

协程,又称微线程,纤程。英文名Coroutine。

协程是啥

首先我们得知道协程是啥?协程其实可以认为是比线程更小的执行单元。 为啥说他是一个执行单元,因为他自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复CPU上下文那么程序还是可以运行的。

通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。

协程和线程差异

那么这个过程看起来比线程差不多。其实不然, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。

协程的问题

但是协程有一个问题,就是系统并不感知,所以操作系统不会帮你做切换。 那么谁来帮你做切换?让需要执行的协程更多的获得CPU时间才是问题的关键。

目前的协程框架一般都是设计成 1:N 模式。所谓 1:N 就是一个线程作为一个容器里面放置多个协程。 那么谁来适时的切换这些协程?答案是有协程自己主动让出CPU,也就是每个协程池里面有一个调度器, 这个调度器是被动调度的。意思就是他不会主动调度。而且当一个协程发现自己执行不下去了(比如异步等待网络的数据回来,但是当前还没有数据到), 这个时候就可以由这个协程通知调度器,这个时候执行到调度器的代码,调度器根据事先设计好的调度算法找到当前最需要CPU的协程。 切换这个协程的CPU上下文把CPU的运行权交个这个协程,直到这个协程出现执行不下去需要等等的情况,或者它调用主动让出CPU的API之类,触发下一次调度。

其实是有问题的,假设这个线程中有一个协程是CPU密集型的他没有IO操作, 也就是自己不会主动触发调度器调度的过程,那么就会出现其他协程得不到执行的情况, 所以这种情况下需要程序员自己避免。这是一个问题,假设业务开发的人员并不懂这个原理的话就可能会出现问题。

协程的好处

在IO密集型的程序中由于IO操作远远慢于CPU的操作,所以往往需要CPU去等IO操作。 同步IO下系统需要切换线程,让操作系统可以在IO过程中执行其他的东西。 这样虽然代码是符合人类的思维习惯但是由于大量的线程切换带来了大量的性能的浪费,尤其是IO密集型的程序。

所以人们发明了异步IO。就是当数据到达的时候触发我的回调。来减少线程切换带来性能损失。 但是这样的坏处也是很大的,主要的坏处就是操作被 “分片” 了,代码写的不是 “一气呵成” 这种。 而是每次来段数据就要判断 数据够不够处理哇,够处理就处理吧,不够处理就在等等吧。这样代码的可读性很低,其实也不符合人类的习惯。

但是协程可以很好解决这个问题。比如 把一个IO操作 写成一个协程。当触发IO操作的时候就自动让出CPU给其他协程。要知道协程的切换很轻的。 协程通过这种对异步IO的封装 既保留了性能也保证了代码的容易编写和可读性。在高IO密集型的程序下很好。但是高CPU密集型的程序下没啥好处。

应用举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time


def A():
while True:
print("----A---")
yield
time.sleep(0.5)


def B(c):
while True:
print("----B---")
c.__next__()
time.sleep(0.5)


if __name__ == '__main__':
a = A()
B(a)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'


def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()


c = consumer()
produce(c)

注意到consumer函数是一个generator,把一个consumer传入produce后:

  1. 首先调用c.send(None)启动生成器;
  2. 然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
  3. consumer通过yield拿到消息,处理,又通过yield把结果传回;
  4. produce拿到consumer处理的结果,继续生产下一条消息;
  5. produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

协程-greenlet版

为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from greenlet import greenlet
import time


def test1():
while True:
print("---A---")
gr2.switch()
time.sleep(0.5)


def test2():
while True:
print("---B--")
gr1.switch()
time.sleep(0.5)


gr1 = greenlet(test1)
gr2 = greenlet(test2)

# 切换到gr1中运行
gr1.switch()

协程-gevent

greenlet已经实现了协程,但是这个还的人工切换,是不是觉得太麻烦了,不要捉急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent

其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。

由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

gevent的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import gevent


def f(n):
for i in range(n):
print("gevent is %s,i is %s " % (gevent.getcurrent(), i))


g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()


gevent is <Greenlet at 0x1070a2848: f(5)>,i is 0
gevent is <Greenlet at 0x1070a2848: f(5)>,i is 1
gevent is <Greenlet at 0x1070a2848: f(5)>,i is 2
gevent is <Greenlet at 0x1070a2848: f(5)>,i is 3
gevent is <Greenlet at 0x1070a2848: f(5)>,i is 4
gevent is <Greenlet at 0x1070a2948: f(5)>,i is 0
gevent is <Greenlet at 0x1070a2948: f(5)>,i is 1
gevent is <Greenlet at 0x1070a2948: f(5)>,i is 2
gevent is <Greenlet at 0x1070a2948: f(5)>,i is 3
gevent is <Greenlet at 0x1070a2948: f(5)>,i is 4
gevent is <Greenlet at 0x1070a2a48: f(5)>,i is 0
gevent is <Greenlet at 0x1070a2a48: f(5)>,i is 1
gevent is <Greenlet at 0x1070a2a48: f(5)>,i is 2
gevent is <Greenlet at 0x1070a2a48: f(5)>,i is 3
gevent is <Greenlet at 0x1070a2a48: f(5)>,i is 4

gevent切换执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import gevent


def f(n):
for i in range(n):
print("gevent is %s,i is %s " % (gevent.getcurrent(), i))
gevent.sleep(1)


g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()


gevent is <Greenlet at 0x10a0d2848: f(5)>,i is 0
gevent is <Greenlet at 0x10a0d2948: f(5)>,i is 0
gevent is <Greenlet at 0x10a0d2a48: f(5)>,i is 0
gevent is <Greenlet at 0x10a0d2848: f(5)>,i is 1
gevent is <Greenlet at 0x10a0d2948: f(5)>,i is 1
gevent is <Greenlet at 0x10a0d2a48: f(5)>,i is 1
gevent is <Greenlet at 0x10a0d2848: f(5)>,i is 2
gevent is <Greenlet at 0x10a0d2948: f(5)>,i is 2
gevent is <Greenlet at 0x10a0d2a48: f(5)>,i is 2
gevent is <Greenlet at 0x10a0d2848: f(5)>,i is 3
gevent is <Greenlet at 0x10a0d2948: f(5)>,i is 3
gevent is <Greenlet at 0x10a0d2a48: f(5)>,i is 3
gevent is <Greenlet at 0x10a0d2848: f(5)>,i is 4
gevent is <Greenlet at 0x10a0d2948: f(5)>,i is 4
gevent is <Greenlet at 0x10a0d2a48: f(5)>,i is 4

gevent版-TCP服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import gevent

from gevent import socket, monkey

monkey.patch_all()


def handle_request(conn):
while True:
data = conn.recv(1024)
if not data:
conn.close()
break
print("recv:", data)
conn.send(data)


def server(port):
s = socket.socket()
s.bind(('', port))
s.listen(5)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)


if __name__ == '__main__':
server(7788)

asyncio

asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。

用asyncio实现Hello world代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1):
r = yield from asyncio.sleep(1)
print("Hello again!")

# 获取EventLoop:
loop = asyncio.get_event_loop()
# 执行coroutine
loop.run_until_complete(hello())
loop.close()

@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。

hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。

把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

小结

asyncio提供了完善的异步IO支持;

异步操作需要在coroutine中通过yield from完成;

多个coroutine可以封装成一组Task然后并发执行。

async/await

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

  1. 把@asyncio.coroutine替换为async;
  2. 把yield from替换为await。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import threading
import asyncio


# @asyncio.coroutine
# def hello():
# print('Hello world! (%s)' % threading.currentThread())
# yield from asyncio.sleep(1)
# print('Hello again! (%s)' % threading.currentThread())

async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")


loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

aiohttp

asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio

from aiohttp import web

async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))

async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv

loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()