初解Python并发
Jul 16, 2017
7 minute read

前言

昨天(2017年7月15日)偶然看到这个视频,看完之后豁然开朗,忍不住要分享给大家。

并发Demo

视频中举了个例子,用socket写了个迷你服务,只有一个作用,计算Fibonacci……

# -*- coding: utf-8 -*-
# server.py
import socket


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        handler(client)


def handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print("Connection Closed")


server("", 25000)

当然目前这个只是一个堵塞版本,可以用以下代码测试一下

# -*- coding: utf-8 -*-
# nc.py
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 25000))

while True:
    n = input()
    sock.send(n.encode('ascii'))
    resp = sock.recv(100)
    print(resp.decode('ascii'), end='')

该迷你服务一次只能处理一个请求,我们只需要稍微修改一下代码即可以改变现状

# -*- coding: utf-8 -*-
# server.py
import socket
from threading import Thread


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=handler, args=(client,), daemon=True).start()


def handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print("Connection Closed")


server("", 25000)

每次收到请求,就会开启一个新的线程来处理。用以下代码来测试其性能。

# -*- coding: utf-8 -*-
# perf.py
import socket
import time
from threading import Thread

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 25000))

counter = 0


def monitor():
    global counter
    while True:
        time.sleep(1)
        print(counter, 'reqs/sec')
        counter = 0


Thread(target=monitor, daemon=True).start()

while True:
    sock.send(b'1')
    resp = sock.recv(100)
    counter += 1

每当多开启一个测试端,各个测试端每秒请求数量或多或少都会受到影响。而且当我们用nc.py作一个需要大量计算资源请求,比如 fib(40) ,结果更是惨不忍睹。若是我们把线程换成进程。

# -*- coding: utf-8 -*-
# server.py
import socket
from threading import Thread
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        Thread(target=handler, args=(client, ), daemon=True).start()


def handler(client):
    while True:
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print("Connection Closed")

# 使用ProcessPoolExecutor必须用
# if __name__ == '__main__':
#     main()
# 因为子进程会导入__main__模块,若不用此方法避免子进程重复执行主进程的代码,
# 可能会导致一些奇怪的结果
if __name__ == '__main__':
    server("", 25000)

当然,我们要使用 ProcessPoolExecutor 来执行 CPU密集型任务 ;用 ThreadPoolExecutor 更适合 I/O密集型任务 。这样突然来了一个需要 fib(40) 的请求,也不会造成很大的影响。不过各个测试端的每秒请求数量,会比之前降了一个数量级,这是因为解释器与子进程沟通的消耗导致的。

深入了解

视频来到高潮部分,David Beazley讲Python圈子里有个特别火的观点,就是用 threading 非常糟糕,不要用!所以他把代码恢复到原来的版本。

# -*- coding: utf-8 -*-
# server.py
import socket


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = sock.accept()
        print('Connection', addr)
        handler(client)


def handler(client):
    while True:
        req = client.recv(100)
        if not req:
        break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        client.send(resp)
    print("Connection Closed")

if __name__ == '__main__':
    server("", 25000)

如果不用 threading 模块,如何做到并发呢?用Python中的生成器特性,使用yield关键字。

>>> def countdown(n):
...     while n > 0:
...         yield n
... n -= 1
>>> cd = countdown(3)
>>> next(cd)
>>> 3
>>> for i in cd:
...     print(i)
2
1

>>> next(cd)
Traceback (most recent call last):
    File "<stdin>", line 1, in <module>
StopIteration

当解释器运行到yield会返回结果,并挂起该函数,等待下次激活。利用这个特性,稍微修改一下代码

# -*- coding: utf-8 -*-
# server.py
import socket
import select
from collections import deque

tasks = deque()
recv_wait = dict() # Mapping sockets -> tasks (generators)
send_wait = dict()


def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # 没有正在运行的task
            # 等待I/O操作
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))

        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            else:
                raise RuntimeError('需要参数!')
        except StopIteration:
            print('Task Done!')


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(handler(client)) # 添加新任务


def handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        result = fib(n)
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
    print("Connection Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000)) # 添加初始任务
    run()

在需要进行I/O操作的地方使用 yield ,挂起该函数,用 select 查看socket的I/O操作是否准备好。准备好就唤醒该函数,执行I/O操作。使用perf.py或者nc.py测试一下,发现我们成功地实现了并发,没用借助 threading 模块。但我们遇到CPU密集型任务,该代码的表现就乏善可陈。我们尝试用 ProcessPoolExecutor 来解决这个问题

# -*- coding: utf-8 -*-
# server.py
import socket
import select
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)
tasks = deque()
recv_wait = dict() # Mapping sockets -> tasks (generators)
send_wait = dict()
future_wait = dict()

future_notify, future_event = socket.socketpair()


def future_done(future):
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')


def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)


