异步爬虫

本文最后更新于:1 年前

实例引入

比如在这里我们看这么一个示例网站:https://static4.scrape.cuiqingcai.com/,

1

这个网站在内部实现返回响应的逻辑的时候特意加了 5 秒的延迟,也就是说如果我们用 requests 来爬取其中某个页面的话,至少需要 5 秒才能得到响应。

另外这个网站的逻辑结构在之前的案例中我们也分析过,其内容就是电影数据,一共 100 部,每个电影的详情页是一个自增 ID,从 1~100,比如 https://static4.scrape.cuiqingcai.com/detail/43 就代表第 43 部电影,如图所示。

2

下面我们来用 requests 写一个遍历程序,直接遍历 1~100 部电影数据,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import requests
import logging
import time
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
TOTAL_NUMBER = 100
BASE_URL = 'https://static4.scrape.cuiqingcai.com/detail/{id}'
start_time = time.time()
for id in range(1, TOTAL_NUMBER + 1):
url = BASE_URL.format(id=id)
logging.info('scraping %s', url)
response = requests.get(url)
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)

这里我们直接用循环的方式构造了 100 个详情页的爬取,使用的是 requests 单线程,在爬取之前和爬取之后记录下时间,最后输出爬取了 100 个页面消耗的时间。

由于每个页面都至少要等待 5 秒才能加载出来,因此 100 个页面至少要花费 500 秒的时间,总的爬取时间最终为 513.6 秒,将近 9 分钟。

这个在实际情况下是很常见的,有些网站本身加载速度就比较慢,稍慢的可能 1~3 秒,更慢的说不定 10 秒以上才可能加载出来。如果我们用 requests 单线程这么爬取的话,总的耗时是非常多的。此时如果我们开了多线程或多进程来爬取的话,其爬取速度确实会成倍提升,但有没有更好的解决方案呢?

本课时我们就来了解一下使用异步执行方式来加速的方法,此种方法对于 IO 密集型任务非常有效。如将其应用到网络爬虫中,爬取效率甚至可以成百倍地提升。

基本了解

在了解异步协程之前,我们首先得了解一些基础概念,如阻塞和非阻塞、同步和异步、多进程和协程。

阻塞

阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续处理其他的事情,则称该程序在该操作上是阻塞的。

常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正处理事情,它们也会被阻塞。如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。

非阻塞

程序在等待某操作过程中,自身不被阻塞,可以继续处理其他的事情,则称该程序在该操作上是非阻塞的。

非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。

非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

同步

不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,我们称这些程序单元是同步执行的。

例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。

简言之,同步意味着有序。

异步

为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。

例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。

简言之,异步意味着无序。

多进程

多进程就是利用 CPU 的多核优势,在同一时间并行地执行多个任务,可以大大提高执行效率。

协程

协程,英文叫作 Coroutine,又称微线程、纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。

我们可以使用协程来实现异步操作,比如在网络爬虫场景下,我们发出一个请求之后,需要等待一定的时间才能得到响应,但其实在这个等待过程中,程序可以干许多其他的事情,等到响应得到之后才切换回来继续处理,这样可以充分利用 CPU 和其他资源,这就是协程的优势。

协程用法

接下来,我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的,在 Python 3.5 则增加了 async/await,使得协程的实现更加方便。

Python 中使用协程最常用的库莫过于 asyncio,所以本文会以 asyncio 为基础来介绍协程的使用。

首先我们需要了解下面几个概念。

  • event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
  • coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  • task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
  • future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

另外我们还需要了解 async/await 关键字,它是从 Python 3.5 才出现的,专门用于定义协程。其中,async 定义一个协程,await 用来挂起阻塞方法的执行。

定义协程

首先我们来定义一个协程,体验一下它和普通进程在实现上的不同之处,代码如下:

1
2
3
4
5
6
7
8
9
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')

首先我们引入了 asyncio 这个包,这样我们才可以使用 async 和 await,然后我们使用 async 定义了一个 execute 方法,方法接收一个数字参数,方法执行之后会打印这个数字。

