用多线程加速爬虫的 lxml 解析
Jul 17, 2019
3 minute read

当编写一个爬虫时,会使用 lxml 库来解析 HTML 文件。当爬取到了一个超大且复杂的 HTML 文件,解析起来十分耗费时间,进而影响了爬虫的正常运行

为了不影响爬虫的正常运行,尝试把解析任务交给线程池来处理

模拟爬虫

先随便准备一个 html 文件,就直接从 lxml 的文档上下载一个

wget https://lxml.de/tutorial.html -O example.html

举一个重解析的模拟爬虫例子,每次请求的 io 时间假定为 0.01s ,请求并发数无上限,总量为 2000 个。

import asyncio
import time
from pathlib import Path

from lxml import html

_html_text = None
_latest_fetched = 0


async def fetch_text():
    # 模拟爬取
    await asyncio.sleep(0.01)
    # 记录最后的一个请求的结束时间
    global _latest_fetched
    _latest_fetched = time.perf_counter()
    return _html_text


def get_title(text):
    doc = html.fromstring(text)
    return doc.xpath("//title/text()")[0]


async def create_task():
    text = await fetch_text()
    title = get_title(text)
    return title


async def main():
    futs = []
    for _ in range(2000):
        fut = asyncio.ensure_future(create_task())
        futs.append(fut)

    await asyncio.gather(*futs)


if __name__ == "__main__":
    _html_text = Path("example.html").read_text()

    t_start = time.perf_counter()
    asyncio.run(main())
    t_end = time.perf_counter()
    print("total:", (t_end - t_start))
    print("latest fetched:", _latest_fetched - t_start)

这段代码执行的结果如下

total: 46.338736165
latest fetched: 38.004939187

模拟爬虫网络请求的耗时为 0.01 s,而且没有设置并发数,按道理说应该在极短的时间内完成所有网络请求

可实际上最后一个请求的结束时间却十分的大,这说明该代码的解析处理已经大大影响了爬虫的正常工作

用 ThreadPoolExecutor 把解析任务交给线程池

@@ -22,16 +22,20 @@ def get_title(text):
     return doc.xpath("//title/text()")[0]


-async def create_task():
+async def create_task(loop):
     text = await fetch_text()
-    title = get_title(text)
+    # 把解析任务交给线程池
+    title = await loop.run_in_executor(None, get_title, text)
     return title


 async def main():
+    loop = asyncio.get_event_loop()
+    # 可以用 loop.set_default_executor 来配置默认的线程池
+
     futs = []
     for _ in range(2000):
-        fut = asyncio.ensure_future(create_task())
+        fut = asyncio.ensure_future(create_task(loop))
         futs.append(fut)

     await asyncio.gather(*futs)

照以上更改后,代码执行的结果如下

total: 46.996482637
latest fetched: 0.087005389

最后一次爬取的结束时间就如我们预料,十分的小。可以看出使用多线程来做解析,就不会影响到主线程的爬虫工作

但由于 GIL ,实际上相当于串行处理,所以解析的速度没有任何起色

在解析时,避免 GIL 的解决方案

而幸运的是如果使用的是包含 1.1 版本以后的 lxml,可以通过以下方法来避开 GIL

@@ -1,4 +1,5 @@
 import asyncio
+import threading
 import time
 from pathlib import Path

@@ -6,6 +7,7 @@ from lxml import html

 _html_text = None
 _latest_fetched = 0
+_local = threading.local()


 async def fetch_text():
@@ -18,7 +20,12 @@ async def fetch_text():


 def get_title(text):
-    doc = html.fromstring(text)
+    # 通过为各个线程,创建属于自己的解析器
+    # 这样当解析时,线程就会释放 GIL
+    if not hasattr(_local, "parser"):
+        _local.parser = html.HTMLParser()
+
+    doc = html.fromstring(text, parser=_local.parser)
     return doc.xpath("//title/text()")[0]

照以上更改后,代码执行的结果如下下

total: 27.775925195000003
latest fetched: 0.425428249

这样的处理相比之前的,由于解析的过程中释放了 GIL,程序就能利用到多核 CPU 的优势,加快了处理速度,减少了运行时间

如果使用 Python 3.7 以上版本, 可以使用 ThreadPoolExecutor.initializer 来创建各个线程的解析器

使用 lxml 时,有哪些情况会释放掉 GIL

通过查看 lxml 的变更历史,看看具体哪些情况会释放掉 GIL

curl -sSL https://github.com/lxml/lxml/raw/master/CHANGES.txt | grep GIL

运行以上命令,得出的结果如下

* Parsing from a plain file object frees the GIL under Python 2.x.
* Running ``iterparse()`` on a plain file (or filename) frees the GIL
* Unlock the GIL for deep copying documents and for XPath()
  * All in-memory operations (tostring, parse(StringIO), etc.) free the GIL
  * File operations (on file names) free the GIL
  * Reading from file-like objects frees the GIL and reacquires it for reading

用 ProcessPoolExecutor 把解析任务交给进程池

用多进程处理的话,会更好的利用多核 CPU 的优势,但为什么不直接用多进程去做处理呢?

这是因为使用多进程处理有以下的缺点

  1. 多进程开销大,比如 IPC 进程间通信会产生额外的开销
  2. lxml 的对象无法 pickle ,这样同个对象就无法与多个进程间共享了

但在合适的应用场景,如果正确使用多进程,就能更快地处理

在第一个多线程例子的代码中,把 executor 置换成 ProcessPoolExecutor 即可使用多进程处理

@@ -1,5 +1,6 @@
 import asyncio
 import time
+from concurrent.futures import ProcessPoolExecutor
 from pathlib import Path

 from lxml import html
@@ -22,20 +23,20 @@ def get_title(text):
     return doc.xpath("//title/text()")[0]


-async def create_task(loop):
+async def create_task(loop, executor):
     text = await fetch_text()
-    # 把解析任务交给线程池
-    title = await loop.run_in_executor(None, get_title, text)
+    # 把解析任务交给进程池
+    title = await loop.run_in_executor(executor, get_title, text)
     return title


 async def main():
     loop = asyncio.get_event_loop()
-    # 可以用 loop.set_default_executor 来配置默认的线程池
+    executor = ProcessPoolExecutor()

     futs = []
     for _ in range(2000):
-        fut = asyncio.ensure_future(create_task(loop))
+        fut = asyncio.ensure_future(create_task(loop, executor))
         futs.append(fut)

     await asyncio.gather(*futs)

照以上更改后,代码执行的结果如下

total: 23.120031775
latest fetched: 0.16141446800000003

这种简单的解析处理,从完成速度来看,用多进程处理的效果比多线程的稍好些

但实际场景由于较复杂,多进程处理的收益可能反而不好

总结

多进程处理的劣势反过来就是多线程处理的优势了

  1. 多线程处理的开销小,收益较大
  2. lxml 的对象能在不同的线程间共享 (但解析的操作最好在同个线程内作处理,这样会更高效)

要结合实际的应用场景,采用合适的方案,才能得到最大的收益

参考

  1. 以上例子的完整代码 | GIST
  2. lxml FAQ - Threading



comments powered by Disqus