目录

Python 协程/异步IO

最近做项目用到 Python 协程/异步,现在总结一下:

导入

1
import asyncio

如果在 IPython 环境里使用,需要加两行:

1
2
3
import nest_asyncio
nest_asyncio.apply()
import asyncio

协程

协程(coroutines)是 Python 异步编程的核心。一个 coroutine 要用 async def 来定义。

1
2
3
async def main():
    # do something
    print("Hello world!")

为了运行它,不能直接调用 main(),而是需要用 run()

1
asyncio.run(main())

如何在一个协程里嵌套另一个协程呢?就像我们在一个函数里嵌套另一个函数那样。你需要使用 await 语句:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import asyncio

async def coro_1():
    print("I am the coroutine 1.")

async def coro_2():
    print("I am the coroutine 2.")

async def main():
    await coro_1()
    await coro_2()
    print("Hello World!")

asyncio.run(main())

协程嵌套在另一个协程里,需要使用 await 来调用。如果直接写成下面这样:

1
2
3
4
async def main():
    coro_1()
    coro_2()
    print("Hello World!")

则会收到警告如下:

1
2
3
4
5
6
7
01.py:10: RuntimeWarning: coroutine 'coro_1' was never awaited
  coro_1()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
01.py:11: RuntimeWarning: coroutine 'coro_2' was never awaited
  coro_2()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Hello World!

可以看到,coro_1coro_2 并没有被调用。

任务

如果像上一节那样使用协程,那么协程就失去了它的意义。协程真正有用的地方,在于它们可以并发执行。考虑如下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio

async def coro_1():
    print("I am the coroutine 1.")

async def coro_2():
    print("I am the coroutine 2.")

async def main():
    task_1 = asyncio.create_task(coro_1())
    task_2 = asyncio.create_task(coro_2())

    await task_1
    await task_2

    print("Hello World!")

asyncio.run(main())

在这个代码中,我们用 create_taskcoro_1coro_2 分别创建了任务 task_1task_2。它们实际上是被一起执行的。为了验证这一点,我们加入一些延时:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import time

async def coro_1():
    print("I am the coroutine 1.")
    await asyncio.sleep(1)

async def coro_2():
    print("I am the coroutine 2.")
    await asyncio.sleep(1)

async def main():
    st = time.time()

    task_1 = asyncio.create_task(coro_1())
    task_2 = asyncio.create_task(coro_2())

    await task_1
    await task_2

    et = time.time()

    print("Elapsed: %f s" % (et - st))

asyncio.run(main())

运行结果如下:

1
2
3
I am the coroutine 1.
I am the coroutine 2.
Elapsed: 1.001298 s

而如果不使用 create_task,直接 await 两个协程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import time

async def coro_1():
    print("I am the coroutine 1.")
    await asyncio.sleep(1)

async def coro_2():
    print("I am the coroutine 2.")
    await asyncio.sleep(1)

async def main():
    st = time.time()

    await coro_1()
    await coro_2()

    et = time.time()

    print("Elapsed: %f s" % (et - st))

asyncio.run(main())

那么结果如下:

1
2
3
I am the coroutine 1.
I am the coroutine 2.
Elapsed: 2.002527 s

也就是说,如果不使用 create_task 来创建任务,那么 coro_2() 实际上是在 coro_1() 执行完之后才开始被执行的。

从这个例子中我们可以看出,await 真正的意思是“等待任务完成”。

另外,也可以使用 gather 来同时运行两个协程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio

async def coro_1():
    print("Coroutine 1 starts")
    await asyncio.sleep(1)
    print("Coroutine 1 finishes")

async def coro_2():
    print("Coroutine 2 starts")
    await asyncio.sleep(2)
    print("Coroutine 2 finishes")

async def main():
    print("Starting main coroutine")
    await asyncio.gather(coro_1(), coro_2())
    print("Main coroutine finished")

asyncio.run(main())

异步循环

现在我们来看看 async for

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import asyncio
import time
 
async def async_generator():
    for i in range(10):
        await asyncio.sleep(1)
        yield i
 
async def custom_coroutine():
    async for item in async_generator():
        print(item)
        time.sleep(1)
 
st = time.time()
asyncio.run(custom_coroutine())
et = time.time()

print("Elasped: %f s" % (et - st))

运行结果为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
0
1
2
3
4
5
6
7
8
9
Elasped: 10.013134 s

async foryield 组合使用,可以得到一个异步生成器。实际上,一个异步生成器是定义了 __aiter____anext__ 方法的类实例。下面给出与上述代码等价的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import time

class AsyncGenerator:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        if i >= self.N:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.i += 1
        return i

async def main():
    async for p in AsyncGenerator(10):
        print(p)

st = time.time()
asyncio.run(main())
et = time.time()
print("Elasped: %f s" % (et - st))    

案例

最后给出一个我在项目中使用到的案例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
sdr = RtlSdr()
sdr.center_freq = 92_700_000 # an FM radio station running at 92.7MHz

async def main():
    async for data in sdr.stream():        
        # perform FM demodulation
        audio = fm_demodulation(data)
        # play the audio
        display(Audio(audio, autoplay=True, rate=48000, normalize=False))

asyncio.run(main())

这是一个 FM 收音机应用,使用 async for 来从收音机读取数据,然后在 async for 循环内对数据进行解调和播放。以这种方式,我们就能实现对音频数据的实时流处理。