随后我们直接调用了这个方法,然而这个方法并没有执行,而是返回了一个 coroutine 协程对象。随后我们使用 get_event_loop 方法创建了一个事件循环 loop,并调用了 loop 对象的 run_until_complete 方法将协程注册到事件循环 loop 中,然后启动。最后我们才看到了 execute 方法打印了输出结果。

可见,async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行。

上面我们还提到了 task,它是对 coroutine 对象的进一步封装,它里面相比 coroutine 对象多了运行状态,比如 running、finished 等,我们可以用这些状态来获取协程对象的执行情况。

在上面的例子中,当我们将 coroutine 对象传递给 run_until_complete 方法的时候,实际上它进行了一个操作就是将 coroutine 封装成了 task 对象,我们也可以显式地进行声明,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

这里我们定义了 loop 对象之后,接着调用了它的 create_task 方法将 coroutine 对象转化为了 task 对象,随后我们打印输出一下,发现它是 pending 状态。接着我们将 task 对象添加到事件循环中得到执行,随后我们再打印输出一下 task 对象,发现它的状态就变成了 finished,同时还可以看到其 result 变成了 1,也就是我们定义的 execute 方法的返回结果。

另外定义 task 对象还有一种方式,就是直接通过 asyncio 的 ensure_future 方法,返回结果也是 task 对象,这样的话我们就可以不借助于 loop 来定义,即使我们还没有声明 loop 也可以提前定义好 task 对象,写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')

绑定回调

另外我们也可以为某个 task 绑定一个回调方法,比如我们来看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import requests

async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status

def callback(task):
print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

在这里我们定义了一个 request 方法,请求了百度,获取其状态码,但是这个方法里面我们没有任何 print 语句。随后我们定义了一个 callback 方法,这个方法接收一个参数,是 task 对象,然后调用 print 方法打印了 task 对象的结果。这样我们就定义好了一个 coroutine 对象和一个回调方法,我们现在希望的效果是,当 coroutine 对象执行完毕之后,就去执行声明的 callback 方法。

那么它们二者怎样关联起来呢?很简单,只需要调用 add_done_callback 方法即可,我们将 callback 方法传递给了封装好的 task 对象,这样当 task 执行完毕之后就可以调用 callback 方法了,同时 task 对象还会作为参数传递给 callback 方法,调用 task 对象的 result 方法就可以获取返回结果了。

实际上不用回调方法,直接在 task 运行完毕之后也可以直接调用 result 方法获取结果,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import requests

async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status

