站点图标 Park Lam's 每日分享

Python异步编程神器:asyncer 使用全解析

Python 凭借其简洁的语法和强大的生态体系,在 Web 开发、数据分析、机器学习、自动化脚本等多个领域占据重要地位。从金融科技中高频交易的实时数据处理,到教育科研里大规模数据集的并行计算,再到爬虫领域对海量网页的异步抓取,Python 的高效性与灵活性都得以充分展现。而在 Python 丰富的标准库与第三方库中,异步编程相关工具始终是提升程序性能的关键组件。本文将聚焦于 asyncer 这一轻量级异步任务管理库,深入探讨其在异步编程场景中的核心价值与实践应用。

一、asyncer 库的核心特性与技术架构

1.1 用途与应用场景

asyncer 是一个基于 Python 异步框架(asyncio)构建的任务管理库,主要用于简化异步任务的创建、调度与结果处理流程。其核心价值体现在以下场景:

1.2 工作原理与技术架构

asyncer 基于 asyncio 的事件循环(Event Loop)机制,通过以下组件实现任务管理:

1.3 优缺点分析

优点

缺点

1.4 开源协议(License)

asyncer 采用 MIT 开源协议,允许用户在商业项目中自由使用、修改和分发,但需在衍生作品中保留原作者版权声明。该协议宽松灵活,非常适合开源项目与商业产品的结合。

二、asyncer 的安装与基础使用

2.1 安装方式

通过 pip 包管理工具安装最新稳定版:

pip install asyncer

若需使用开发版功能,可从 GitHub 仓库克隆代码并手动安装:

git clone https://github.com/cooperyu/asyncer.git
cd asyncer
pip install -e .

2.2 基础使用流程

2.2.1 任务定义:使用装饰器创建异步任务

import asyncio
from asyncer import task, TaskPool

# 定义一个带参数的异步任务
@task
async def fetch_data(url: str, timeout: int = 5) -> str:
    """模拟异步网络请求"""
    try:
        await asyncio.sleep(1)  # 模拟请求延迟
        return f"Data from {url}"
    except asyncio.TimeoutError:
        return f"Timeout for {url}"

# 定义一个依赖前置任务的处理任务
@task
async def process_data(raw_data: str) -> str:
    """模拟数据处理"""
    await asyncio.sleep(0.5)
    return f"Processed: {raw_data}"

关键点说明

2.2.2 任务调度:通过任务池管理并发执行

async def main():
    urls = ["https://api.example.com/data1", "https://api.example.com/data2"]

    # 创建任务池(最大并发数为 2)
    async with TaskPool(max_workers=2) as pool:
        # 并发提交多个独立任务
        tasks = [pool.submit(fetch_data, url) for url in urls]

        # 等待所有任务完成并获取结果
        results = await pool.gather(*tasks)

    print("All tasks completed:", results)

执行结果

All tasks completed: ['Data from https://api.example.com/data1', 'Data from https://api.example.com/data2']

流程解析

  1. 通过 TaskPool(max_workers=2) 创建一个最大并发数为 2 的任务池。
  2. 使用 pool.submit(func, *args) 向任务池提交任务,返回 TaskHandle 对象。
  3. pool.gather(*tasks) 阻塞等待所有任务完成,返回按任务提交顺序排列的结果列表。

2.2.3 任务依赖:构建有向无环任务图

async def main():
    # 创建任务池
    async with TaskPool() as pool:
        # 定义前置任务:获取原始数据
        fetch_task1 = pool.submit(fetch_data, "https://api.example.com/data1")
        fetch_task2 = pool.submit(fetch_data, "https://api.example.com/data2")

        # 定义依赖任务:处理两个前置任务的结果
        process_task1 = pool.submit(process_data, fetch_task1.result())
        process_task2 = pool.submit(process_data, fetch_task2.result())

        # 定义最终合并任务:汇总处理结果
        merge_task = pool.submit(lambda a, b: f"Merge: {a}, {b}", 
                                process_task1.result(), 
                                process_task2.result())

        # 执行任务流并获取最终结果
        final_result = await merge_task

    print("Final result:", final_result)

执行结果

Final result: Merge: Processed: Data from https://api.example.com/data1, Processed: Data from https://api.example.com/data2

依赖关系解析

  1. 先执行 fetch_task1fetch_task2(并行执行)。
  2. 待两者完成后,执行 process_task1process_task2(并行执行)。
  3. 最后执行 merge_task

三、高级功能与实战场景

3.1 超时控制与错误处理

3.1.1 为单个任务设置超时时间

@task
async def risky_operation(timeout: int = 3) -> str:
    """模拟可能超时的操作"""
    await asyncio.sleep(timeout + 1)  # 故意超时 1 秒
    return "Operation succeeded"

async def main():
    async with TaskPool() as pool:
        # 提交任务时设置超时时间为 3 秒
        task = pool.submit(risky_operation, timeout=3, timeout=3)

        try:
            result = await task
        except asyncio.TimeoutError:
            result = "Task timed out"

    print("Result:", result)

执行结果

Result: Task timed out

实现原理

3.1.2 全局错误处理钩子

