深圳幻海软件技术有限公司 欢迎您!

Python实用技巧大任务切分

2023-02-27

 今天来说说,Python中的任务切分。以爬虫为例,从一个存url的txt文件中,读取其内容,我们会获取一个url列表。我们把这一个url列表称为大任务。列表切分在不考虑内存占用的情况下,我们对上面的大任务进行一个切分。比如我们将大任务切分成的小任务是每秒最多只访问5个URL。 

 今天来说说,Python 中的任务切分。以爬虫为例,从一个存 url 的 txt 文件中,读取其内容,我们会获取一个 url 列表。我们把这一个 url 列表称为大任务。

列表切分在

不考虑内存占用的情况下,我们对上面的大任务进行一个切分。比如我们将大任务切分成的小任务是每秒最多只访问5个URL。

 

import os 
import time 
 
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 
 
def read_file(): 
    file_path = os.path.join(CURRENT_DIR, "url_list.txt"
    with open(file_path, "r", encoding="utf-8"as fs: 
        result = [i.strip() for i in fs.readlines()] 
    return result 
 
def fetch(url): 
    print(url) 
 
def run(): 
    max_count = 5 
    url_list = read_file() 
    for index in range(0, len(url_list), max_count): 
        start = time.time() 
        fetch(url_list[index:index + max_count]) 
        end = time.time() - start 
        if end < 1: 
            time.sleep(1 - end
 
 
if __name__ == '__main__'
    run() 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.

关键代码都在for循环里,首先我们通过声明range的第三个参数,该参数指定迭代的步长为5,这样每次index增加都是以5为基数,即0,5,10。。。

然后我们对url_list做切片,每次取其五个元素,这五个元素会随着index的增加不断的在改变,如果最后不够五个了,按照切片的特性这个时候就会有多少取多少了,不会造成索引超下标的问题。

随着url列表的增加,我们会发现内存的占用也在提高了。这个时候我们就需要对代码进行修改了,我们知道生成器是比较节省内存的空间的,修改之后代码变成,下面的这样。

生成器切分

 

# -*- coding: utf-8 -*- 
# @时间 : 2019-11-23 23:47 
# @作者 : 陈祥安 
# @文件名 : g.py 
# @公众号: Python学习开发 
import os 
import time 
from itertools import islice 
 
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 
 
 
def read_file(): 
    file_path = os.path.join(CURRENT_DIR, "url_list.txt"
    with open(file_path, "r", encoding="utf-8"as fs: 
        for i in fs: 
            yield i.strip() 
 
 
def fetch(url): 
    print(url) 
 
 
def run(): 
    max_count = 5 
    url_gen = read_file() 
    while True
        url_list = list(islice(url_gen, 0, max_count)) 
        if not url_list: 
            break 
        start = time.time() 
        fetch(url_list) 
        end = time.time() - start 
        if end < 1: 
            time.sleep(1 - end
 
 
if __name__ == '__main__'
    run() 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.

首先,我们修改了文件读取的方式,把原来读列表的形式,改为了生成器的形式。这样我们在调用该文件读取方法的时候大大节省了内存。

然后就是对上面for循环进行改造,因为生成器的特性,这里不适合使用for进行迭代,因为每一次的迭代都会消耗生成器的元素,通过使用itertools的islice对url_gen进行切分,islice是生成器的切片,这里我们每次切分出含有5个元素的生成器,因为生成器没有__len__方法所以,我们将其转为列表,然后判断列表是否为空,就可以知道迭代是否该结束了。

修改之后的代码,不管是性能还是节省内存上都大大的提高。读取千万级的文件不是问题。

除此之外,在使用异步爬虫的时候,也许会用到异步生成器切片。下面就和大家讨论,异步生成器切分的问题

异步生成器切分

首先先来看一个简单的异步生成器。

我们知道调用下面的代码会得到一个生成器

 

def foo(): 
    for i in range(20): 
        yield i 
  • 1.
  • 2.
  • 3.

如果在def前面加一个async,那么在调用的时候它就是个异步生成器了。

完整示例代码如下

 

import asyncio 
async def foo(): 
    for i in range(20): 
        yield i 
 
 
async def run(): 
    async_gen = foo() 
    async for i in async_gen: 
        print(i) 
 
 
if __name__ == '__main__'
    asyncio.run(run()) 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

关于async for的切分有点复杂,这里推荐使用aiostream模块,使用之后代码改为下面这样

 

import asyncio 
from aiostream import stream 
 
async def foo(): 
    for i in range(22): 
        yield i 
 
 
async def run(): 
    index = 0 
    limit = 5 
 
    while True
        xs = stream.iterate(foo()) 
        ys = xs[index:index + limit] 
        t = await stream.list(ys) 
        if not t: 
            break 
        print(t) 
        index += limit 
 
 
if __name__ == '__main__'
    asyncio.run(run()) 
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.