coroutine = request()
task = asyncio.ensure_future(coroutine)
print('Task:', task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task Result:', task.result())

多任务协程

上面的例子我们只执行了一次请求,如果我们想执行多次请求应该怎么办呢?我们可以定义一个 task 列表,然后使用 asyncio 的 wait 方法即可执行,看下面的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio
import requests

async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print('Task Result:', task.result())

这里我们使用一个 for 循环创建了五个 task,组成了一个列表,然后把这个列表首先传递给了 asyncio 的 wait() 方法,然后再将其注册到时间循环中,就可以发起五个任务了。最后我们再将任务的运行结果输出出来,

协程实现

前面讲了这么多,又是 async,又是 coroutine,又是 task,又是 callback,但似乎并没有看出协程的优势啊?反而写法上更加奇怪和麻烦了,别急,上面的案例只是为后面的使用作铺垫,接下来我们正式来看下协程在解决 IO 密集型任务上有怎样的优势吧!

上面的代码中,我们用一个网络请求作为示例,这就是一个耗时等待的操作,因为我们请求网页之后需要等待页面响应并返回结果。耗时等待的操作一般都是 IO 操作,比如文件读取、网络请求等等。协程对于处理这种操作是有很大优势的,当遇到需要等待的情况的时候,程序可以暂时挂起,转而去执行其他的操作,从而避免一直等待一个程序而耗费过多的时间,充分利用资源。

为了表现出协程的优势,我们还是拿本课时开始介绍的网站 https://static4.scrape.cuiqingcai.com/ 为例来进行演示,因为该网站响应比较慢,所以我们可以通过爬取时间来直观地感受到爬取速度的提升。

为了让你更好地理解协程的正确使用方法,这里我们先来看看使用协程时常犯的错误,后面再给出正确的例子来对比一下。

首先,我们还是拿之前的 requests 来进行网页请求,接下来我们再重新使用上面的方法请求一遍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import requests
import time

start = time.time()

async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = requests.get(url)
print('Get response from', url, 'response', response)


tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

可以发现和正常的请求并没有什么两样,依然还是顺次执行的,耗时 51 秒,平均一个请求耗时 5 秒,说好的异步处理呢?

其实,要实现异步处理,我们得先要有挂起的操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样我们才能充分利用好资源,上面方法都是一本正经的串行走下来,连个挂起都没有,怎么可能实现异步?想太多了。

要实现异步,接下来我们需要了解一下 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

所以,我们可能会将代码中的 request 方法改成如下的样子:

1
2
3
4
5
async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await requests.get(url)
print('Get response from', url, 'response', response)

仅仅是在 requests 前面加了一个 await,然而执行以下代码,会得到如下报错:

1
2
3
4
5
6
7
8
9
10
11
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
Waiting for https://static4.scrape.cuiqingcai.com/
...
Task exception was never retrieved
future: <Task finished coro=<request() done, defined at demo.py:8> exception=TypeError("object Response can't be used in 'await' expression")>
Traceback (most recent call last):
File "demo.py", line 11, in request
response = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

这次它遇到 await 方法确实挂起了,也等待了,但是最后却报了这么个错,这个错误的意思是 requests 返回的 Response 对象不能和 await 一起使用,为什么呢?因为根据官方文档说明,await 后面的对象必须是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。
  • An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。

可以参见:https://www.python.org/dev/peps/pep-0492/#await-expression。

requests 返回的 Response 不符合上面任一条件,因此就会报上面的错误了。

那么你可能会发现,既然 await 后面可以跟一个 coroutine 对象,那么我用 async 把请求的方法改成 coroutine 对象不就可以了吗?所以就改写成如下的样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import asyncio
import requests
import time

start = time.time()

async def get(url):
return requests.get(url)

async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

还是不行,它还不是异步执行,也就是说我们仅仅将涉及 IO 操作的代码封装到 async 修饰的方法里面是不可行的!我们必须要使用支持异步操作的请求方式才可以实现真正的异步,所以这里就需要 aiohttp 派上用场了。

使用 aiohttp

aiohttp 是一个支持异步请求的库,利用它和 asyncio 配合我们可以非常方便地实现异步请求操作。

安装方式如下:

1
pip3 install aiohttp

官方文档链接为:https://aiohttp.readthedocs.io/,它分为两部分,一部分是 Client,一部分是 Server,详细的内容可以参考官方文档。

下面我们将 aiohttp 用上来,将代码改成如下样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response

async def request():
url = 'https://static4.scrape.cuiqingcai.com/'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response', response)

tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

成功了!我们发现这次请求的耗时由 51 秒变直接成了 6 秒,耗费时间减少了非常非常多。

代码里面我们使用了 await,后面跟了 get 方法,在执行这 10 个协程的时候,如果遇到了 await,那么就会将当前协程挂起,转而去执行其他的协程,直到其他的协程也挂起或执行完毕,再进行下一个协程的执行。

开始运行时,时间循环会运行第一个 task,针对第一个 task 来说,当执行到第一个 await 跟着的 get 方法时,它被挂起,但这个 get 方法第一步的执行是非阻塞的,挂起之后立马被唤醒,所以立即又进入执行,创建了 ClientSession 对象,接着遇到了第二个 await,调用了 session.get 请求方法,然后就被挂起了,由于请求需要耗时很久,所以一直没有被唤醒。

当第一个 task 被挂起了,那接下来该怎么办呢?事件循环会寻找当前未被挂起的协程继续执行,于是就转而执行第二个 task 了,也是一样的流程操作,直到执行了第十个 task 的 session.get 方法之后,全部的 task 都被挂起了。所有 task 都已经处于挂起状态,怎么办?只好等待了。5 秒之后,几个请求几乎同时都有了响应,然后几个 task 也被唤醒接着执行,输出请求结果,最后总耗时,6 秒!

怎么样?这就是异步操作的便捷之处,当遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,而不是傻傻地等待,这样可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上。

你可能会说,既然这样的话,在上面的例子中,在发出网络请求后,既然接下来的 5 秒都是在等待的,在 5 秒之内,CPU 可以处理的 task 数量远不止这些,那么岂不是我们放 10 个、20 个、50 个、100 个、1000 个 task 一起执行,最后得到所有结果的耗时不都是差不多的吗?因为这几个任务被挂起后都是一起等待的。

理论来说确实是这样的,不过有个前提,那就是服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压,另外还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。但由于不同服务器处理的实现机制不同,可能某些服务器并不能承受这么高的并发,因此响应速度也会减慢。

在这里我们以百度为例,来测试下并发数量为 1、3、5、10、…、500 的情况下的耗时情况,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import aiohttp
import time


def test(number):
start = time.time()

async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response

async def request():
url = 'https://www.baidu.com/'
await get(url)

tasks = [asyncio.ensure_future(request()) for _ in range(number)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Number:', number, 'Cost time:', end - start)

for number in [1, 3, 5, 10, 15, 30, 50, 75, 100, 200, 500]:
test(number)

aiohttp

前面介绍的 asyncio 模块内部实现了对 TCP、UDP、SSL 协议的异步操作,但是对于 HTTP 请求的异步操作来说,我们就需要用到 aiohttp 来实现了。

aiohttp 是一个基于 asyncio 的异步 HTTP 网络模块,它既提供了服务端,又提供了客户端。其中我们用服务端可以搭建一个支持异步处理的服务器,用于处理请求并返回响应,类似于 Django、Flask、Tornado 等一些 Web 服务器。而客户端我们就可以用来发起请求,就类似于 requests 来发起一个 HTTP 请求然后获得响应,但 requests 发起的是同步的网络请求,而 aiohttp 则发起的是异步的。

本课时我们就主要来了解一下 aiohttp 客户端部分的使用。

基本使用

基本实例

首先我们来看一个基本的 aiohttp 请求案例,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import aiohttp
import asyncio
async def fetch(session, url):
async with session.get(url) as response:
return await response.text(), response.status
async def main():
async with aiohttp.ClientSession() as session:
html, status = await fetch(session, 'https://www.baidu.com')
print(f'html: {html[:100]}...')
print(f'status: {status}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

我们可以看到其请求方法的定义和之前有了明显的区别,主要有如下几点:

  • 首先在导入库的时候,我们除了必须要引入 aiohttp 这个库之外,还必须要引入 asyncio 这个库,因为要实现异步爬取需要启动协程,而协程则需要借助于 asyncio 里面的事件循环来执行。除了事件循环,asyncio 里面也提供了很多基础的异步操作。
  • 异步爬取的方法的定义和之前有所不同,在每个异步方法前面统一要加 async 来修饰。
  • with as 语句前面同样需要加 async 来修饰,在 Python 中,with as 语句用于声明一个上下文管理器,能够帮我们自动分配和释放资源,而在异步方法中,with as 前面加上 async 代表声明一个支持异步的上下文管理器。
  • 对于一些返回 coroutine 的操作,前面需要加 await 来修饰,如 response 调用 text 方法,查询 API 可以发现其返回的是 coroutine 对象,那么前面就要加 await;而对于状态码来说,其返回值就是一个数值类型,那么前面就不需要加 await。所以,这里可以按照实际情况处理,参考官方文档说明,看看其对应的返回值是怎样的类型,然后决定加不加 await 就可以了。
  • 最后,定义完爬取方法之后,实际上是 main 方法调用了 fetch 方法。要运行的话,必须要启用事件循环,事件循环就需要使用 asyncio 库,然后使用 run_until_complete 方法来运行。

注意在 Python 3.7 及以后的版本中,我们可以使用 asyncio.run(main())
来代替最后的启动操作,不需要显式声明事件循环,run 方法内部会自动启动一个事件循环。但这里为了兼容更多的 Python
版本,依然还是显式声明了事件循环。

URL 参数设置

对于 URL 参数的设置,我们可以借助于 params 参数,传入一个字典即可,示例如下:

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio
async def main():
params = {'name': 'germey', 'age': 25}
async with aiohttp.ClientSession() as session:
async with session.get('https://httpbin.org/get', params=params) as response:
print(await response.text())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

其他请求类型

另外 aiohttp 还支持其他的请求类型,如 POST、PUT、DELETE 等等,这个和 requests 的使用方式有点类似,示例如下:

1
2
3
4
5
6
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

POST 数据

对于 POST 表单提交,其对应的请求头的 Content-type 为 application/x-www-form-urlencoded,我们可以用如下方式来实现,代码示例如下:

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio
async def main():
data = {'name': 'germey', 'age': 25}
async with aiohttp.ClientSession() as session:
async with session.post('https://httpbin.org/post', data=data) as response:
print(await response.text())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

对于 POST JSON 数据提交,其对应的请求头的 Content-type 为 application/json,我们只需要将 post 方法的 data 参数改成 json 即可,代码示例如下:

1
2
3
4
5
async def main():
data = {'name': 'germey', 'age': 25}
async with aiohttp.ClientSession() as session:
async with session.post('https://httpbin.org/post', json=data) as response:
print(await response.text())

响应字段

对于响应来说,我们可以用如下的方法分别获取响应的状态码、响应头、响应体、响应体二进制内容、响应体 JSON 结果,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
import aiohttp
import asyncio
async def main():
data = {'name': 'germey', 'age': 25}
async with aiohttp.ClientSession() as session:
async with session.post('https://httpbin.org/post', data=data) as response:
print('status:', response.status)
print('headers:', response.headers)
print('body:', await response.text())
print('bytes:', await response.read())
print('json:', await response.json())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

超时设置

对于超时的设置,我们可以借助于 ClientTimeout 对象,比如这里我要设置 1 秒的超时,可以这么来实现:

1
2
3
4
5
6
7
8
9
import aiohttp
import asyncio
async def main():
timeout = aiohttp.ClientTimeout(total=1)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get('https://httpbin.org/get') as response:
print('status:', response.status)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

并发限制

由于 aiohttp 可以支持非常大的并发,比如上万、十万、百万都是能做到的,但这么大的并发量,目标网站是很可能在短时间内无法响应的,而且很可能瞬时间将目标网站爬挂掉。所以我们需要控制一下爬取的并发量。

在一般情况下,我们可以借助于 asyncio 的 Semaphore 来控制并发量,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import aiohttp
CONCURRENCY = 5
URL = 'https://www.baidu.com'
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():
async with semaphore:
print('scraping', URL)
async with session.get(URL) as response:
await asyncio.sleep(1)
return await response.text()
async def main():
global session
session = aiohttp.ClientSession()
scrape_index_tasks = [asyncio.ensure_future(scrape_api()) for _ in range(10000)]
await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

在这里我们声明了 CONCURRENCY 代表爬取的最大并发量为 5,同时声明爬取的目标 URL 为百度。接着我们借助于 Semaphore 创建了一个信号量对象,赋值为 semaphore,这样我们就可以用它来控制最大并发量了。怎么使用呢?我们这里把它直接放置在对应的爬取方法里面,使用 async with 语句将 semaphore 作为上下文对象即可。这样的话,信号量可以控制进入爬取的最大协程数量,最大数量就是我们声明的 CONCURRENCY 的值。

在 main 方法里面,我们声明了 10000 个 task,传递给 gather 方法运行。倘若不加以限制,这 10000 个 task 会被同时执行,并发数量太大。但有了信号量的控制之后,同时运行的 task 的数量最大会被控制在 5 个,这样就能给 aiohttp 限制速度了。

在这里,aiohttp 的基本使用就介绍这么多,更详细的内容还是推荐你到官方文档查阅,链接:https://docs.aiohttp.org/。

爬取实战

上面我们介绍了 aiohttp 的基本用法之后,下面我们来根据一个实例实现异步爬虫的实战演练吧。

本次我们要爬取的网站是:https://dynamic5.scrape.cuiqingcai.com/

这是一个书籍网站,整个网站包含了数千本书籍信息,网站是 JavaScript 渲染的,数据可以通过 Ajax 接口获取到,并且接口没有设置任何反爬措施和加密参数,另外由于这个网站比之前的电影案例网站数据量大一些,所以更加适合做异步爬取。

本课时我们要完成的目标有:

  • 使用 aiohttp 完成全站的书籍数据爬取。
  • 将数据通过异步的方式保存到 MongoDB 中。

在本课时开始之前,请确保你已经做好了如下准备工作:

  • 安装好了 Python(最低为 Python 3.6 版本,最好为 3.7 版本或以上),并能成功运行 Python 程序。
  • 了解了 Ajax 爬取的一些基本原理和模拟方法。
  • 了解了异步爬虫的基本原理和 asyncio 库的基本用法。
  • 了解了 aiohttp 库的基本用法。
  • 安装并成功运行了 MongoDB 数据库,并安装了异步存储库 motor。

页面分析

在之前我们讲解了 Ajax 的基本分析方法,本课时的站点结构和之前 Ajax 分析的站点结构类似,都是列表页加详情页的结构,加载方式都是 Ajax,所以我们能轻松分析到如下信息:

  • 列表页的 Ajax 请求接口格式为:https://dynamic5.scrape.cuiqingcai.com/api/book/?limit=18&offset={offset},limit 的值即为每一页的书的个数,offset 的值为每一页的偏移量,其计算公式为 offset = limit * (page - 1) ,如第 1 页 offset 的值为 0,第 2 页 offset 的值为 18,以此类推。
  • 列表页 Ajax 接口返回的数据里 results 字段包含当前页 18 本书的信息,其中每本书的数据里面包含一个字段 id,这个 id 就是书本身的 ID,可以用来进一步请求详情页。
  • 详情页的 Ajax 请求接口格式为:https://dynamic5.scrape.cuiqingcai.com/api/book/{id},id 即为书的 ID,可以从列表页的返回结果中获取。

实现思路

其实一个完善的异步爬虫应该能够充分利用资源进行全速爬取,其思路是维护一个动态变化的爬取队列,每产生一个新的 task 就会将其放入队列中,有专门的爬虫消费者从队列中获取 task 并执行,能做到在最大并发量的前提下充分利用等待时间进行额外的爬取处理。

但上面的实现思路整体较为烦琐,需要设计爬取队列、回调函数、消费者等机制,需要实现的功能较多。由于我们刚刚接触 aiohttp 的基本用法,本课时也主要是了解 aiohttp 的实战应用,所以这里我们将爬取案例的实现稍微简化一下。

在这里我们将爬取的逻辑拆分成两部分,第一部分为爬取列表页,第二部分为爬取详情页。由于异步爬虫的关键点在于并发执行,所以我们可以将爬取拆分为两个阶段:

  • 第一阶段为所有列表页的异步爬取,我们可以将所有的列表页的爬取任务集合起来,声明为 task 组成的列表,进行异步爬取。
  • 第二阶段则是拿到上一步列表页的所有内容并解析,拿到所有书的 id 信息,组合为所有详情页的爬取任务集合,声明为 task 组成的列表,进行异步爬取,同时爬取的结果也以异步的方式存储到 MongoDB 里面。

因为两个阶段的拆分之后需要串行执行,所以可能不能达到协程的最佳调度方式和资源利用情况,但也差不了很多。但这个实现思路比较简单清晰,代码实现也比较简单,能够帮我们快速了解 aiohttp 的基本使用。


异步爬虫
http://example.com/2022/09/26/异步爬虫/
作者
Crush
发布于
2022年9月26日
更新于
2023年7月9日
许可协议