Python协程用法

1.使用 async 和 await 定义协程函数

1
2
3
4
5
async def my_coroutine_test(self):
    print(f"my_coroutine_test start")
    await asyncio.sleep(2)
    print(f"my_coroutine_test finish")
    

调用

1
    asyncio.run(self.my_coroutine_test())

执行情况

1
2
my_coroutine_test start
my_coroutine_test finish

2.创建和调度多个协程任务

1
2
3
4
5
6
7
8

    async def my_task(self, number):
        print(f"task:{number}")

    async def coroutine_tasks(self):

        my_tasks = [self.my_task(i) for i in range(10)]
        await asyncio.gather(*my_tasks)

调用

1
    asyncio.run(self.coroutine_tasks())

执行情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
task:0
task:1
task:2
task:3
task:4
task:5
task:6
task:7
task:8
task:9

3.使用 asyncio.wait() 等待一组协程任务完成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    async def wait_my_task(self):
        print(f"wait_my_task start")
        await asyncio.sleep(2)
        print(f"wait_my_task end")

    async def wait_coroutine_task(self):
        mytask = [asyncio.create_task(self.wait_my_task()) for _ in range(5)]
        (done, pending) = await asyncio.wait(mytask)
        for task in done:
            print(task.result())

调用

1
2

    asyncio.run(self.wait_coroutine_task())

执行情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
wait_my_task start
wait_my_task start
wait_my_task start
wait_my_task start
wait_my_task start
wait_my_task end
wait_my_task end
wait_my_task end
wait_my_task end
wait_my_task end
None
None
None
None
None

4.协程间通过queue进行数据传递

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    async def coroutine_data_communication(self):
        queue = asyncio.Queue()
        producer = asyncio.create_task(self.producer(queue))
        consumer = asyncio.create_task(self.consumer(queue))
        await asyncio.gather(producer, consumer)
        print(f"---coroutine_data_communication finish---")

    async def producer(self, queue):
        print(f"---produce coroutine---")
        for i in range(10):
            await queue.put(i)
            print(f"produce:{i}")
            await asyncio.sleep(1)

    async def consumer(self, queue):
        print(f"---consumer coroutine---")
        while True:

            if not queue.empty():
                item = await queue.get()
                print(f"Consumed:{item}")
                self.count = 0
            else:
                self.count += 1
                if self.count >= 3:
                    break

            await asyncio.sleep(1)

调用

1
     asyncio.run(self.coroutine_data_communication())

执行情况

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
---produce coroutine---
produce:0
---consumer coroutine---
Consumed:0
produce:1
Consumed:1
produce:2
Consumed:2
produce:3
Consumed:3
produce:4
Consumed:4
produce:5
Consumed:5
produce:6
Consumed:6
produce:7
Consumed:7
produce:8
Consumed:8
produce:9
Consumed:9

5.使用 asyncio.Event 进行协程间的通信

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
    async def event_send(self, event):
        print(f"event event_send---1")
        await asyncio.sleep(2)
        event.set()
        print(f"event event_send---2")

    async def event_rev(self, event):
        print(f"event_rev---1")
        await event.wait()
        print(f"event_rev---2")

    async def event_test(self):
        print(f"event_test---1")
        event = asyncio.Event()
        await asyncio.gather(self.event_send(event), self.event_rev(event))
        print(f"event_test---2")

调用

1
     asyncio.run(self.coroutine_data_communication())

执行情况

1
2
3
4
5
6
7
使用 asyncio.Event 进行通信
event_test---1
event event_send---1
event_rev---1
event event_send---2
event_rev---2
event_test---2