def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # 没有正在运行的task
            # 等待I/O操作
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))

        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                what.add_done_callback(future_done)
            else:
                raise RuntimeError('需要参数!')
        except (StopIteration, ConnectionResetError):
            print('Task Done!')


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def server(addr, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        yield 'recv', sock
        client, addr = sock.accept()
        print('Connection', addr)
        tasks.append(handler(client))


def handler(client):
    while True:
        yield 'recv', client
        req = client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        yield 'send', client
        client.send(resp)
        print("Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000))
    tasks.append(future_monitor())
    run()

因为future对象不是文件描述符,无法用 select 来查看任务是否完成。但有更好的方法,future对象支持添加callback函数。用 socket.socketpair 配合 future.add_done_callback 函数来解决这个问题。接下来有个小问题, yield 在代码中很突兀,有什么办法把它包装起来?!?

# -*- co```ding: utf-8 -*-
# server.py
import socket
import select
from collections import deque
from concurrent.futures import ProcessPoolExecutor as Pool

pool = Pool(10)
tasks = deque()
recv_wait = dict() # Mapping sockets -> tasks (generators)
send_wait = dict()
future_wait = dict()

future_notify, future_event = socket.socketpair()


def future_done(future):
    tasks.append(future_wait.pop(future))
    future_notify.send(b'x')


def future_monitor():
    while True:
        yield 'recv', future_event
        future_event.recv(100)


def run():
    while any([tasks, recv_wait, send_wait]):
        while not tasks:
            # 没有正在运行的task
            # 等待I/O操作
            can_recv, can_send, _ = select.select(recv_wait, send_wait, [])
            for sock in can_recv:
                tasks.append(recv_wait.pop(sock))
            for sock in can_send:
                tasks.append(send_wait.pop(sock))

        task = tasks.popleft()
        try:
            why, what = next(task)
            if why == 'recv':
                recv_wait[what] = task
            elif why == 'send':
                send_wait[what] = task
            elif why == 'future':
                future_wait[what] = task
                what.add_done_callback(future_done)
            else:
                raise RuntimeError('需要参数!')
        except (StopIteration, ConnectionResetError):
            print('Task Done!')


def fib(n):
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


class AsyncSocket(object):
    def __init__(self, sock):
        self.sock = sock

    def recv(self, maxsize):
        yield 'recv', self.sock
        return self.sock.recv(maxsize)

    def send(self, data):
        yield 'send', self.sock
        return self.sock.send(data)

    def accept(self):
        yield 'recv', self.sock
        client, addr = self.sock.accept()
        return AsyncSocket(client), addr

    def __getattr__(self, name):
        return getattr(self.sock, name)


def server(addr, port):
    sock = AsyncSocket(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((addr, port))
    sock.listen(5)
    while True:
        client, addr = yield from sock.accept()
        print('Connection', addr)
        tasks.append(handler(client))


def handler(client):
    while True:
        req = yield from client.recv(100)
        if not req:
            break
        n = int(req)
        future = pool.submit(fib, n)
        yield 'future', future
        result = future.result()
        resp = str(result).encode('ascii') + b'\n'
        yield from client.send(resp)
    print("Connection Closed")


if __name__ == '__main__':
    tasks.append(server("", 25000))
    tasks.append(future_monitor())
    run()

yield from 是用来代理子生成器的,一种简单方式来控制子生成器的行为。在Python3.5之后,这种写法就可以用更简单的 async/await 来代替。

async/await

看了这个视频后,就看了pycon上关于Asynchronous的视频。研究了一下3.5后的协程语法,把代码修改成以下的样子。

# -*- coding: utf-8 -*-
# server.py
import asyncio
import logging
import sys
from concurrent.futures import ProcessPoolExecutor as Executor

logger = logging.getLogger('SERVER')
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
executor = Executor(4)


def fib(n: int) -> int:
    if n <= 2:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    addr = writer.get_extra_info('peername')
    logger.info('Connection<%s:%d> Created', *addr)
    loop = asyncio.get_event_loop()
    while True:
        req = await reader.readline()
        if not req.strip():
            break
        logger.debug('Connection<%s:%d> Recv %r', *addr, req)
        try:
            n = int(req)
            future = loop.run_in_executor(executor, fib, n)
            result = await asyncio.gather(future, return_exceptions=True)
            if isinstance(result, Exception):
                break
            resp = str(result[0]).encode('ascii') + b'\n'
            logger.debug('Connection<%s:%d> Send %r', *addr, resp)
            writer.write(resp)
            await writer.drain()
        except Exception as e:
            writer.write(b'[Error]\n')
            await writer.drain()
            logger.exception(e)
            continue
    writer.close()
    logger.info('Connection<%s:%d> Done', *addr)


def Server(host: str, port: int):
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handler, host, port, loop=loop)
    server = loop.run_until_complete(coro)

    print('Serving on http://%s:%d' % (host, port))
    try:
        loop.run_forever()
    finally:
        executor.shutdown()
        server.close()
        try:
            loop.run_until_complete(server.wait_closed())
        finally:
            pass
        loop.close()


def main():
    Server('localhost', 12345)


if __name__ == '__main__':
    main()

是不是看起简洁了许多?刚学体会也不是很深,不是很了解。但觉得代码可读性变好了。

小结

CPU密集型任务 IO密集型任务
multiprocessing threading
concurrent.futures.ProcessPoolExecutor concurrent.futures.ThreadPoolExecutor
async / await

以上关于Fibonacci的例子是为了更好地理解并发,不是一个很好的算法。

def fib(n):
    a , b = 1, 1
    for _ in range(n):
        a, b = b, a + b
    return a

使用以上版本大大加快了fib的计算速度,就没必要用多进程了。 其实在多线程中最好不要开多进程,因为可能会出现死锁,导致程序出现不可预知的错误。因此,必须慎重对待多进程和多线程混搭使用。

参考链接

扩展阅读



comments powered by Disqus