Python 的协程

前言

最近在看部分Python源码时, 发现了async 这个关键字. 查了一下发现了Python中的协程.

协程这玩意, 在GO中我用过啊, 简单说, 就是一个轻量级的线程嘛, 由语言自己来实现不同协程的调度. 想着Python中可能也是差不多的东西吧. 但是我Google搜了一下, 前面的说明都给出了下面的例子:

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
produce(c)

这这这, 这是协程咩? 这不就是一个生成器咩?

不过你想一下生成器的特征:

  • 需要的时候, 返回一个值, 并将当前函数内部的状态保存
  • 下次执行的时候, 会恢复上次执行的环境并继续执行

能够保存运行状态并在下次执行时恢复, 说他是协程貌似也没什么问题哈. 而且, Python中的协程是跑在同一个线程中, 也就是串行执行的, 所以也不需要加锁.

协程

看了上面的生成器, 是不是有一种想法? 这玩意不就是个生成器么? 为什么要叫协程? 没错, 就是生成器.

上面也说了, 协程的特点, 就是可以停止当前函数的执行并保存当前状态, 并在下次执行时进行恢复. 对于单线程的运行来说, 什么时候需要这种操作呢? 等待的时候. 比如等待文件打开, 等待锁, 等待网络返回等等. 这时程序运行着也没什么事做, 就可以先去做其他事情, 等这边好了再继续回来执行. 下面以单纯的sleep举例.

自己实现

我们如何使用原生的yield生成器来实现一个任务队列呢? 我随便写了一下:

import time

def yield_sleep(delay):
    start_time = time.time()
    while True:
        if time.time() - start_time < delay:
            yield
        else:
            break

def hello(name):
    print(f"{name}-1-{time.strftime('%X')}")
    yield from yield_sleep(1)
    print(f"{name}-2-{time.strftime('%X')}")

# 创建任务
tasks = [hello('one'), hello('two')]
while True:
    if len(tasks) <= 0:
        break
    # 复制数组, 下方删除时能够正常遍历
    copy_tasks = tasks[:]
    for task in copy_tasks:
        try:
            next(task)
        except StopIteration:
            # 迭代完成, 删除元素
            tasks.remove(task)

image-20211008222517989

简单解释一下, 我们在每次任务调用yield临时返回时, 进行任务的轮换. 这样, 原本需要2s 执行的操作, 一共只需要1s 即可. (这里为了说明效果, 只是简单实现了一下. )

哎, 如此一来, 所有支持yield生成器的语言, 其实都是支持coroutine的呀, 比如我大 PHP, 嘿嘿.

asyncio

Python 3.4中, 引入了asyncio包. 将异步 IO 的操作进行了封装.

简单说, asyncio内部, 维护了一个任务队列, 在函数执行yield让出执行权时, 切换到下一个任务继续执行. 嗯, 大概就是这样.

import asyncio
import time

@asyncio.coroutine
def hello(name):
    print(f"{name}-1-{time.strftime('%X')}")
    yield from asyncio.sleep(1)
    print(f"{name}-2-{time.strftime('%X')}")

# 获取事件队列
loop = asyncio.get_event_loop()
# 并发执行任务
tasks = [hello('one'), hello('two')]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

image-20211005124103834

可以看到, 函数在首次执行到yield时, 进行了中断并将执行权让了出去.

  1. 通过asyncio.get_event_loop()方法, 生成一个事件队列
  2. 调用loop.run_until_complete方法运行指定的任务队列
  3. asyncio.sleep函数和我们上面实现的yield_sleep是一样的效果. 在yield中断时, 会从队列中找到另一个任务并执行

需要注意的一点, Python的协程是需要手动让出执行权的. 这点与Go不同. 也就是说, 发生协程切换的时机为:

  1. 任务主动让出执行权
  2. 任务执行完成

举个例子(将上面的例子简单修改):

import asyncio
import time

@asyncio.coroutine
def hello(name, delay, not_yield):
    print(f"{name}-1-{time.strftime('%X')}")
    # 占用协程等待
    if not_yield:
        time.sleep(delay)
    else:
        yield from asyncio.sleep(1)
    print(f"{name}-2-{time.strftime('%X')}")

# 获取事件队列
loop = asyncio.get_event_loop()
# 并发执行任务
tasks = [hello('one', 2, False), hello('two', 5, True)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

image-20211005124241888

可以看到, 尽管协程1只是想等待1s, 但因为协程2一直占用这执行权没有放出来, 故协程1等到协程2执行结束后才再次获得执行权, 既5s 后

async/await

Python 3.5中, 增加了async/await语法糖.

简单来说, 将上面的@asyncio.coroutine换成async, 将yield from换成await就行了. 其他不变. 替换后, 上面的代码就变成了这样, 意思是一样的.

import asyncio
import time

async def hello(name):
    print(f"{name}-1-{time.strftime('%X')}")
    await asyncio.sleep(1)
    print(f"{name}-2-{time.strftime('%X')}")

# 获取事件队列
loop = asyncio.get_event_loop()
# 并发执行任务
tasks = [hello('one'), hello('two')]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

通用方式

前面说的方式是简单介绍下实现, 协程在Python中较为平常的使用方式如下:

import asyncio
import time

async def hello(name):
    print(f"{name}-1-{time.strftime('%X')}")
    await asyncio.sleep(1)
    print(f"{name}-2-{time.strftime('%X')}")

async def main():
    """创建任务并执行"""
    # 方案1: 添加并执行任务
    task1 = asyncio.create_task(hello('one'))
    task2 = asyncio.create_task(hello('tow'))
    # 调用`asyncio.create_task`方法调用后, 任务就已经存在调度队列中了
    # 即使没有手动`await`等待, 在协程切换时也会被执行. 我们添加`await`只是为了等待所有任务完成
    await task1
    await task2
    # 等待其中所有协程执行完成, 与分开 await 相同
    await asyncio.wait({task1, task2})
    # 等待协程执行. 若指定时间后还没有执行完毕, 则会抛出异常
    await asyncio.wait_for(task1, 1)

    # 方案2: 批量执行协程. 方案1的简化版
    # 返回值为所有协程的集合
    await asyncio.gather(
        hello('one'),
        hello('tow')
    )

    """获取任务信息"""
    # 获取当前执行的任务
    current_task = asyncio.current_task()
    # 获取事件循环中所有未完成的任务
    all_task = asyncio.all_tasks()
    # 关闭一个协程, 不再执行
    task1.cancel()
    # 获取任务的结果. 若协程被关闭了, 会抛出异常
    ret = task1.result()

asyncio.run(main())

创建并执行一个协程任务, 这样就不用操心事件队列的问题了, 我们在main函数中要做的就是创建任务并执行.

注意, 使用关键字async定义的方法, 是一个协程对象, 不能单纯调用hello('1234'), 其实现是一个装饰器, 返回的是一个coroutine对象.


对于Python中的协程, 差不多就是这么个东西了. 简单说, 就是在执行 IO 耗时操作时, 将执行权暂时让出, 以活动更好的执行效率.

但是问题来了, 对于这种耗时操作, 我们总不能每次都自己实现一遍吧. 勿慌, 其实很多异步操作, 都已经有了实现, 具体可见: https://github.com/aio-libs, 列出来当前已经实现异步操作的大部分库. 当然了, 系统asyncio库中也有部分简单的异步操作实现.

以后再写耗时操作的时候, 就可以用上协程了, 比如爬虫. 爬虫在发起请求的时候, 是需要等待返回的, 这时候同时发起 n 个请求, 就可以极大的提高爬虫的效率.

订阅评论
提醒
guest
0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请发表评论。x