def handle_error(task: "TaskHandle", exc: Exception):
    """全局错误处理函数"""
    print(f"Task {task} failed with error: {exc}")

async def main():
    async with TaskPool(error_hook=handle_error) as pool:
        # 提交一个会抛出异常的任务
        task = pool.submit(lambda: 1 / 0)  # 故意引发除零错误
        await task  # 触发错误处理

执行结果

Task <TaskHandle: lambda> failed with error: division by zero

关键点

3.2 结果流式处理与异步迭代

async def generate_tasks():
    """生成器函数:动态创建任务"""
    for i in range(3):
        yield pool.submit(fetch_data, f"https://api.example.com/data{i+1}")

async def main():
    async with TaskPool() as pool:
        # 异步迭代任务生成器,实时处理结果
        async for task in generate_tasks():
            result = await task
            print(f"Received result: {result}")

执行结果

Received result: Data from https://api.example.com/data1
Received result: Data from https://api.example.com/data2
Received result: Data from https://api.example.com/data3

适用场景

3.3 与同步函数集成

from asyncer import run_in_executor

def sync_heavy_task(n: int) -> int:
    """模拟同步耗时任务"""
    return sum(i for i in range(n))

async def main():
    async with TaskPool() as pool:
        # 将同步函数转换为异步任务提交
        task = pool.submit(run_in_executor, sync_heavy_task, 1000000)
        result = await task
        print("Sync task result:", result)

执行结果

Sync task result: 499999500000

实现原理

四、实战案例:异步爬虫数据抓取与处理

4.1 需求描述

构建一个异步爬虫程序,实现以下功能:

  1. 从给定的 URL 列表中并发抓取网页内容。
  2. 对每个网页内容进行解析,提取标题和正文关键词。
  3. 将结果按指定格式存储到 JSON 文件中。
  4. 支持任务超时控制、错误重试和结果流式处理。

4.2 技术选型

4.3 完整代码实现

import asyncio
import json
from asyncer import task, TaskPool, run_in_executor
from httpx import AsyncClient
from bs4 import BeautifulSoup

# --------------------- 任务定义 ---------------------
@task
async def fetch_page(url: str, client: AsyncClient, timeout: int = 10) -> str:
    """异步抓取网页内容"""
    try:
        response = await client.get(url, timeout=timeout)
        response.raise_for_status()  # 抛出 HTTP 错误
        return response.text
    except Exception as e:
        return f"ERROR: {str(e)}"

@task
def parse_page(html: str) -> dict:
    """解析网页内容,提取标题和关键词"""
    if html.startswith("ERROR"):
        return {"url": "N/A", "title": "抓取失败", "keywords": []}

    soup = BeautifulSoup(html, "html.parser")
    title = soup.title.string.strip() if soup.title else "无标题"

    # 提取正文前 100 字作为关键词(简化逻辑)
    text = soup.get_text(strip=True)
    keywords = text[:100].split()[:20]  # 取前 20 个词

    return {"url": "N/A", "title": title, "keywords": keywords}

@task
def save_to_json(results: list, filename: str = "results.json"):
    """将结果保存到 JSON 文件"""
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Results saved to {filename}")

# --------------------- 任务调度 ---------------------
async def main(urls: list[str]):
    async with AsyncClient() as client:
        async with TaskPool(max_workers=5, error_hook=handle_error) as pool:
            results = []

            # 动态生成抓取任务
            fetch_tasks = [
                pool.submit(fetch_page, url, client, timeout=8)
                for url in urls
            ]

            # 异步迭代处理每个抓取结果
            async for fetch_task in fetch_tasks:
                html = await fetch_task
                if html.startswith("ERROR"):
                    print(f"Fetch failed: {html}")
                    continue

                # 创建解析任务(依赖抓取结果)
                parse_task = pool.submit(parse_page, html)
                parsed_data = await parse_task

                # 添加 URL 到结果中(解析函数未获取 URL,此处补充)
                parsed_data["url"] = fetch_task.args[0]  # 获取原始 URL
                results.append(parsed_data)

            # 所有任务完成后,提交保存任务
            save_task = pool.submit(save_to_json, results)
            await save_task

# --------------------- 辅助函数 ---------------------
def handle_error(task: "TaskHandle", exc: Exception):
    """错误处理钩子:记录任务错误"""
    print(f"Task {task} failed: {str(exc)}")

# --------------------- 执行入口 ---------------------
if __name__ == "__main__":
    sample_urls = [
        "https://example.com",
        "https://python.org",
        "https://github.com",
        "https://invalid-url.example",  # 故意设置无效 URL
        "https://httpbin.org/delay/5"  # 模拟延迟 5 秒的 URL
    ]

    asyncio.run(main(sample_urls))

4.4 代码执行流程

  1. 任务提交阶段
  1. 结果处理阶段
  1. 结果保存阶段

4.5 执行结果示例(results.json 部分内容)

“`json
[
{
“url”: “https://example.com”,
“title”: “Example Domain”,
“keywords”: [“This”, “is”, “a”,

关注我,每天分享一个实用的Python自动化工具。

退出移动版