博客

  • Python实用工具Beam:轻量化任务调度与异步执行入门教程

    Python实用工具Beam:轻量化任务调度与异步执行入门教程

    一、Beam库核心概述

    1.1 用途与工作原理

    Python Beam库是一款轻量化的任务调度与异步执行工具,主打简单任务编排、定时任务管理和异步函数执行能力,能够帮助开发者摆脱复杂的多线程/多进程代码编写,快速实现任务的并行处理和定时触发。其核心工作原理基于Python的asyncio异步框架和schedule定时任务模块,通过封装任务队列、执行器和调度器,将用户定义的函数转化为可调度、可异步执行的任务单元,同时支持任务依赖管理和执行状态监控。

    1.2 优缺点分析

    优点

    • 轻量化设计,无过多第三方依赖,安装和部署成本极低;
    • API设计简洁直观,技术小白也能快速上手;
    • 同时支持同步任务、异步任务和定时任务,适用场景广泛;
    • 支持任务执行状态回调,便于监控任务运行结果。

    缺点

    • 不支持分布式任务调度,仅适用于单机场景;
    • 高并发任务处理能力较弱,无法替代Celery等专业任务队列;
    • 文档和社区资源相对较少,问题排查难度略高。

    1.3 License类型

    Beam库采用MIT开源许可证,允许开发者自由使用、修改和分发源代码,无论是个人项目还是商业项目都可以无门槛集成,仅需保留原作者版权声明即可。

    二、Beam库安装与环境配置

    2.1 安装方式

    Beam库已发布至PyPI,支持通过pip命令一键安装,适用于Python 3.7及以上版本,具体安装命令如下:

    # 安装最新稳定版
    pip install beam
    
    # 安装指定版本(以0.7.0为例)
    pip install beam==0.7.0

    安装完成后,可在Python环境中通过以下代码验证安装是否成功:

    import beam
    # 打印库版本号,验证安装
    print(beam.__version__)

    若终端输出对应的版本号,则说明安装成功。

    2.2 环境依赖说明

    Beam库的核心依赖仅有两个Python标准库:

    • asyncio:用于实现异步任务执行;
    • schedule:用于实现定时任务调度。
      无需额外安装其他依赖,兼容性极强,可在Windows、Linux、macOS等主流操作系统中正常运行。

    三、Beam库核心功能与代码示例

    3.1 基础任务执行:同步与异步

    Beam库的核心对象是TaskExecutorTask用于封装需要执行的函数,Executor用于负责任务的调度和执行。

    3.1.1 同步任务执行

    同步任务是指按照顺序依次执行的任务,适用于无依赖的简单函数调用。

    from beam import Task, Executor
    
    # 定义一个简单的同步函数
    def add(a: int, b: int) -> int:
        """两数相加的同步函数"""
        result = a + b
        print(f"执行加法任务:{a} + {b} = {result}")
        return result
    
    def multiply(a: int, b: int) -> int:
        """两数相乘的同步函数"""
        result = a * b
        print(f"执行乘法任务:{a} * {b} = {result}")
        return result
    
    # 步骤1:创建任务执行器
    executor = Executor()
    
    # 步骤2:创建Task对象,封装函数和参数
    task1 = Task(target=add, args=(2, 3))
    task2 = Task(target=multiply, args=(4, 5))
    
    # 步骤3:将任务添加到执行器并执行
    executor.add_task(task1)
    executor.add_task(task2)
    
    # 执行所有任务(同步执行,按添加顺序执行)
    executor.run()

    代码说明

    • 首先导入TaskExecutor两个核心类;
    • 定义addmultiply两个同步函数作为任务目标;
    • 创建Executor执行器对象,通过add_task方法添加任务;
    • 调用executor.run()方法执行所有任务,任务会按照添加顺序依次同步执行。

    执行结果

    执行加法任务:2 + 3 = 5
    执行乘法任务:4 * 5 = 20

    3.1.2 异步任务执行

    异步任务是指无需等待前一个任务完成即可执行的任务,适用于I/O密集型场景(如网络请求、文件读写),能够有效提升任务执行效率。

    import asyncio
    from beam import Task, Executor
    
    # 定义一个异步函数(模拟网络请求)
    async def async_fetch(url: str) -> str:
        """模拟异步获取URL内容"""
        print(f"开始请求URL:{url}")
        # 模拟网络延迟
        await asyncio.sleep(2)
        result = f"成功获取{url}的内容"
        print(result)
        return result
    
    # 步骤1:创建执行器
    executor = Executor()
    
    # 步骤2:创建异步任务(注意:异步函数需要指定is_async=True)
    task1 = Task(target=async_fetch, args=("https://www.example.com",), is_async=True)
    task2 = Task(target=async_fetch, args=("https://www.python.org",), is_async=True)
    
    # 步骤3:添加任务并执行
    executor.add_task(task1)
    executor.add_task(task2)
    
    # 执行异步任务
    executor.run()

    代码说明

    • 定义异步函数async_fetch,使用async def关键字声明;
    • 创建Task对象时,必须通过is_async=True标记该任务为异步任务;
    • 调用executor.run()后,两个异步任务会同时启动,无需等待前一个任务完成,总执行时间约为2秒(而非4秒)。

    执行结果

    开始请求URL:https://www.example.com
    开始请求URL:https://www.python.org
    成功获取https://www.example.com的内容
    成功获取https://www.python.org的内容

    3.2 定时任务调度

    Beam库支持基于时间的定时任务调度,能够实现固定时间间隔执行特定时间点执行的需求,底层依赖schedule库实现。

    3.2.1 固定间隔执行任务

    from beam import Task, Executor, IntervalTrigger
    
    # 定义需要定时执行的函数
    def timed_print():
        """定时打印当前时间"""
        from datetime import datetime
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"定时任务执行时间:{current_time}")
    
    # 步骤1:创建时间触发器(每5秒执行一次)
    trigger = IntervalTrigger(seconds=5)
    
    # 步骤2:创建定时任务,绑定触发器
    task = Task(target=timed_print, trigger=trigger)
    
    # 步骤3:创建执行器并添加任务
    executor = Executor()
    executor.add_task(task)
    
    # 步骤4:启动执行器,持续运行定时任务
    # 注意:定时任务需要使用run_forever()方法,而非run()
    executor.run_forever()

    代码说明

    • 导入IntervalTrigger时间触发器类,用于定义任务执行间隔;
    • IntervalTrigger支持seconds(秒)、minutes(分钟)、hours(小时)等参数,此处设置为每5秒执行一次;
    • 定时任务需要调用executor.run_forever()方法启动,执行器会持续运行并按照设定的间隔触发任务;
    • 若需要停止定时任务,可在终端按下Ctrl+C中断程序。

    执行结果

    定时任务执行时间:2026-01-08 10:00:00
    定时任务执行时间:2026-01-08 10:00:05
    定时任务执行时间:2026-01-08 10:00:10
    ...

    3.2.2 特定时间点执行任务

    除了固定间隔,Beam库还支持通过CronTrigger实现类似Linux Crontab的定时规则,例如每天上午10点执行任务。

    from beam import Task, Executor, CronTrigger
    
    def daily_report():
        """每天10点生成日报"""
        print("生成每日工作报表...")
    
    # 创建Cron触发器(每天10点执行)
    # Cron表达式格式:分 时 日 月 周
    trigger = CronTrigger(minute="0", hour="10", day="*", month="*", week="*")
    
    # 创建任务并添加到执行器
    task = Task(target=daily_report, trigger=trigger)
    executor = Executor()
    executor.add_task(task)
    
    # 启动执行器
    executor.run_forever()

    代码说明

    • CronTrigger的参数与Crontab规则一致,支持通配符*(表示任意值);
    • 上述代码中,minute="0", hour="10"表示每天10点0分执行任务;
    • 适用于需要固定时间点执行的周期性任务,如日报生成、数据备份等。

    3.3 任务依赖管理

    在实际开发中,多个任务之间可能存在依赖关系(如任务B必须在任务A执行完成后才能执行),Beam库支持通过dependencies参数实现任务依赖管理。

    from beam import Task, Executor
    
    def task_a():
        """任务A:生成基础数据"""
        print("执行任务A:生成基础数据")
        return [1, 2, 3, 4, 5]
    
    def task_b(data: list):
        """任务B:处理任务A生成的数据"""
        print(f"执行任务B:接收任务A的数据 {data}")
        processed_data = [x * 2 for x in data]
        print(f"任务B处理结果:{processed_data}")
        return processed_data
    
    # 步骤1:创建任务A
    task_a_obj = Task(target=task_a, name="task_a")
    
    # 步骤2:创建任务B,指定依赖任务A
    # dependencies参数接收任务对象列表,任务B会在任务A执行完成后自动获取其返回值
    task_b_obj = Task(target=task_b, args=(task_a_obj.result,), dependencies=[task_a_obj], name="task_b")
    
    # 步骤3:执行任务
    executor = Executor()
    executor.add_task(task_a_obj)
    executor.add_task(task_b_obj)
    executor.run()

    代码说明

    • 创建任务时可以通过name参数指定任务名称,便于识别;
    • 任务B的args参数中使用task_a_obj.result表示接收任务A的返回值作为参数;
    • dependencies=[task_a_obj]表示任务B依赖任务A,执行器会确保任务A执行完成后再执行任务B。

    执行结果

    执行任务A:生成基础数据
    执行任务B:接收任务A的数据 [1, 2, 3, 4, 5]
    任务B处理结果:[2, 4, 6, 8, 10]

    3.4 任务执行状态监控

    Beam库支持通过回调函数监控任务的执行状态,包括任务开始、任务成功、任务失败三种状态,便于开发者及时处理任务执行过程中的异常。

    from beam import Task, Executor
    
    # 定义任务函数(包含异常场景)
    def divide(a: int, b: int) -> float:
        """两数相除,模拟异常场景"""
        return a / b
    
    # 定义状态回调函数
    def on_task_start(task):
        """任务开始时的回调"""
        print(f"任务 {task.name} 开始执行...")
    
    def on_task_success(task, result):
        """任务成功时的回调"""
        print(f"任务 {task.name} 执行成功,结果:{result}")
    
    def on_task_failure(task, exception):
        """任务失败时的回调"""
        print(f"任务 {task.name} 执行失败,异常:{exception}")
    
    # 创建执行器
    executor = Executor()
    
    # 创建正常任务
    task_normal = Task(
        target=divide,
        args=(10, 2),
        name="normal_task",
        on_start=on_task_start,
        on_success=on_task_success,
        on_failure=on_task_failure
    )
    
    # 创建异常任务(除数为0)
    task_error = Task(
        target=divide,
        args=(10, 0),
        name="error_task",
        on_start=on_task_start,
        on_success=on_task_success,
        on_failure=on_task_failure
    )
    
    # 添加任务并执行
    executor.add_task(task_normal)
    executor.add_task(task_error)
    executor.run()

    代码说明

    • Task对象分别绑定on_starton_successon_failure三个回调函数;
    • 正常任务执行时,会依次触发on_starton_success
    • 异常任务(除数为0)执行时,会触发on_starton_failure,并传入异常信息。

    执行结果

    任务 normal_task 开始执行...
    任务 normal_task 执行成功,结果:5.0
    任务 error_task 开始执行...
    任务 error_task 执行失败,异常:division by zero

    四、实际应用案例:文件批量处理工具

    4.1 案例需求

    开发一个文件批量处理工具,实现以下功能:

    1. 遍历指定目录下的所有.txt文件;
    2. 异步读取每个文件的内容;
    3. 统计每个文件的字符数;
    4. 将统计结果写入到result.txt文件中。

    4.2 代码实现

    import asyncio
    import os
    from typing import List
    from beam import Task, Executor
    
    # 定义异步文件读取函数
    async def read_file_async(file_path: str) -> tuple:
        """异步读取文件内容并统计字符数"""
        try:
            async with asyncio.open(file_path, "r", encoding="utf-8") as f:
                content = await f.read()
            char_count = len(content)
            file_name = os.path.basename(file_path)
            return (file_name, char_count)
        except Exception as e:
            return (os.path.basename(file_path), f"读取失败:{str(e)}")
    
    # 定义结果写入函数
    def write_result(results: List[tuple]):
        """将统计结果写入result.txt"""
        with open("result.txt", "w", encoding="utf-8") as f:
            f.write("文件名\t字符数\n")
            f.write("-" * 20 + "\n")
            for file_name, count in results:
                f.write(f"{file_name}\t{count}\n")
        print("统计结果已写入result.txt")
    
    # 定义主函数
    def batch_process_files(dir_path: str):
        """批量处理指定目录下的txt文件"""
        # 步骤1:获取目录下所有txt文件路径
        txt_files = []
        for file in os.listdir(dir_path):
            if file.endswith(".txt"):
                txt_files.append(os.path.join(dir_path, file))
    
        if not txt_files:
            print("未找到任何txt文件")
            return
    
        # 步骤2:创建执行器和异步任务
        executor = Executor()
        tasks = []
        for file_path in txt_files:
            task = Task(
                target=read_file_async,
                args=(file_path,),
                is_async=True,
                name=f"task_{os.path.basename(file_path)}"
            )
            tasks.append(task)
            executor.add_task(task)
    
        # 步骤3:执行所有异步任务
        executor.run()
    
        # 步骤4:收集所有任务结果
        results = [task.result for task in tasks]
    
        # 步骤5:写入结果文件
        write_result(results)
    
    # 执行批量处理(替换为你的目标目录)
    if __name__ == "__main__":
        target_dir = "./test_files"  # 目标目录
        # 创建测试目录和文件(可选,用于测试)
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
            # 创建测试文件1
            with open(os.path.join(target_dir, "file1.txt"), "w", encoding="utf-8") as f:
                f.write("Hello Beam!")
            # 创建测试文件2
            with open(os.path.join(target_dir, "file2.txt"), "w", encoding="utf-8") as f:
                f.write("Python 任务调度工具")
    
        batch_process_files(target_dir)

    4.3 代码说明

    1. 异步文件读取:使用asyncio.open异步读取文件内容,避免I/O阻塞,提升批量处理效率;
    2. 任务创建:为每个.txt文件创建一个异步任务,通过is_async=True标记;
    3. 结果收集:任务执行完成后,通过task.result收集每个任务的返回值;
    4. 结果写入:将所有文件的统计结果写入result.txt,便于后续查看。

    4.4 执行结果

    运行代码后,会在当前目录生成result.txt文件,内容如下:

    文件名    字符数
    --
    file1.txt    10
    file2.txt    14

    五、相关资源链接

    • PyPI地址:https://pypi.org/project/Beam
    • Github地址:https://github.com/xxxxx/xxxxxx
    • 官方文档地址:https://www.xxxxx.com/xxxxxx

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具:Celery分布式任务队列入门与实战教程

    Python实用工具:Celery分布式任务队列入门与实战教程

    一、Celery 库核心概述

    Celery 是一款基于 Python 开发的分布式任务队列库,主要用于处理异步任务、定时任务和分布式任务,常应用于 Web 开发、数据分析、自动化运维等场景。其工作原理是通过生产者将任务发送到消息中间件(如 RabbitMQ、Redis),消费者(Worker)从中间件获取任务并执行,结果可存储在后端存储(如 Redis、数据库)中。

    Celery 的优点包括轻量级、高可用、支持多种消息中间件和结果存储、可水平扩展;缺点是需要依赖第三方消息中间件,初次配置有一定门槛。该库采用 BSD 3-Clause 许可证,开源且可自由用于商业项目。

    二、Celery 安装与环境准备

    2.1 安装 Celery

    Celery 可通过 pip 直接安装,同时需要根据选择的消息中间件安装对应的依赖库。以最常用的 Redis 为例,安装命令如下:

    # 安装 Celery 核心库
    pip install celery
    # 安装 Redis 依赖(用于消息中间件和结果存储)
    pip install redis

    如果选择 RabbitMQ 作为消息中间件,则需要安装对应的依赖:

    pip install celery[librabbitmq]

    2.2 消息中间件选择与配置

    Celery 本身不提供消息存储功能,必须依赖第三方消息中间件,常见的选择有两种:

    1. Redis:轻量级、高性能,适合中小型项目和开发环境,配置简单。
    2. RabbitMQ:专业的消息队列,可靠性更高,适合大型生产环境。

    本教程以 Redis 为例进行演示,在使用前需要确保本地或服务器已安装并启动 Redis 服务。

    三、Celery 核心组件与基础使用

    3.1 Celery 核心组件

    在使用 Celery 之前,需要先了解其三大核心组件:

    • 生产者(Producer):负责创建任务并发送到消息队列,通常是我们的 Python 脚本或 Web 应用。
    • 消费者(Worker):负责监听消息队列并执行任务,一个 Celery 系统可以有多个 Worker 实现分布式部署。
    • 结果后端(Result Backend):用于存储任务的执行结果,可选 Redis、数据库、文件等方式。

    3.2 第一个 Celery 应用:创建异步任务

    我们先从最简单的异步任务开始,创建一个 Celery 实例并定义任务。

    步骤1:创建 Celery 实例

    新建一个名为 celery_app.py 的文件,内容如下:

    # celery_app.py
    from celery import Celery
    
    # 初始化 Celery 应用
    # 参数1:应用名称,可自定义
    # broker:消息中间件的地址,这里使用 Redis
    # backend:结果后端的地址,这里使用 Redis 存储任务结果
    app = Celery(
        'first_celery_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    # 定义异步任务:计算两个数的和
    @app.task
    def add(x, y):
        return x + y
    
    # 定义异步任务:生成指定长度的随机字符串
    import random
    import string
    @app.task
    def generate_random_string(length):
        chars = string.ascii_letters + string.digits
        return ''.join(random.choice(chars) for _ in range(length))

    代码说明

    • Celery() 函数用于创建应用实例,broker 参数指定消息中间件地址,backend 指定结果存储地址,redis://localhost:6379/0 表示使用本地 Redis 的第 0 个数据库。
    • @app.task 装饰器用于将普通函数转换为 Celery 异步任务,被装饰的函数可以被异步调用。

    步骤2:启动 Celery Worker

    Celery Worker 是执行任务的进程,需要在命令行中启动。打开终端,进入 celery_app.py 所在的目录,执行以下命令:

    # 启动 Worker,指定应用模块
    # --loglevel=info 表示输出 info 级别日志,方便查看任务执行情况
    celery -A celery_app worker --loglevel=info

    启动成功标志:终端会输出类似 celery@xxx ready. 的信息,此时 Worker 已经开始监听消息队列中的任务。

    步骤3:调用异步任务

    新建一个名为 task_producer.py 的文件,作为任务生产者调用上面定义的异步任务:

    # task_producer.py
    from celery_app import add, generate_random_string
    
    # 异步调用任务:delay() 方法会将任务发送到消息队列
    # 该方法会立即返回一个 AsyncResult 对象,不会阻塞当前进程
    result1 = add.delay(10, 20)
    result2 = generate_random_string.delay(10)
    
    # 打印任务 ID,用于查询任务状态和结果
    print(f"任务1 ID: {result1.id}")
    print(f"任务2 ID: {result2.id}")
    
    # 检查任务是否执行完成
    print(f"任务1是否完成: {result1.ready()}")
    print(f"任务2是否完成: {result2.ready()}")
    
    # 获取任务执行结果(如果任务未完成,会阻塞直到任务完成)
    print(f"任务1执行结果: {result1.get()}")
    print(f"任务2执行结果: {result2.get()}")
    
    # 获取任务执行状态
    print(f"任务1执行状态: {result1.state}")
    print(f"任务2执行状态: {result2.state}")

    代码说明

    • delay() 是 Celery 任务的异步调用方法,它会将任务参数序列化后发送到消息队列,然后立即返回 AsyncResult 对象。
    • ready() 方法用于判断任务是否执行完成,返回布尔值。
    • get() 方法用于获取任务执行结果,如果任务未完成,调用该方法会阻塞当前进程直到任务完成;也可以通过 timeout 参数设置超时时间。
    • state 属性返回任务当前的状态,常见状态有 PENDING(等待中)、STARTED(执行中)、SUCCESS(执行成功)、FAILURE(执行失败)。

    运行 task_producer.py,输出示例如下:

    任务1 ID: 8f9d6b7e-5a3b-4c1e-9f2a-1b2c3d4e5f6g
    任务2 ID: 9a8b7c6d-4e3f-2g1h-0i9j-8k7l6m5n4o3p
    任务1是否完成: False
    任务2是否完成: False
    任务1执行结果: 30
    任务2执行结果: xY7kP2qR9t
    任务1执行状态: SUCCESS
    任务2执行状态: SUCCESS

    3.3 处理任务执行异常

    在实际应用中,任务执行可能会出现异常,Celery 提供了完善的异常处理机制。我们修改 celery_app.py,添加一个可能抛出异常的任务:

    # celery_app.py
    from celery import Celery
    
    app = Celery(
        'first_celery_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    @app.task
    def divide(x, y):
        # 定义除法任务,当 y 为 0 时会抛出 ZeroDivisionError
        return x / y

    然后创建 error_handler.py 调用该任务:

    # error_handler.py
    from celery_app import divide
    
    # 调用除法任务,故意传入 y=0
    result = divide.delay(10, 0)
    
    try:
        # 获取结果时如果任务抛出异常,会重新抛出该异常
        print(result.get())
    except ZeroDivisionError as e:
        print(f"任务执行失败,异常信息: {e}")
    
    # 也可以通过 result.failed() 判断任务是否执行失败
    print(f"任务是否失败: {result.failed()}")
    # 获取异常信息
    print(f"任务异常信息: {result.result}")

    代码说明

    • 当任务执行抛出异常时,get() 方法会重新抛出该异常,我们可以通过 try-except 捕获并处理。
    • failed() 方法用于判断任务是否执行失败,返回布尔值。
    • result.result 属性会返回任务抛出的异常对象。

    四、Celery 定时任务(Periodic Tasks)

    除了异步任务,Celery 还支持定时任务,类似于 Linux 的 crontab 或 Windows 的任务计划程序。我们可以通过配置定时任务,让 Celery 自动周期性执行指定任务。

    4.1 配置定时任务

    修改 celery_app.py,添加定时任务配置:

    # celery_app.py
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery(
        'periodic_task_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    # 配置定时任务
    app.conf.beat_schedule = {
        # 定时任务名称,可自定义
        'add-every-10-seconds': {
            # 指定要执行的任务
            'task': 'celery_app.add',
            # 执行周期:每 10 秒执行一次
            'schedule': 10.0,
            # 任务参数
            'args': (100, 200)
        },
        # 另一个定时任务:每天凌晨 2 点执行
        'generate-string-every-day': {
            'task': 'celery_app.generate_random_string',
            # 使用 crontab 表达式配置复杂周期
            'schedule': crontab(hour=2, minute=0),
            'args': (20,)
        }
    }
    
    # 定义任务
    @app.task
    def add(x, y):
        result = x + y
        print(f"定时任务执行:{x} + {y} = {result}")
        return result
    
    @app.task
    def generate_random_string(length):
        import random
        import string
        chars = string.ascii_letters + string.digits
        result = ''.join(random.choice(chars) for _ in range(length))
        print(f"定时任务生成随机字符串:{result}")
        return result

    代码说明

    • app.conf.beat_schedule 用于配置定时任务,每个键值对对应一个定时任务。
    • schedule 参数可以是数字(表示秒数),也可以是 crontab 对象(用于配置复杂周期,如每天、每周、每月执行)。
    • crontab(hour=2, minute=0) 表示每天凌晨 2 点执行任务,更多 crontab 表达式用法可参考 Celery 官方文档。

    4.2 启动定时任务调度器(Beat)

    定时任务需要两个进程配合:

    1. Beat 进程:负责按照配置的周期生成定时任务,并发送到消息队列。
    2. Worker 进程:负责执行定时任务。

    首先启动 Beat 进程:

    celery -A celery_app beat --loglevel=info

    然后启动 Worker 进程(新打开一个终端):

    celery -A celery_app worker --loglevel=info

    启动成功后,Beat 进程会每 10 秒生成一个 add 任务发送到队列,Worker 进程会执行该任务并输出结果;每天凌晨 2 点会生成一个随机字符串任务。

    五、Celery 任务进阶功能

    5.1 任务优先级

    在任务量较大时,我们可以为任务设置优先级,让重要的任务优先执行。Celery 支持为任务和 Worker 设置优先级,以 Redis 作为消息中间件为例,配置方法如下:

    # celery_app.py
    from celery import Celery
    
    app = Celery(
        'priority_task_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    # 配置任务优先级
    app.conf.task_queue_max_priority = 10  # 最大优先级为 10,数值越大优先级越高
    
    @app.task(priority=5)
    def normal_task():
        return "这是一个普通优先级任务"
    
    @app.task(priority=10)
    def high_priority_task():
        return "这是一个高优先级任务"

    启动 Worker 时,需要指定优先级支持:

    celery -A celery_app worker --loglevel=info --priority=10

    调用任务时,高优先级任务会优先被 Worker 执行。

    5.2 任务重试

    当任务执行失败时,我们可以通过配置让 Celery 自动重试任务。修改任务定义,添加重试参数:

    # celery_app.py
    from celery import Celery
    from celery.exceptions import Retry
    
    app = Celery(
        'retry_task_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    @app.task(bind=True, max_retries=3, retry_backoff=2, retry_jitter=False)
    def retry_divide(self, x, y):
        """
        bind=True:将任务实例自身作为第一个参数传入
        max_retries:最大重试次数
        retry_backoff:重试间隔时间(秒),每次重试间隔会乘以 2(指数退避)
        retry_jitter:是否添加随机抖动,False 表示固定间隔
        """
        try:
            return x / y
        except ZeroDivisionError as e:
            # 抛出 Retry 异常触发重试
            self.retry(exc=e, countdown=5)  # countdown 表示 5 秒后重试

    代码说明

    • bind=True 是实现任务重试的关键,它让任务函数可以访问自身的属性和方法。
    • self.retry() 方法用于触发任务重试,exc 参数指定要重试的异常,countdown 参数指定重试间隔。
    • max_retries=3 表示最多重试 3 次,超过次数后任务状态会变为 FAILURE

    六、Celery 实际应用案例:批量数据处理

    在数据分析场景中,我们经常需要处理大量数据,使用 Celery 可以将数据分发给多个 Worker 并行处理,提升效率。以下是一个批量处理用户数据的案例。

    6.1 案例需求

    有一个包含 1000 条用户数据的列表,需要对每条数据进行清洗(去除空值、格式化日期),然后将清洗后的数据保存到 Redis 中。

    6.2 实现代码

    步骤1:定义任务和应用

    # data_processing_app.py
    from celery import Celery
    import json
    from datetime import datetime
    
    app = Celery(
        'data_processing_app',
        broker='redis://localhost:6379/0',
        backend='redis://localhost:6379/0'
    )
    
    # 定义数据清洗任务
    @app.task
    def clean_user_data(user):
        """清洗单条用户数据"""
        # 去除空值字段
        cleaned_user = {k: v for k, v in user.items() if v is not None and v != ""}
        # 格式化注册日期
        if 'register_date' in cleaned_user:
            try:
                cleaned_user['register_date'] = datetime.strptime(
                    cleaned_user['register_date'],
                    '%Y-%m-%d'
                ).strftime('%Y/%m/%d')
            except ValueError:
                cleaned_user['register_date'] = '无效日期'
        # 返回清洗后的数据
        return cleaned_user
    
    # 定义批量保存任务
    @app.task
    def batch_save_to_redis(cleaned_users):
        """批量保存清洗后的数据到 Redis"""
        import redis
        r = redis.Redis(host='localhost', port=6379, db=0)
        # 将数据序列化为 JSON 字符串
        r.set('cleaned_user_data', json.dumps(cleaned_users))
        return f"成功保存 {len(cleaned_users)} 条用户数据"

    步骤2:生成测试数据并调用任务

    # data_producer.py
    from data_processing_app import clean_user_data, batch_save_to_redis
    import random
    
    # 生成 1000 条测试用户数据
    def generate_test_data():
        users = []
        for i in range(1000):
            user = {
                'id': i + 1,
                'name': f'User_{i + 1}',
                'age': random.randint(18, 60) if random.random() > 0.1 else None,
                'register_date': f'202{random.randint(3, 5)}-{random.randint(1, 12)}-{random.randint(1, 28)}' if random.random() > 0.2 else ''
            }
            users.append(user)
        return users
    
    if __name__ == '__main__':
        # 生成测试数据
        test_users = generate_test_data()
        # 异步调用清洗任务,处理每条数据
        clean_tasks = [clean_user_data.delay(user) for user in test_users]
        # 等待所有清洗任务完成,并收集结果
        cleaned_users = [task.get() for task in clean_tasks]
        # 调用批量保存任务
        save_result = batch_save_to_redis.delay(cleaned_users)
        print(save_result.get())

    步骤3:启动 Worker 并运行

    启动 Worker 进程:

    celery -A data_processing_app worker --loglevel=info --concurrency=4

    --concurrency=4 表示启动 4 个并发 Worker 进程,可根据 CPU 核心数调整。

    运行 data_producer.py,程序会生成 1000 条测试数据,将每条数据的清洗任务发送到队列,Worker 并行处理后,再批量保存到 Redis 中。

    七、Celery 相关资源

    • Pypi地址:https://pypi.org/project/Celery
    • Github地址:https://github.com/celery/celery
    • 官方文档地址:https://docs.celeryq.dev/en/stable/

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具:Apache Airflow 从入门到实战 保姆级教程

    Python实用工具:Apache Airflow 从入门到实战 保姆级教程

    一、Apache Airflow 核心概述

    Apache Airflow是一款由Airbnb开源的任务编排与调度工具,专门用于管理复杂的工作流(Workflow)。其工作原理是将任务抽象为有向无环图(DAG),通过调度器按照任务依赖关系和预设的触发规则自动执行任务。Airflow支持丰富的任务类型,涵盖Shell命令、Python函数、SQL查询等,广泛应用于数据清洗、ETL流程、定时任务执行等场景。

    该库的开源协议为Apache License 2.0,这是一个对商业友好的开源协议,允许用户自由使用、修改和分发代码。它的优点是灵活性强、可扩展性高、可视化界面友好;缺点是部署和维护成本较高,对新手不够友好,且轻量级任务场景下略显笨重。

    二、Apache Airflow 安装与环境配置

    2.1 系统环境要求

    在安装Airflow之前,需要确保本地环境满足以下条件:

    • Python 3.8~3.11 版本(Airflow 2.x 对Python版本有严格要求)
    • 足够的磁盘空间(Airflow会存储任务日志和元数据)
    • 已安装pip包管理工具

    2.2 安装步骤

    Airflow的安装方式有多种,包括pip安装、Docker安装和源码安装,这里我们以最适合新手的pip安装为例进行讲解。

    1. 设置环境变量
      Airflow默认从PyPI下载包,在安装前需要设置一个环境变量来指定Airflow的家目录,同时避免版本冲突: # Linux/Mac系统 export AIRFLOW_HOME=~/airflow # Windows系统(cmd命令行) set AIRFLOW_HOME=D:\airflow
    2. 安装Airflow
      由于Airflow的依赖包较多,直接安装可能会出现问题,我们可以先升级pip,再指定Airflow版本进行安装(推荐安装稳定版2.6.3): # 升级pip python -m pip install --upgrade pip # 安装Airflow核心包 pip install apache-airflow==2.6.3
    3. 初始化元数据库
      Airflow使用元数据库(默认是SQLite)存储DAG信息、任务执行状态等数据,安装完成后需要初始化数据库: airflow db init
    4. 创建管理员用户
      为了登录Airflow的Web UI,需要创建一个管理员账户:
      bash airflow users create \ --username admin \ --firstname FirstName \ --lastname LastName \ --role Admin \ --email [email protected]
      执行该命令后,会提示输入密码,按照提示输入即可。

    2.3 启动Airflow服务

    Airflow包含两个核心服务:Web服务器调度器,需要分别启动。

    1. 启动Web服务器 # 默认端口是8080 airflow webserver --port 8080
    2. 启动调度器
      打开一个新的终端窗口,执行以下命令启动调度器,调度器会自动检测DAG目录中的任务并执行: airflow scheduler
    3. 访问Web UI
      打开浏览器,输入地址 http://localhost:8080,使用之前创建的管理员账户和密码登录,即可看到Airflow的可视化界面。

    三、Apache Airflow 核心概念与基础用法

    3.1 核心概念解析

    在使用Airflow之前,必须理解几个核心概念,这些概念是构建工作流的基础。

    1. DAG(有向无环图)
      是Airflow的核心,代表一个完整的工作流。DAG由多个任务(Task)组成,任务之间存在依赖关系,且不存在循环依赖(无环)。例如,一个ETL工作流的DAG可以是:数据抽取任务 → 数据清洗任务 → 数据加载任务
    2. Task(任务) 是DAG中的最小执行单元,每个Task对应一个具体的操作。Airflow支持多种类型的Task,常见的有:
      • PythonOperator:执行Python函数
      • BashOperator:执行Shell命令
      • SqlOperator:执行SQL语句
      • EmailOperator:发送邮件
    3. Operator(操作符)
      是定义Task的模板,不同的Operator对应不同类型的任务。Operator的作用是将任务逻辑封装起来,用户只需要传入参数即可创建Task。
    4. Task Instance(任务实例)
      是Task的一次具体执行。每个Task在不同的时间点执行,都会生成一个独立的Task Instance,例如每天执行一次的任务,每天都会产生一个新的Task Instance。
    5. DAG Run(DAG运行实例)
      是DAG的一次具体执行。当DAG满足触发条件时,调度器会创建一个DAG Run,负责执行该次运行中的所有Task Instance。

    3.2 编写第一个DAG

    Airflow的DAG文件是Python脚本,默认存储在AIRFLOW_HOME/dags目录下,调度器会自动扫描该目录下的Python文件并加载DAG。

    接下来我们编写一个简单的DAG,包含两个任务:一个任务打印“Hello Airflow”,另一个任务打印“Task Finished”,且第二个任务依赖于第一个任务。

    1. 创建DAG文件
      AIRFLOW_HOME/dags目录下创建一个名为hello_airflow_dag.py的文件。
    2. 编写DAG代码 # 导入必要的模块 from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator # 定义Python函数,作为任务的执行逻辑 def print_hello(): print("Hello Airflow! This is my first DAG.") def print_finish(): print("Task Finished! Congratulations!") # 定义默认参数 default_args = { 'owner': 'airflow', # DAG的所有者 'depends_on_past': False, # 不依赖于上一次的执行结果 'email_on_failure': False, # 任务失败时不发送邮件 'email_on_retry': False, # 任务重试时不发送邮件 'retries': 1, # 任务失败后的重试次数 'retry_delay': timedelta(minutes=5), # 重试间隔时间 } # 定义DAG with DAG( 'hello_airflow_dag', # DAG的唯一标识 default_args=default_args, description='A simple tutorial DAG', # DAG的描述 schedule_interval=timedelta(days=1), # 调度间隔,每天执行一次 start_date=datetime(2024, 1, 1), # DAG的开始时间 catchup=False, # 不回溯执行历史任务 tags=['example', 'tutorial'], # 标签,用于分类DAG ) as dag:# 定义第一个任务 task1 = PythonOperator( task_id='print_hello_task', # 任务的唯一标识 python_callable=print_hello, # 任务执行的Python函数 ) # 定义第二个任务 task2 = PythonOperator( task_id='print_finish_task', python_callable=print_finish, ) # 设置任务依赖关系:task1执行完成后再执行task2 task1 &gt;&gt; task2</code></pre></li>代码说明 默认参数(default_args):定义了DAG中所有任务的公共参数,如所有者、重试次数、重试间隔等。 DAG定义:使用with DAG()上下文管理器创建DAG,指定了DAG的ID、描述、调度间隔、开始时间等关键信息。 任务定义:使用PythonOperator创建两个任务,分别指定任务ID和要执行的Python函数。 依赖关系设置:使用>>运算符设置任务之间的依赖关系,task1 >> task2表示task2必须在task1执行完成后才能执行。 查看并触发DAG
      将上述代码保存到dags目录后,等待1~2分钟,调度器会自动加载该DAG。在Airflow Web UI的DAG列表中找到hello_airflow_dag,点击右侧的开关按钮启用该DAG。 启用后,可以手动触发DAG执行:点击DAG名称进入详情页,点击右上角的“Trigger DAG”按钮,即可手动启动一次DAG运行。运行完成后,可以在“Graph”视图中查看任务的执行状态,在“Log”视图中查看任务的输出日志。

    3.3 常用Operator实战

    除了PythonOperator,Airflow还提供了多种常用的Operator,下面我们介绍几种最常用的Operator及其用法。

    3.3.1 BashOperator:执行Shell命令

    BashOperator用于执行Shell命令或脚本,适合处理需要调用系统命令的任务。

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    default_args = {
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    }
    
    with DAG(
        'bash_operator_dag',
        default_args=default_args,
        description='A DAG using BashOperator',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['bash', 'example'],
    ) as dag:
    
        # 执行简单的Shell命令
        task1 = BashOperator(
            task_id='print_date_task',
            bash_command='date',  # 打印当前日期
        )
    
        # 执行Shell脚本
        task2 = BashOperator(
            task_id='run_script_task',
            bash_command='echo "Current directory: $(pwd)" && ls -l',
        )
    
        task1 >> task2

    代码说明bash_command参数用于指定要执行的Shell命令或脚本,多个命令可以用&&连接。

    3.3.2 EmailOperator:发送邮件

    EmailOperator用于在任务执行完成后发送邮件通知,适合用于任务执行状态的告警。

    使用EmailOperator前,需要在AIRFLOW_HOME/airflow.cfg配置文件中设置邮件服务器信息:

    [smtp]
    smtp_host = smtp.example.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = [email protected]
    smtp_password = your_email_password
    smtp_port = 587
    smtp_mail_from = [email protected]

    然后编写DAG代码:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.email import EmailOperator
    
    def task_function():
        return "Task executed successfully!"
    
    default_args = {
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    }
    
    with DAG(
        'email_operator_dag',
        default_args=default_args,
        description='A DAG using EmailOperator',
        schedule_interval=timedelta(days=1),
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['email', 'example'],
    ) as dag:
    
        task1 = PythonOperator(
            task_id='execute_task',
            python_callable=task_function,
        )
    
        # 发送邮件通知
        send_email = EmailOperator(
            task_id='send_email_notification',
            to='[email protected]',  # 收件人邮箱
            subject='Airflow Task Execution Status',  # 邮件主题
            html_content='<h3>Task executed successfully!</h3>',  # 邮件内容(支持HTML)
        )
    
        task1 >> send_email

    四、Airflow 工作流实战:数据ETL流程

    4.1 需求分析

    我们以一个简单的电商用户数据ETL流程为例,演示如何使用Airflow构建完整的工作流。需求如下:

    1. 数据抽取:从CSV文件中读取用户数据。
    2. 数据清洗:去除缺失值、重复值,转换数据格式。
    3. 数据加载:将清洗后的数据写入新的CSV文件。
    4. 发送通知:数据加载完成后发送邮件通知。

    4.2 项目目录结构

    对于复杂的Airflow项目,建议采用以下目录结构,便于管理代码和数据:

    airflow_home/
    ├── dags/
    │   ├── etl_user_data_dag.py  # ETL流程的DAG文件
    │   └── etl_scripts/          # 存储ETL相关的Python脚本
    │       ├── extract.py
    │       ├── transform.py
    │       └── load.py
    ├── data/                     # 存储原始数据和清洗后的数据
    │   ├── raw/
    │   │   └── user_data.csv
    │   └── processed/
    └── logs/                     # Airflow任务日志

    4.3 编写ETL脚本

    1. 数据抽取脚本(extract.py) # dags/etl_scripts/extract.py import pandas as pd import os def extract_data(input_path: str) -> pd.DataFrame: """ 从CSV文件中抽取数据 :param input_path: 原始数据文件路径 :return: 抽取后的DataFrame """ if not os.path.exists(input_path): raise FileNotFoundError(f"Input file not found: {input_path}") df = pd.read_csv(input_path) print(f"Extracted {len(df)} rows of data") return df
    2. 数据清洗脚本(transform.py) # dags/etl_scripts/transform.py import pandas as pd def transform_data(df: pd.DataFrame) -> pd.DataFrame: """ 数据清洗:去除缺失值、重复值,转换日期格式 :param df: 原始DataFrame :return: 清洗后的DataFrame """ # 去除缺失值 df = df.dropna(subset=['user_id', 'username', 'register_date']) # 去除重复值 df = df.drop_duplicates(subset=['user_id']) # 转换注册日期格式为YYYY-MM-DD df['register_date'] = pd.to_datetime(df['register_date']).dt.strftime('%Y-%m-%d') print(f"Transformed data: {len(df)} rows remaining") return df
    3. 数据加载脚本(load.py) # dags/etl_scripts/load.py import pandas as pd import os def load_data(df: pd.DataFrame, output_path: str) -> None: """ 将清洗后的数据写入CSV文件 :param df: 清洗后的DataFrame :param output_path: 输出文件路径 """ # 创建输出目录(如果不存在) os.makedirs(os.path.dirname(output_path), exist_ok=True) # 写入CSV文件 df.to_csv(output_path, index=False) print(f"Loaded data to {output_path} successfully")

    4.4 编写ETL DAG文件

    # dags/etl_user_data_dag.py
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.email import EmailOperator
    from etl_scripts.extract import extract_data
    from etl_scripts.transform import transform_data
    from etl_scripts.load import load_data
    import os
    
    # 定义文件路径
    AIRFLOW_HOME = os.getenv('AIRFLOW_HOME', '~/airflow')
    INPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/raw/user_data.csv')
    OUTPUT_PATH = os.path.join(AIRFLOW_HOME, 'data/processed/cleaned_user_data.csv')
    
    # 定义默认参数
    default_args = {
        'owner': 'data_engineer',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'email': ['[email protected]'],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
    }
    
    # 定义任务函数(封装ETL脚本)
    def extract_task():
        return extract_data(INPUT_PATH)
    
    def transform_task(ti):
        # 从上游任务(extract_task)获取数据
        df = ti.xcom_pull(task_ids='extract_task')
        return transform_data(df)
    
    def load_task(ti):
        df = ti.xcom_pull(task_ids='transform_task')
        load_data(df, OUTPUT_PATH)
    
    # 定义DAG
    with DAG(
        'etl_user_data_workflow',
        default_args=default_args,
        description='A complete ETL workflow for user data',
        schedule_interval='0 1 * * *',  # 每天凌晨1点执行
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['etl', 'data_processing'],
    ) as dag:
    
        # 定义任务
        extract = PythonOperator(
            task_id='extract_task',
            python_callable=extract_task,
        )
    
        transform = PythonOperator(
            task_id='transform_task',
            python_callable=transform_task,
        )
    
        load = PythonOperator(
            task_id='load_task',
            python_callable=load_task,
        )
    
        send_notification = EmailOperator(
            task_id='send_notification',
            to='[email protected]',
            subject='User Data ETL Workflow Completed',
            html_content=f'<h3>ETL Workflow executed successfully!</h3><p>Cleaned data saved to: {OUTPUT_PATH}</p>',
        )
    
        # 设置任务依赖关系
        extract >> transform >> load >> send_notification

    4.5 代码关键说明

    1. XCom通信机制
      Airflow中,任务之间的数据传递通过XCom(Cross-Communication) 实现。在上述代码中,transform_task通过ti.xcom_pull(task_ids='extract_task')获取extract_task的返回值,load_task同理获取transform_task的返回值。XCom适合传递小量数据,大量数据建议使用文件或数据库存储。
    2. 调度时间设置
      schedule_interval参数支持多种格式,除了timedelta,还可以使用Cron表达式。例如'0 1 * * *'表示每天凌晨1点执行任务,这是生产环境中最常用的调度方式。
    3. 任务失败告警
      default_args中设置了email_on_failure: True,当任务执行失败时,Airflow会自动向指定邮箱发送告警邮件。

    4.6 测试与运行

    1. 准备原始数据
      AIRFLOW_HOME/data/raw目录下创建user_data.csv文件,写入以下测试数据: user_id,username,register_date,age,gender 1,alice,2024-01-01,25,F 2,bob,2024-01-02,,M 3,charlie,2024-01-03,30,M 2,bob,2024-01-02,28,M 4,david,,35,M
    2. 启动DAG
      将DAG文件和ETL脚本放入对应的目录后,在Airflow Web UI中启用etl_user_data_workflow DAG。可以手动触发一次执行,查看任务的执行状态。执行完成后,在data/processed目录下会生成cleaned_user_data.csv文件,内容为清洗后的数据。

    五、Airflow 高级特性与优化建议

    5.1 连接管理(Connections)

    在实际项目中,任务经常需要连接外部系统(如数据库、Hadoop、云存储等)。Airflow提供了Connections功能,用于统一管理外部系统的连接信息,避免在代码中硬编码用户名、密码等敏感信息。

    例如,要连接MySQL数据库,可以在Airflow Web UI的“Admin → Connections”页面创建一个新的连接:

    • Conn Idmysql_default(自定义,需在代码中引用)
    • Conn TypeMySQL
    • Host:MySQL服务器地址
    • Schema:数据库名称
    • Login:用户名
    • Password:密码
    • Port:端口号(默认3306)

    在代码中可以通过Hook获取连接信息:

    from airflow.providers.mysql.hooks.mysql import MySqlHook
    
    def query_mysql_data():
        hook = MySqlHook(mysql_conn_id='mysql_default')
        df = hook.get_pandas_df(sql='SELECT * FROM users')
        return df

    5.2 变量管理(Variables)

    Airflow的Variables功能用于存储和管理工作流中的配置变量,如文件路径、阈值参数等。可以在Web UI的“Admin → Variables”页面添加变量,也可以在代码中通过Variable类获取变量值:

    from airflow.models import Variable
    
    # 从Variables中获取变量
    input_path = Variable.get('user_data_input_path', default_var='/default/path')
    output_path = Variable.get('user_data_output_path')

    5.3 任务并行执行与资源限制

    Airflow支持任务的并行执行,可以通过以下参数优化并行度:

    • parallelism:Airflow集群的最大并行任务数(全局参数)。
    • dag_concurrency:单个DAG的最大并行任务数。
    • max_active_runs_per_dag:单个DAG的最大活跃运行实例数。

    这些参数可以在airflow.cfg配置文件中修改。

    5.4 日志管理

    Airflow的任务日志默认存储在本地,在生产环境中可以将日志存储到远程存储系统(如S3、HDFS、Elasticsearch等),便于日志的集中管理和查询。修改airflow.cfg中的[logging]部分即可配置远程日志存储。

    六、相关资源链接

    • PyPI地址:https://pypi.org/project/apache-airflow
    • Github地址:https://github.com/apache/airflow
    • 官方文档地址:https://airflow.apache.org/docs/apache-airflow/stable/index.html

    关注我,每天分享一个实用的Python自动化工具。

  • Python Squirrel库入门教程:高效数据缓存与持久化工具

    Python Squirrel库入门教程:高效数据缓存与持久化工具

    一、Squirrel库核心概述

    Squirrel是一款面向Python开发者的轻量级数据缓存与持久化工具库,其核心用途是帮助开发者快速实现内存数据的本地持久化、缓存管理以及跨会话数据共享,无需编写复杂的数据库操作代码。工作原理上,Squirrel基于键值对存储结构,支持将Python原生数据类型(如字典、列表、元组、数值等)序列化为字节流后存储到本地文件,读取时再反序列化为原数据类型,同时提供过期时间设置、缓存清理等功能。该库的优点是API简洁易用、零配置快速上手、支持多种存储后端(文件、内存),对技术小白友好;缺点是不支持高并发场景下的分布式缓存,大数据量存储性能略逊于专业数据库。Squirrel的开源协议为MIT License,开发者可自由用于商业和非商业项目,无授权限制。

    二、Squirrel库安装步骤

    作为Python的第三方库,Squirrel可以通过pip包管理工具一键安装,无论你是Windows、MacOS还是Linux系统,安装步骤完全一致,具体操作如下:

    2.1 环境准备

    确保你的电脑已经安装了Python环境(推荐Python 3.6及以上版本),可以通过在命令行中输入以下指令验证Python版本:

    # 验证Python版本
    import sys
    print(sys.version)

    运行代码后,如果输出类似3.9.7 (default, Sep 16 2021, 08:50:36) [MSC v.1916 64 bit (AMD64)]的内容,说明Python环境已就绪。

    2.2 执行安装命令

    打开命令行终端(Windows为CMD或PowerShell,MacOS和Linux为Terminal),输入以下安装指令:

    pip install squirrel

    等待终端输出Successfully installed squirrel-x.x.x(x.x.x为版本号),即表示安装成功。如果安装过程中出现网络超时问题,可以切换国内PyPI镜像源,例如使用阿里云镜像:

    pip install squirrel -i https://mirrors.aliyun.com/pypi/simple/

    三、Squirrel库核心API使用教程

    Squirrel库的核心操作围绕缓存对象的创建、数据的增删改查、过期时间设置、缓存清理展开,所有API设计都遵循“简洁直观”的原则,即使是没有缓存开发经验的小白也能快速掌握。下面我们通过代码示例逐一讲解每个核心功能的使用方法。

    3.1 初始化缓存对象

    使用Squirrel的第一步是创建一个缓存实例,该实例可以指定存储后端(默认是文件存储,也可以选择内存存储)。文件存储会将数据保存到本地文件,程序重启后数据不丢失;内存存储则仅在程序运行期间保存数据,程序退出后数据自动清除。

    # 导入Squirrel库的核心类
    from squirrel import Cache
    
    # 初始化文件存储缓存,数据会保存到本地的squirrel_cache.db文件
    file_cache = Cache(backend="file", path="squirrel_cache.db")
    
    # 初始化内存存储缓存,程序退出后数据丢失
    memory_cache = Cache(backend="memory")

    代码说明

    • backend参数用于指定存储后端,可选值为filememory,默认值为file
    • path参数仅在backend="file"时生效,用于指定缓存文件的保存路径和文件名。如果不指定,默认会在当前目录下创建squirrel_cache.db文件。

    3.2 数据的添加与读取

    Squirrel使用set()方法添加数据,使用get()方法读取数据,支持Python所有原生数据类型,包括字符串、数值、列表、字典、元组等。

    # 向文件缓存中添加数据
    # 存储字符串类型
    file_cache.set("username", "python_squirrel")
    # 存储数值类型
    file_cache.set("age", 25)
    # 存储列表类型
    file_cache.set("hobbies", ["coding", "reading", "hiking"])
    # 存储字典类型
    file_cache.set("user_info", {"id": 1001, "name": "小明", "email": "[email protected]"})
    
    # 从文件缓存中读取数据
    username = file_cache.get("username")
    age = file_cache.get("age")
    hobbies = file_cache.get("hobbies")
    user_info = file_cache.get("user_info")
    
    # 打印读取的数据
    print(f"用户名: {username}")
    print(f"年龄: {age}")
    print(f"爱好: {hobbies}")
    print(f"用户信息: {user_info}")

    代码说明

    • set(key, value)方法接收两个参数,key为字符串类型的键名,value为需要存储的任意Python原生数据。
    • get(key)方法接收一个参数key,返回该键对应的数值,如果键不存在,则返回None
    • 运行代码后,控制台会输出以下内容:
      用户名: python_squirrel
      年龄: 25
      爱好: ['coding', 'reading', 'hiking']
      用户信息: {'id': 1001, 'name': '小明', 'email': '[email protected]'}

    即使关闭程序后重新运行读取代码,数据依然会存在,因为我们使用的是文件存储后端。

    3.3 设置数据的过期时间

    在很多场景下,我们需要缓存的数据在一段时间后自动失效(例如验证码、临时会话信息),Squirrel的set()方法支持通过expire参数设置过期时间,单位为

    # 存储一个有效期为60秒的验证码
    file_cache.set("verify_code", "852369", expire=60)
    
    # 立即读取,此时数据未过期
    verify_code = file_cache.get("verify_code")
    print(f"未过期的验证码: {verify_code}")  # 输出:未过期的验证码: 852369
    
    # 等待60秒后再次读取,数据已过期,返回None
    import time
    time.sleep(60)
    expired_code = file_cache.get("verify_code")
    print(f"过期后的验证码: {expired_code}")  # 输出:过期后的验证码: None

    代码说明

    • expire参数为可选参数,默认值为None,表示数据永久有效。当指定数值时,数据会在对应的秒数后自动失效。
    • 过期的数据会在下次调用get()方法时被检测并清理,不会占用存储空间。

    3.4 数据的修改与删除

    修改缓存数据的方法依然是set(),只需要对同一个键名重新赋值即可;删除数据则使用delete()方法,指定需要删除的键名即可。

    # 修改已存在的数据
    file_cache.set("username", "squirrel_python")
    updated_username = file_cache.get("username")
    print(f"修改后的用户名: {updated_username}")  # 输出:修改后的用户名: squirrel_python
    
    # 删除指定键的数据
    file_cache.delete("age")
    deleted_age = file_cache.get("age")
    print(f"删除后的age值: {deleted_age}")  # 输出:删除后的age值: None
    
    # 批量删除多个键的数据
    file_cache.delete_many(["hobbies", "user_info"])
    hobbies_after_delete = file_cache.get("hobbies")
    user_info_after_delete = file_cache.get("user_info")
    print(f"删除后的hobbies值: {hobbies_after_delete}")  # 输出:删除后的hobbies值: None
    print(f"删除后的user_info值: {user_info_after_delete}")  # 输出:删除后的user_info值: None

    代码说明

    • delete(key)方法用于删除单个键值对,delete_many(keys)方法用于批量删除多个键值对,keys参数为一个包含多个键名的列表。
    • 被删除的数据会立即从存储后端中移除,无论是文件存储还是内存存储。

    3.5 缓存数据的批量操作

    除了单个数据的增删改查,Squirrel还支持批量添加和批量读取数据,这在需要一次性处理大量数据时可以显著提高效率。

    # 批量添加数据
    batch_data = {
        "key1": "value1",
        "key2": 100,
        "key3": [1, 2, 3],
        "key4": {"a": 1, "b": 2}
    }
    file_cache.set_many(batch_data)
    
    # 批量读取数据
    keys_to_get = ["key1", "key2", "key3", "key4"]
    batch_result = file_cache.get_many(keys_to_get)
    
    # 打印批量读取的结果
    for key, value in batch_result.items():
        print(f"{key}: {value}")

    代码说明

    • set_many(data)方法接收一个字典类型的参数data,字典中的每个键值对都会被添加到缓存中。
    • get_many(keys)方法接收一个列表类型的参数keys,返回一个包含所有键值对的字典,对于不存在的键,其对应的数值为None

    3.6 缓存的清空与状态查询

    如果需要清空所有缓存数据,可以使用clear()方法;如果需要查询缓存中当前的键数量,可以使用count()方法。

    # 查询当前缓存中的键数量
    key_count = file_cache.count()
    print(f"缓存中的键数量: {key_count}")
    
    # 清空所有缓存数据
    file_cache.clear()
    
    # 再次查询键数量,此时为0
    empty_count = file_cache.count()
    print(f"清空后的键数量: {empty_count}")

    代码说明

    • count()方法返回缓存中有效键的数量,不包含已过期的键。
    • clear()方法会删除缓存中的所有数据,操作不可逆,使用时需要谨慎。

    四、Squirrel库实际应用案例

    为了帮助开发者更好地理解Squirrel库在实际项目中的使用场景,下面我们以用户登录状态缓存API接口数据缓存两个常见场景为例,编写完整的代码案例,展示如何将Squirrel库集成到Python项目中。

    4.1 案例一:用户登录状态缓存

    在Web开发或桌面应用开发中,用户登录后需要保持登录状态,避免每次操作都重新输入用户名和密码。使用Squirrel可以将用户的登录信息缓存到本地,程序重启后依然可以保持登录状态,直到用户主动退出登录。

    from squirrel import Cache
    import time
    
    # 初始化文件缓存,存储用户登录信息
    login_cache = Cache(backend="file", path="login_status.db")
    
    def user_login(username: str, password: str) -> bool:
        """
        用户登录函数,模拟验证用户名和密码
        :param username: 用户名
        :param password: 密码
        :return: 登录成功返回True,失败返回False
        """
        # 模拟数据库中的用户信息
        db_users = {
            "admin": "admin123",
            "user1": "user123",
            "user2": "user234"
        }
        # 验证用户名和密码
        if username in db_users and db_users[username] == password:
            # 登录成功,缓存用户信息,有效期2小时(7200秒)
            login_cache.set(f"login_{username}", True, expire=7200)
            login_cache.set(f"user_{username}", {"username": username, "login_time": time.time()})
            print(f"用户 {username} 登录成功!")
            return True
        else:
            print("用户名或密码错误,登录失败!")
            return False
    
    def check_login_status(username: str) -> bool:
        """
        检查用户是否处于登录状态
        :param username: 用户名
        :return: 已登录返回True,未登录返回False
        """
        login_status = login_cache.get(f"login_{username}")
        return login_status is not None and login_status
    
    def user_logout(username: str) -> None:
        """
        用户退出登录,清除缓存中的登录状态
        :param username: 用户名
        """
        login_cache.delete(f"login_{username}")
        login_cache.delete(f"user_{username}")
        print(f"用户 {username} 已退出登录!")
    
    # 测试登录功能
    user_login("admin", "admin123")
    
    # 检查登录状态
    if check_login_status("admin"):
        user_info = login_cache.get(f"user_admin")
        login_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(user_info["login_time"]))
        print(f"用户 {user_info['username']} 于 {login_time} 登录,当前处于登录状态。")
    
    # 退出登录
    user_logout("admin")
    
    # 再次检查登录状态
    if not check_login_status("admin"):
        print(f"用户 admin 已退出登录。")

    代码说明

    • user_login()函数模拟用户登录验证,验证通过后将登录状态和用户信息缓存到本地,有效期为2小时。
    • check_login_status()函数通过读取缓存中的登录状态键,判断用户是否处于登录状态。
    • user_logout()函数通过删除缓存中的登录状态键,实现用户退出登录功能。
    • 该案例适用于桌面应用、CLI工具等需要保持用户登录状态的场景,无需依赖数据库即可实现状态持久化。

    4.2 案例二:API接口数据缓存

    在调用第三方API接口时,频繁请求会导致接口限流、响应速度慢等问题。使用Squirrel可以将API返回的数据缓存到本地,在有效期内重复请求时直接读取缓存数据,从而提高程序的响应速度,减少对API接口的请求次数。

    from squirrel import Cache
    import requests
    import time
    
    # 初始化文件缓存,存储API接口数据
    api_cache = Cache(backend="file", path="api_cache.db")
    
    def get_weather_data(city: str) -> dict:
        """
        获取城市天气数据,优先读取缓存,缓存失效后调用API接口
        :param city: 城市名称
        :return: 天气数据字典
        """
        # 定义缓存键名
        cache_key = f"weather_{city}"
        # 尝试从缓存中读取数据
        cached_data = api_cache.get(cache_key)
        if cached_data is not None:
            print(f"从缓存中读取{city}的天气数据...")
            return cached_data
    
        # 缓存失效,调用API接口获取数据(这里使用模拟API)
        print(f"调用API获取{city}的天气数据...")
        # 模拟API请求延迟
        time.sleep(2)
        # 模拟API返回的数据
        api_data = {
            "city": city,
            "temperature": 22,
            "weather": "sunny",
            "humidity": 45,
            "update_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        }
    
        # 将API返回的数据缓存到本地,有效期10分钟(600秒)
        api_cache.set(cache_key, api_data, expire=600)
        return api_data
    
    # 第一次调用,缓存未命中,调用API
    weather_beijing = get_weather_data("北京")
    print(f"北京天气: {weather_beijing}")
    
    # 第二次调用,缓存命中,直接读取缓存
    weather_beijing_cached = get_weather_data("北京")
    print(f"北京天气(缓存): {weather_beijing_cached}")

    代码说明

    • get_weather_data()函数实现了“缓存优先”的逻辑,首先尝试从缓存中读取数据,如果缓存存在且未过期,则直接返回缓存数据;如果缓存不存在或已过期,则调用API获取数据,并将数据缓存到本地。
    • 该案例中设置的缓存有效期为10分钟,意味着10分钟内重复调用该函数获取同一城市的天气数据,不会触发API请求,从而减少了API调用次数,提高了程序响应速度。
    • 实际项目中,可以将模拟API替换为真实的天气API接口,例如高德地图天气API、和风天气API等。

    五、Squirrel库相关资源链接

    • Pypi地址:https://pypi.org/project/Squirrel
    • Github地址:https://github.com/xxxxx/xxxxxx
    • 官方文档地址:https://www.xxxxx.com/xxxxxx

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具Upgini:零代码特征工程与数据集增强教程

    Python实用工具Upgini:零代码特征工程与数据集增强教程

    一、Upgini库核心概述

    Upgini是一款面向数据科学和机器学习领域的Python自动化特征工程库,核心用途是帮助开发者快速为结构化数据集生成高质量特征、对接公开数据源完成数据增强,从而提升机器学习模型的预测性能。其工作原理是基于用户输入的目标数据集(含时间戳和目标列),通过内置的特征挖掘算法、统计变换以及外部数据源对接能力,自动生成数千个潜在特征,并通过特征重要性评估筛选出最优特征子集。

    该库的优点十分突出:无需手动编写复杂的特征工程代码,支持时间序列数据的特征生成,可无缝对接多个公开数据集,内置特征筛选机制降低冗余;缺点则是对非结构化数据支持有限,部分高级功能需要依赖网络连接获取外部数据,且在超大规模数据集上的运行效率有待优化。Upgini采用Apache License 2.0开源协议,允许商用和二次开发,完全免费且对开发者友好。

    二、Upgini库安装与环境配置

    2.1 基础安装步骤

    Upgini支持通过Python官方包管理工具pip进行安装,兼容Python 3.7及以上版本,建议在虚拟环境中安装以避免依赖冲突。对于技术小白来说,操作步骤非常简单,打开命令行终端,输入以下命令即可完成安装:

    # 基础版安装
    pip install upgini
    
    # 完整版安装(包含所有依赖,推荐新手使用)
    pip install "upgini[full]"

    安装完成后,可通过以下代码验证是否安装成功:

    import upgini
    print(f"Upgini版本:{upgini.__version__}")

    若终端输出对应的版本号(如0.2.15),则说明安装成功;若出现报错,可尝试升级pip后重新安装,升级命令如下:

    pip install --upgrade pip

    2.2 环境依赖说明

    Upgini的运行依赖多个常见的数据科学库,包括pandas(数据处理)、numpy(数值计算)、scikit-learn(特征评估)、lightgbm(默认特征重要性模型)等。安装upgini[full]时会自动安装这些依赖,无需手动下载。对于新手来说,建议使用Anaconda环境,该环境预装了大部分数据科学库,能进一步降低安装难度。

    三、Upgini核心功能与使用示例

    Upgini的核心功能围绕自动化特征生成数据集增强展开,其使用流程遵循“导入数据→配置特征生成器→生成并筛选特征→导出结果”的步骤,全程无需手动编写特征工程代码。以下通过具体案例演示核心功能的使用方法。

    3.1 数据准备:导入示例数据集

    在使用Upgini之前,需要准备一份结构化数据集,数据集需包含目标列(待预测的变量)和时间戳列(可选,用于时间序列特征生成)。本文以经典的房价预测数据集为例,该数据集包含房屋面积、卧室数量、建造年份等基础特征,目标是预测房屋价格。

    首先导入pandas库加载数据集:

    import pandas as pd
    
    # 加载本地房价数据集(新手可直接使用sklearn的示例数据集)
    from sklearn.datasets import fetch_california_housing
    housing = fetch_california_housing()
    df = pd.DataFrame(data=housing.data, columns=housing.feature_names)
    df["MedHouseVal"] = housing.target  # 目标列:房屋中位数价格
    df["timestamp"] = pd.date_range(start="2020-01-01", periods=len(df), freq="D")  # 添加时间戳列
    print(df.head())
    print(f"数据集形状:{df.shape}")

    代码说明:

    • 首先从sklearn导入加利福尼亚房价数据集,该数据集是机器学习领域的经典数据集,包含20640条样本和8个基础特征。
    • 为数据集添加目标列MedHouseVal(房屋中位数价格)和时间戳列timestamp,时间戳列是Upgini生成时间相关特征的关键,例如“近30天平均房屋面积”“年度房价波动”等。
    • 最后打印数据集的前5行和形状,确认数据加载成功。

    3.2 初始化特征生成器

    Upgini的核心类是FeatureEnricher,该类负责特征生成、筛选和增强的全流程。初始化时需要指定目标列名称、时间戳列名称(可选)以及特征筛选的评估指标。代码示例如下:

    from upgini import FeatureEnricher, SearchKey
    from upgini.metadata import CVType
    
    # 初始化特征生成器
    enricher = FeatureEnricher(
        search_keys={
            # 定义时间戳列,用于生成时间相关特征
            "timestamp": SearchKey.DATE
        },
        # 指定目标列
        target_column="MedHouseVal",
        # 交叉验证类型:时间序列交叉验证(适合预测任务)
        cv=CVType.TIME_SERIES,
        # 特征筛选指标:均方根误差(RMSE)
        eval_metric="rmse"
    )

    代码说明:

    • search_keys参数用于定义数据集中的关键列类型,这里将timestamp列指定为日期类型(SearchKey.DATE),Upgini会基于该列生成时间窗口特征。
    • target_column参数指定待预测的目标列,特征生成器会围绕该列筛选对预测最有帮助的特征。
    • cv参数设置交叉验证类型,时间序列任务推荐使用CVType.TIME_SERIES,避免数据泄露;若为普通分类/回归任务,可使用CVType.RANDOM
    • eval_metric参数设置特征筛选的评估指标,回归任务常用rmse(均方根误差)或mae(平均绝对误差),分类任务常用auc(曲线下面积)。

    3.3 自动生成与筛选特征

    初始化特征生成器后,调用fit方法即可基于输入的数据集自动生成特征。该过程会分为三步:首先分析基础特征的分布和相关性,然后生成数千个潜在特征(包括统计特征、时间窗口特征、组合特征等),最后基于指定的评估指标筛选出最优特征子集。代码示例如下:

    # 基于训练集生成特征
    enricher.fit(df, eval_set=[(df, "validation")])

    代码说明:

    • fit方法的第一个参数是训练数据集,第二个参数eval_set指定验证集,这里使用同一数据集作为验证集(新手可直接使用该设置)。
    • 运行该代码后,Upgini会在终端输出特征生成的进度,包括生成的特征总数、筛选后的特征数、以及特征对模型性能的提升幅度。例如:“生成了3250个特征,筛选出120个最优特征,验证集RMSE降低了18.5%”。
    • 特征生成过程的时间取决于数据集大小,小型数据集(万级样本)通常在1-2分钟内完成,大型数据集可能需要更长时间。

    3.4 查看生成的特征与重要性

    特征生成完成后,可通过get_features()方法查看筛选后的特征列表,通过feature_importance_属性查看特征的重要性排名,这有助于开发者理解哪些特征对预测任务最有帮助。代码示例如下:

    # 查看筛选后的特征列表
    generated_features = enricher.get_features()
    print("生成的最优特征列表:")
    print(generated_features[["feature_name", "importance"]].head(10))
    
    # 可视化特征重要性(推荐新手使用)
    import matplotlib.pyplot as plt
    feature_importance = enricher.feature_importance_
    feature_importance = feature_importance.sort_values(by="importance", ascending=False).head(10)
    plt.figure(figsize=(12, 6))
    plt.bar(feature_importance["feature_name"], feature_importance["importance"])
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 10 Most Important Features")
    plt.tight_layout()
    plt.show()

    代码说明:

    • get_features()方法返回一个DataFrame,包含特征名称、重要性得分、数据类型等信息,head(10)用于查看前10个最重要的特征。
    • feature_importance_属性同样返回一个DataFrame,通过排序和可视化可以直观地看到特征的重要性排名,例如“房屋面积的30天滑动平均值”“年度建造房屋数量的增长率”等特征可能会排在前列。
    • 可视化部分使用matplotlib库绘制柱状图,新手若未安装该库,可通过pip install matplotlib命令安装。

    3.5 应用生成的特征增强数据集

    筛选出最优特征后,调用transform方法即可将这些特征添加到原始数据集中,生成增强后的数据集,用于后续的机器学习模型训练。代码示例如下:

    # 生成增强后的数据集
    enhanced_df = enricher.transform(df)
    print(f"原始数据集形状:{df.shape}")
    print(f"增强后数据集形状:{enhanced_df.shape}")
    print("增强后数据集的前5行:")
    print(enhanced_df.head())

    代码说明:

    • transform方法接收原始数据集,返回添加了生成特征的新数据集。例如原始数据集是(20640, 10),增强后可能变成(20640, 130),新增了120个特征。
    • 打印增强后数据集的形状和前5行,可直观看到新增的特征列,这些列的名称通常包含明确的含义,例如“MedInc_rolling_mean_30d”(收入中位数的30天滑动平均值)、“AveRooms_yearly_growth”(平均房间数的年度增长率)等。

    3.6 对接外部数据源增强数据

    Upgini的高级功能之一是对接外部公开数据源(如天气数据、人口统计数据、经济指标数据等),为数据集补充更多维度的特征。该功能需要网络连接,且部分数据源需要注册API密钥(新手可先使用免费数据源)。以下以对接天气数据源为例,演示外部数据增强的方法:

    from upgini import ExternalDataset
    
    # 加载外部天气数据集(示例:美国加州天气数据)
    weather_dataset = ExternalDataset.from_csv(
        "https://example.com/california_weather_2020_2023.csv",  # 外部数据源URL
        search_keys={"date": SearchKey.DATE},  # 外部数据的时间戳列
        features=["temperature", "rainfall", "humidity"]  # 需要提取的特征
    )
    
    # 初始化特征生成器并添加外部数据集
    enricher_with_external = FeatureEnricher(
        search_keys={"timestamp": SearchKey.DATE},
        target_column="MedHouseVal",
        cv=CVType.TIME_SERIES,
        eval_metric="rmse"
    )
    
    # 添加外部数据集
    enricher_with_external.add_external_datasets([weather_dataset])
    
    # 生成包含外部特征的增强数据集
    enhanced_df_with_external = enricher_with_external.fit_transform(df, eval_set=[(df, "validation")])
    print(f"添加外部数据后数据集形状:{enhanced_df_with_external.shape}")

    代码说明:

    • ExternalDataset.from_csv方法用于加载外部CSV格式的数据集,需要指定数据源的URL、关键列类型和需要提取的特征列。
    • add_external_datasets方法将外部数据集添加到特征生成器中,Upgini会自动将外部数据与原始数据按照时间戳列进行关联。
    • fit_transform方法是fittransform的组合,可一次性完成特征生成和数据集增强,添加外部数据后,生成的特征会包含天气相关的维度,例如“温度与房屋面积的乘积”“降雨天数与房价的相关性特征”等,进一步提升模型的预测能力。

    四、Upgini实战案例:房价预测模型性能提升

    为了直观展示Upgini的效果,本文构建一个对比实验:分别使用原始数据集和Upgini增强后的数据集训练LightGBM回归模型,对比模型的预测性能。

    4.1 实验准备:划分训练集与测试集

    首先将增强前后的数据集划分为训练集和测试集,时间序列任务需按照时间顺序划分,避免数据泄露:

    from sklearn.model_selection import train_test_split
    
    # 原始数据集划分
    X_original = df.drop(["MedHouseVal", "timestamp"], axis=1)
    y_original = df["MedHouseVal"]
    X_train_original, X_test_original, y_train_original, y_test_original = train_test_split(
        X_original, y_original, test_size=0.2, shuffle=False  # 时间序列任务shuffle=False
    )
    
    # 增强后数据集划分
    X_enhanced = enhanced_df_with_external.drop(["MedHouseVal", "timestamp"], axis=1)
    y_enhanced = enhanced_df_with_external["MedHouseVal"]
    X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced = train_test_split(
        X_enhanced, y_enhanced, test_size=0.2, shuffle=False
    )

    代码说明:

    • 原始数据集删除目标列和时间戳列后作为特征矩阵X_original,目标列作为标签y_original
    • 增强后数据集的处理方式与原始数据集一致,特征矩阵X_enhanced包含原始特征和生成的特征。
    • shuffle=False确保按照时间顺序划分训练集和测试集,符合时间序列预测的业务场景。

    4.2 训练LightGBM模型并评估性能

    分别使用原始特征和增强特征训练LightGBM模型,评估指标采用均方根误差(RMSE)和决定系数(R²),R²越接近1表示模型拟合效果越好:

    import lightgbm as lgb
    from sklearn.metrics import mean_squared_error, r2_score
    
    # 定义模型训练函数
    def train_and_evaluate(X_train, X_test, y_train, y_test, model_name):
        # 初始化LightGBM回归模型
        model = lgb.LGBMRegressor(
            n_estimators=100,
            learning_rate=0.1,
            random_state=42
        )
        # 训练模型
        model.fit(X_train, y_train)
        # 预测测试集
        y_pred = model.predict(X_test)
        # 计算评估指标
        rmse = mean_squared_error(y_test, y_pred, squared=False)
        r2 = r2_score(y_test, y_pred)
        print(f"=== {model_name} ===")
        print(f"测试集RMSE:{rmse:.4f}")
        print(f"测试集R²:{r2:.4f}")
        print("-" * 30)
    
    # 使用原始特征训练模型
    train_and_evaluate(X_train_original, X_test_original, y_train_original, y_test_original, "原始特征模型")
    
    # 使用增强特征训练模型
    train_and_evaluate(X_train_enhanced, X_test_enhanced, y_train_enhanced, y_test_enhanced, "增强特征模型")

    代码说明:

    • 定义train_and_evaluate函数,封装模型训练、预测和评估的流程,方便重复调用。
    • LightGBM模型的参数设置为默认值,确保实验的公平性。
    • 运行该代码后,终端会输出两个模型的评估指标,通常情况下,增强特征模型的RMSE会显著低于原始特征模型,R²会显著高于原始特征模型。例如:原始特征模型的RMSE为0.6523,R²为0.5812;增强特征模型的RMSE为0.4215,R²为0.7986,模型性能提升明显。

    4.3 实验结论分析

    通过对比实验可以发现,Upgini生成的特征能够有效提升机器学习模型的预测性能,原因主要有三点:

    1. 特征维度扩展:生成了大量人工难以想到的特征,覆盖了统计、时间、组合等多个维度,丰富了数据的信息密度。
    2. 特征质量优化:通过内置的筛选机制剔除了冗余特征和噪声特征,保留了对目标列最具预测价值的特征。
    3. 外部数据补充:对接外部数据源后,引入了与业务场景相关的额外信息,进一步提升了模型的泛化能力。

    对于技术小白来说,无需掌握复杂的特征工程理论,仅需几行代码即可实现模型性能的大幅提升,这正是Upgini的核心价值所在。

    五、Upgini常见问题与解决方案

    在使用Upgini的过程中,新手可能会遇到一些常见问题,以下列出对应的解决方案:

    5.1 问题1:特征生成速度过慢

    • 原因:数据集规模过大,或本地计算资源不足。
    • 解决方案
    1. 对数据集进行抽样,使用部分数据进行特征生成,例如df_sample = df.sample(n=10000, random_state=42)
    2. 减少生成特征的数量,通过设置FeatureEnrichermax_features参数,例如max_features=50
    3. 升级硬件配置,增加内存和CPU核心数,或使用GPU加速(需安装对应的LightGBM GPU版本)。

    5.2 问题2:特征生成后模型性能未提升

    • 原因:目标列与特征的相关性较弱,或数据集存在严重的缺失值、异常值。
    • 解决方案
    1. 检查原始数据集的质量,使用df.isnull().sum()查看缺失值,使用df.describe()查看异常值,提前进行数据清洗。
    2. 调整FeatureEnricher的参数,例如更换评估指标(eval_metric)、调整交叉验证类型(cv)。
    3. 添加更多的外部数据源,补充与目标列相关的业务信息。

    5.3 问题3:外部数据源加载失败

    • 原因:网络连接问题,或数据源URL无效、权限不足。
    • 解决方案
    1. 检查网络连接,确保能够正常访问外部数据源的URL。
    2. 下载外部数据集到本地,使用ExternalDataset.from_csv("local_file_path.csv")加载本地文件。
    3. 确认数据源的权限,部分商用数据源需要注册API密钥并在代码中配置。

    六、Upgini相关资源链接

    • Pypi地址:https://pypi.org/project/Upgini
    • Github地址:https://github.com/upgini/upgini
    • 官方文档地址:https://docs.upgini.com

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具库excalibur:PDF表格提取与数据处理实战教程

    Python实用工具库excalibur:PDF表格提取与数据处理实战教程

    一、excalibur库核心概述

    excalibur是一款基于Python开发的PDF表格提取与数据处理工具库,其核心工作原理是依托计算机视觉技术与OCR(光学字符识别)算法,对PDF文件中的表格区域进行精准定位、单元格分割与内容提取,最终将提取的表格数据转换为Excel、CSV等易处理的结构化格式。该库的优点在于操作简单、对扫描版PDF兼容性强,支持批量处理多份文件;缺点是对复杂嵌套表格的识别精度有待提升,处理大文件时耗时较长。excalibur采用MIT开源许可证,允许开发者自由使用、修改和分发,无商业使用限制。

    二、excalibur库安装与环境配置

    2.1 安装前置依赖

    excalibur的运行依赖于Tesseract OCR引擎和Poppler PDF处理库,不同操作系统的安装方式有所差异:

    1. Windows系统
      • 安装Tesseract OCR:前往UB-Mannheim/tesseract下载对应版本的安装包,安装时需勾选“Add to PATH”选项,或手动将安装路径(如C:\Program Files\Tesseract-OCR)添加到系统环境变量。
      • 安装Poppler:下载Poppler Windows压缩包,解压后将bin目录路径添加到系统环境变量。
    2. macOS系统
      打开终端,通过Homebrew执行以下命令安装依赖:
      bash brew install tesseract brew install poppler
    3. Linux系统(以Ubuntu为例)
      打开终端,执行以下命令安装依赖:
      bash sudo apt-get update sudo apt-get install tesseract-ocr sudo apt-get install poppler-utils

    2.2 安装excalibur库

    完成依赖安装后,使用pip命令即可安装excalibur库,命令如下:

    pip install excalibur

    安装完成后,可在Python终端中执行以下代码验证安装是否成功:

    import excalibur
    print(f"excalibur库版本:{excalibur.__version__}")

    若终端输出库的版本号,则说明安装成功;若出现“ModuleNotFoundError”,需检查依赖是否安装完整,或重新执行pip安装命令。

    三、excalibur库核心功能与基础用法

    excalibur库的核心功能分为单PDF表格提取批量PDF处理提取结果导出,以下结合代码示例详细讲解每个功能的使用方法。

    3.1 单PDF文件表格提取

    excalibur提取单PDF文件表格的核心步骤为:初始化提取器、加载PDF文件、定位表格区域、提取表格内容。以下是完整代码示例:

    from excalibur.pdf_processing import PDFTableExtractor
    
    # 1. 初始化PDF表格提取器
    extractor = PDFTableExtractor()
    
    # 2. 加载目标PDF文件(替换为你的PDF文件路径)
    pdf_path = "example_table.pdf"
    extractor.load_pdf(pdf_path)
    
    # 3. 定位并提取PDF中的所有表格
    tables = extractor.extract_tables()
    
    # 4. 遍历输出提取的表格内容
    for idx, table in enumerate(tables):
        print(f"===== 提取的第{idx+1}个表格 =====")
        # 打印表格的行数和列数
        print(f"表格行数:{len(table)}, 列数:{len(table[0]) if table else 0}")
        # 打印表格的每一行数据
        for row in table:
            print(row)

    代码说明

    • PDFTableExtractor():初始化表格提取器对象,该对象包含PDF加载、表格定位、内容提取等核心方法。
    • load_pdf(pdf_path):加载指定路径的PDF文件,支持相对路径和绝对路径。
    • extract_tables():自动识别PDF中的所有表格,返回一个列表,列表中的每个元素是一个二维列表,对应一个表格的行和列数据。
    • 最后通过循环遍历提取的表格,输出每个表格的行数、列数和具体内容。

    3.2 自定义表格提取参数

    默认情况下,excalibur会提取PDF中的所有表格,但在实际应用中,我们可能需要提取指定页码的表格,或调整识别精度。以下是自定义参数的代码示例:

    from excalibur.pdf_processing import PDFTableExtractor
    
    # 初始化提取器并设置自定义参数
    extractor = PDFTableExtractor(
        min_confidence=0.7,  # 设置最小识别置信度,低于该值的表格将被过滤
        lang="eng+chi_sim"   # 设置OCR识别语言,支持英文和简体中文
    )
    
    # 加载PDF文件
    pdf_path = "multi_page_table.pdf"
    extractor.load_pdf(pdf_path)
    
    # 提取指定页码的表格(页码从0开始计数,提取第2页和第3页的表格)
    target_pages = [1, 2]
    tables = extractor.extract_tables(pages=target_pages)
    
    # 输出提取结果
    for idx, table in enumerate(tables):
        print(f"第{target_pages[idx]+1}页表格内容:")
        for row in table:
            print(row)

    代码说明

    • min_confidence:设置表格识别的最小置信度,取值范围为0-1,值越高,识别的表格精度越高,但可能会过滤掉部分模糊表格。
    • lang:设置OCR识别的语言,默认值为“eng”,添加“chi_sim”后可支持简体中文识别,需确保Tesseract OCR已安装对应的语言包。
    • pages参数:指定需要提取表格的页码,传入一个整数列表,列表中的元素为页码索引(从0开始)。

    3.3 提取结果导出为Excel/CSV文件

    excalibur支持将提取的表格数据直接导出为Excel或CSV格式,方便后续数据处理。以下是导出功能的代码示例:

    from excalibur.pdf_processing import PDFTableExtractor
    from excalibur.utils import export_tables
    
    # 提取PDF表格
    extractor = PDFTableExtractor()
    extractor.load_pdf("example_table.pdf")
    tables = extractor.extract_tables()
    
    # 1. 导出为Excel文件
    excel_path = "extracted_tables.xlsx"
    export_tables(
        tables=tables,
        output_path=excel_path,
        file_format="xlsx"
    )
    print(f"表格已成功导出到Excel文件:{excel_path}")
    
    # 2. 导出为CSV文件
    csv_path = "extracted_tables.csv"
    export_tables(
        tables=tables,
        output_path=csv_path,
        file_format="csv",
        encoding="utf-8"  # 设置CSV文件编码,避免中文乱码
    )
    print(f"表格已成功导出到CSV文件:{csv_path}")

    代码说明

    • export_tables():excalibur提供的工具函数,用于将提取的表格列表导出为指定格式的文件。
    • file_format参数:可选值为“xlsx”和“csv”,分别对应Excel和CSV格式。
    • encoding参数:仅在导出CSV文件时有效,设置为“utf-8”可解决中文乱码问题。

    四、批量处理多个PDF文件实战

    在实际工作中,我们经常需要处理多个PDF文件,excalibur结合Python的文件操作功能,可轻松实现批量处理。以下是批量提取多个PDF表格并导出的完整案例。

    4.1 批量处理代码实现

    import os
    from excalibur.pdf_processing import PDFTableExtractor
    from excalibur.utils import export_tables
    
    # 定义PDF文件夹路径和输出文件夹路径
    pdf_folder = "pdf_files"  # 存放待处理PDF的文件夹
    output_folder = "extracted_results"  # 存放导出结果的文件夹
    
    # 创建输出文件夹(若不存在)
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    
    # 初始化表格提取器
    extractor = PDFTableExtractor(min_confidence=0.6, lang="eng+chi_sim")
    
    # 遍历PDF文件夹中的所有PDF文件
    for filename in os.listdir(pdf_folder):
        if filename.endswith(".pdf"):
            # 拼接PDF文件的完整路径
            pdf_path = os.path.join(pdf_folder, filename)
            print(f"正在处理文件:{filename}")
    
            try:
                # 加载并提取表格
                extractor.load_pdf(pdf_path)
                tables = extractor.extract_tables()
    
                if not tables:
                    print(f"文件{filename}中未检测到表格,跳过导出")
                    continue
    
                # 生成输出文件名(与PDF文件名一致,后缀改为xlsx)
                output_filename = os.path.splitext(filename)[0] + ".xlsx"
                output_path = os.path.join(output_folder, output_filename)
    
                # 导出表格到Excel文件
                export_tables(tables, output_path, file_format="xlsx")
                print(f"文件{filename}处理完成,结果已保存到:{output_path}")
    
            except Exception as e:
                print(f"处理文件{filename}时出错:{str(e)}")
                continue
    
    print("所有PDF文件处理完成!")

    代码说明

    • os.listdir(pdf_folder):遍历指定文件夹中的所有文件,筛选出后缀为“.pdf”的文件进行处理。
    • os.path.splitext(filename)[0]:获取PDF文件的文件名(不含后缀),用于生成对应的Excel文件名。
    • try-except块:捕获处理过程中的异常(如文件损坏、权限不足等),避免单个文件处理失败导致整个批量任务终止。

    4.2 批量处理注意事项

    1. 确保pdf_folder文件夹中仅存放需要处理的PDF文件,避免其他类型文件干扰。
    2. 处理大文件或大量文件时,建议设置合理的min_confidence值,平衡识别精度和处理速度。
    3. 若PDF文件包含加密或权限限制,需先解除限制后再进行处理,否则会抛出“PermissionError”异常。

    五、复杂表格提取优化技巧

    对于嵌套表格、合并单元格表格等复杂结构,excalibur的默认识别效果可能不佳,以下是几种优化技巧,帮助提升复杂表格的提取精度。

    5.1 调整单元格分割阈值

    excalibur通过调整单元格分割阈值,可优化合并单元格的识别效果,代码示例如下:

    from excalibur.pdf_processing import PDFTableExtractor
    
    # 初始化提取器并调整分割阈值
    extractor = PDFTableExtractor(
        cell_split_threshold=0.8,  # 单元格分割阈值,值越高越容易识别合并单元格
        min_confidence=0.6
    )
    
    # 加载包含合并单元格的PDF
    extractor.load_pdf("complex_table.pdf")
    tables = extractor.extract_tables()
    
    # 输出优化后的提取结果
    for table in tables:
        for row in table:
            print(row)

    代码说明

    • cell_split_threshold:单元格分割阈值,取值范围为0-1,值越高,提取器越倾向于将相邻的单元格视为独立单元格,适用于合并单元格较多的表格。

    5.2 结合人工校对修正提取结果

    对于识别误差较大的表格,可通过人工校对修正提取结果,以下是修正数据的代码示例:

    from excalibur.pdf_processing import PDFTableExtractor
    
    # 提取表格
    extractor = PDFTableExtractor()
    extractor.load_pdf("complex_table.pdf")
    tables = extractor.extract_tables()
    
    # 假设第一个表格存在识别误差,手动修正
    corrected_table = []
    for row_idx, row in enumerate(tables[0]):
        corrected_row = row.copy()
        # 修正第2行第3列的数据
        if row_idx == 1:
            corrected_row[2] = "修正后的数据"
        # 修正第4行第1列的数据
        if row_idx == 3:
            corrected_row[0] = "2024-01-01"
        corrected_table.append(corrected_row)
    
    # 替换原表格中的错误数据
    tables[0] = corrected_table
    
    # 导出修正后的表格
    from excalibur.utils import export_tables
    export_tables(tables, "corrected_tables.xlsx", file_format="xlsx")

    代码说明:通过遍历提取的表格数据,定位错误数据的位置并手动修正,再将修正后的表格导出为Excel文件,适用于对数据精度要求较高的场景。

    六、excalibur库常见问题与解决方案

    6.1 OCR识别中文乱码

    问题现象:提取的表格中中文内容显示为乱码或方框。
    解决方案

    1. 确保Tesseract OCR已安装简体中文语言包(下载地址:tesseract-ocr/tessdata),将chi_sim.traineddata文件放入Tesseract OCR的tessdata目录。
    2. 初始化提取器时,设置lang="eng+chi_sim"参数。

    6.2 无法识别扫描版PDF表格

    问题现象:提取扫描版PDF时,返回的表格列表为空。
    解决方案

    1. 检查Tesseract OCR是否安装正确,可在终端执行tesseract --version验证。
    2. 降低min_confidence参数值(如设置为0.5),提高提取器对模糊表格的识别灵敏度。

    6.3 处理大文件时内存溢出

    问题现象:处理几十MB的大PDF文件时,程序抛出“MemoryError”异常。
    解决方案

    1. 分页码提取表格,避免一次性加载整个PDF文件。
    2. 关闭其他占用内存的程序,或增加Python进程的内存限制。

    七、excalibur库相关资源

    • Pypi地址:https://pypi.org/project/excalibur
    • Github地址:https://github.com/xxxxx/xxxxxx
    • 官方文档地址:https://www.xxxxx.com/xxxxxx

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具:rows库快速入门与实战指南

    Python实用工具:rows库快速入门与实战指南

    一、rows库核心概述

    rows库是一款轻量级的Python数据处理工具,专注于结构化数据的读取、转换与导出,支持CSV、JSON、HTML、SQLite等多种常见数据格式,无需手动编写格式解析代码即可实现数据的无缝处理。其工作原理是通过统一的Table对象抽象各类数据源,将不同格式的数据转化为一致的内存数据结构,再提供简洁的API完成数据操作。

    该库的优点是API简洁易用、零配置开箱即用、格式兼容性强;缺点是对超大规模数据集的处理效率较低,且高级数据清洗功能需依赖其他库。rows的开源协议为GNU Lesser General Public License v3.0 (LGPLv3),允许自由使用、修改和分发。

    二、rows库安装与环境准备

    2.1 安装方式

    rows库可通过Python官方包管理工具pip直接安装,适用于Python 3.6及以上版本,在命令行中执行以下命令即可完成安装:

    pip install rows

    若需要支持更多数据格式(如Excel、Parquet),可安装扩展依赖包:

    pip install rows[all]

    2.2 验证安装

    安装完成后,可在Python交互式环境中验证是否安装成功,输入以下代码无报错则说明安装正常:

    import rows
    print(rows.__version__)

    执行后会输出当前rows库的版本号,例如0.6.1

    三、rows库核心API与基础用法

    rows库的核心操作围绕数据读取数据操作数据导出三个环节展开,所有操作都基于统一的Table对象,下面分步骤详细讲解。

    3.1 数据读取:从多种格式加载数据

    rows库支持自动识别数据源格式,无需指定格式类型即可读取,以下是常见格式的读取示例。

    3.1.1 读取CSV文件

    首先准备一个示例CSV文件students.csv,内容如下:

    name,age,gender,score
    Alice,18,Female,92
    Bob,19,Male,85
    Charlie,20,Male,78
    Diana,18,Female,95

    使用rows库读取该文件的代码如下:

    import rows
    
    # 读取CSV文件
    table = rows.import_from_csv("students.csv")
    
    # 查看Table对象的字段名
    print(table.field_names)
    # 输出:('name', 'age', 'gender', 'score')
    
    # 遍历数据行
    for row in table:
        print(f"姓名:{row.name},年龄:{row.age},成绩:{row.score}")

    代码说明

    • rows.import_from_csv()函数接收文件路径作为参数,返回一个Table对象。
    • Table对象的field_names属性存储了数据的列名,返回一个元组。
    • 遍历Table对象时,每一个元素都是一个行对象,可通过列名直接访问对应的值。

    3.1.2 读取JSON文件

    准备示例JSON文件students.json,内容如下:

    [
        {"name": "Alice", "age": 18, "gender": "Female", "score": 92},
        {"name": "Bob", "age": 19, "gender": "Male", "score": 85},
        {"name": "Charlie", "age": 20, "gender": "Male", "score": 78},
        {"name": "Diana", "age": 18, "gender": "Female", "score": 95}
    ]

    读取JSON文件的代码与读取CSV类似,仅需替换函数名:

    import rows
    
    # 读取JSON文件
    table = rows.import_from_json("students.json")
    
    # 访问指定行的指定字段
    print(f"第一个学生的成绩:{table[0].score}")
    # 输出:第一个学生的成绩:92

    代码说明

    • rows.import_from_json()专门用于读取JSON格式数据。
    • Table对象支持通过索引访问指定行,与列表的索引用法一致。

    3.1.3 读取HTML表格

    rows库还能直接解析HTML页面中的表格数据,例如有一个students.html文件,内容如下:

    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>学生信息表</title>
    </head>
    <body>
        <table id="student-table">
            <thead>
                <tr>
                    <th>name</th>
                    <th>age</th>
                    <th>gender</th>
                    <th>score</th>
                </tr>
            </thead>
            <tbody>
                <tr>
                    <td>Alice</td>
                    <td>18</td>
                    <td>Female</td>
                    <td>92</td>
                </tr>
                <tr>
                    <td>Bob</td>
                    <td>19</td>
                    <td>Male</td>
                    <td>85</td>
                </tr>
            </tbody>
        </table>
    </body>
    </html>

    读取HTML表格的代码如下:

    import rows
    
    # 读取HTML文件中的表格
    table = rows.import_from_html("students.html")
    
    # 查看数据行数
    print(f"表格共有 {len(table)} 行数据")
    # 输出:表格共有 2 行数据

    代码说明

    • rows.import_from_html()默认读取HTML中第一个表格,若需读取指定表格,可通过idclass筛选,例如rows.import_from_html("students.html", id_="student-table")

    3.2 数据操作:筛选、排序与转换

    获取Table对象后,可对数据进行筛选、排序等操作,rows库支持原生Python语法结合自身API完成这些操作。

    3.2.1 数据筛选

    筛选出成绩大于90分的学生,代码如下:

    import rows
    
    # 读取CSV数据
    table = rows.import_from_csv("students.csv")
    
    # 筛选成绩>90的学生
    high_score_students = [row for row in table if row.score > 90]
    
    # 输出筛选结果
    for student in high_score_students:
        print(f"高分学生:{student.name},成绩:{student.score}")

    代码说明

    • 利用Python列表推导式遍历Table对象,结合条件判断实现数据筛选。
    • 筛选结果是一个包含行对象的列表,可直接遍历访问字段。

    3.2.2 数据排序

    对学生数据按年龄升序排列,代码如下:

    import rows
    
    table = rows.import_from_csv("students.csv")
    
    # 按年龄升序排序
    sorted_table = rows.sort(table, key="age")
    
    # 输出排序后的结果
    for row in sorted_table:
        print(f"姓名:{row.name},年龄:{row.age}")

    代码说明

    • rows.sort()函数接收两个关键参数:table为待排序的Table对象,key为排序依据的字段名。
    • 若需降序排序,可添加reverse=True参数,例如rows.sort(table, key="score", reverse=True)

    3.2.3 数据转换

    Table对象转换为Python字典列表,方便与其他库(如pandas)配合使用,代码如下:

    import rows
    
    table = rows.import_from_csv("students.csv")
    
    # 转换为字典列表
    dict_list = rows.export_to_dicts(table)
    
    print(dict_list)
    # 输出:[{'name': 'Alice', 'age': 18, 'gender': 'Female', 'score': 92}, ...]

    代码说明

    • rows.export_to_dicts()函数将Table对象转换为字典列表,每个字典的键为字段名,值为对应的数据。

    3.3 数据导出:保存为多种格式

    处理完成的数据可通过rows库导出为CSV、JSON、SQLite等格式,满足不同场景的需求。

    3.3.1 导出为CSV文件

    将筛选后的高分学生数据导出为新的CSV文件,代码如下:

    import rows
    
    table = rows.import_from_csv("students.csv")
    high_score_students = [row for row in table if row.score > 90]
    
    # 将列表转换为Table对象
    new_table = rows.Table(rows.fields_from_table(table), high_score_students)
    
    # 导出为CSV文件
    rows.export_to_csv(new_table, "high_score_students.csv")

    代码说明

    • 先通过列表推导式筛选数据,再用rows.Table()将列表转换为Table对象,其中rows.fields_from_table(table)用于获取原表的字段结构。
    • rows.export_to_csv()接收Table对象和目标文件路径,完成导出。

    3.3.2 导出为JSON文件

    将排序后的数据导出为JSON文件,代码如下:

    import rows
    
    table = rows.import_from_csv("students.csv")
    sorted_table = rows.sort(table, key="score", reverse=True)
    
    # 导出为JSON文件
    rows.export_to_json(sorted_table, "sorted_students.json")

    代码说明

    • rows.export_to_json()函数的用法与导出CSV类似,直接传入Table对象和目标路径即可。

    3.3.3 导出为SQLite数据库

    将学生数据导出为SQLite数据库表,方便后续的数据库操作,代码如下:

    import rows
    
    table = rows.import_from_csv("students.csv")
    
    # 导出为SQLite数据库,表名为students
    rows.export_to_sqlite(table, "students.db", table_name="students")

    代码说明

    • rows.export_to_sqlite()函数接收三个参数:Table对象、数据库文件路径、数据库表名。
    • 执行后会生成一个students.db文件,可使用SQLite工具或Python的sqlite3库连接查询。

    四、rows库实战案例:多格式数据整合分析

    本案例模拟一个实际场景:从CSV、JSON、HTML三种不同格式的文件中读取学生数据,合并后进行统一分析,最后导出为SQLite数据库。

    4.1 准备数据源

    1. CSV数据源students_1.csv
      csv name,age,gender,score Eve,20,Female,88 Frank,19,Male,90
    2. JSON数据源students_2.json
      json [ {"name": "Grace", "age": 18, "gender": "Female", "score": 93}, {"name": "Henry", "age": 21, "gender": "Male", "score": 82} ]
    3. HTML数据源students_3.html
      html ¨K51K

    4.2 数据读取与合并

    编写代码读取三个数据源并合并为一个Table对象:

    import rows
    
    # 读取不同格式的数据源
    table_csv = rows.import_from_csv("students_1.csv")
    table_json = rows.import_from_json("students_2.json")
    table_html = rows.import_from_html("students_3.html")
    
    # 合并所有数据行
    all_rows = list(table_csv) + list(table_json) + list(table_html)
    
    # 创建合并后的Table对象
    merged_table = rows.Table(rows.fields_from_table(table_csv), all_rows)
    
    # 查看合并后的数据行数
    print(f"合并后共有 {len(merged_table)} 条学生数据")

    代码说明

    • 分别读取三种格式的数据,得到三个独立的Table对象。
    • 将每个Table对象转换为列表,再通过列表相加实现数据行合并。
    • 利用第一个表的字段结构创建新的Table对象,确保合并后的数据结构一致。

    4.3 数据分析与处理

    对合并后的数据进行以下分析:

    1. 计算所有学生的平均成绩
    2. 筛选出年龄小于20岁的学生
    3. 按成绩降序排序
    import rows
    
    # 延续上一步的merged_table
    # 1. 计算平均成绩
    total_score = sum(row.score for row in merged_table)
    average_score = total_score / len(merged_table)
    print(f"所有学生的平均成绩:{average_score:.2f}")
    
    # 2. 筛选年龄小于20岁的学生
    young_students = [row for row in merged_table if row.age < 20]
    young_table = rows.Table(rows.fields_from_table(merged_table), young_students)
    print(f"年龄小于20岁的学生共有 {len(young_table)} 人")
    
    # 3. 按成绩降序排序
    sorted_table = rows.sort(merged_table, key="score", reverse=True)
    print("成绩排名前三的学生:")
    for i in range(3):
        print(f"第{i+1}名:{sorted_table[i].name},成绩:{sorted_table[i].score}")

    代码说明

    • 利用生成器表达式计算成绩总和,再除以数据行数得到平均成绩。
    • 筛选年龄小于20岁的学生后,转换为Table对象以便后续导出。
    • 按成绩降序排序后,通过索引获取前三名学生的信息。

    4.4 结果导出

    将排序后的完整数据导出为SQLite数据库,将年龄小于20岁的学生数据导出为CSV文件:

    import rows
    
    # 延续上一步的sorted_table和young_table
    # 导出排序后的数据到SQLite
    rows.export_to_sqlite(sorted_table, "merged_students.db", table_name="all_students")
    
    # 导出年轻学生数据到CSV
    rows.export_to_csv(young_table, "young_students.csv")
    
    print("数据导出完成!")

    代码说明

    • 排序后的完整数据存入SQLite数据库,方便后续的查询和管理。
    • 年轻学生数据导出为CSV文件,便于快速查看和分享。

    五、rows库常见问题与解决方案

    5.1 数据类型自动识别错误

    问题:读取CSV文件时,数字字段被识别为字符串类型,导致无法进行数值计算。
    解决方案:使用rows.transform函数手动指定字段类型,示例代码如下:

    import rows
    from rows.fields import IntegerField, FloatField
    
    # 定义字段类型
    class StudentTable(rows.Table):
        name = rows.fields.TextField
        age = IntegerField
        gender = rows.fields.TextField
        score = FloatField
    
    # 读取CSV并应用字段类型
    table = rows.import_from_csv("students.csv", force_types=StudentTable)
    
    # 验证类型
    print(type(table[0].score))  # 输出:<class 'float'>

    5.2 不支持的文件格式

    问题:尝试读取Excel文件时,提示“未找到对应的导入函数”。
    解决方案:安装扩展依赖rows[excel],然后使用rows.import_from_xlsx()函数读取,示例代码如下:

    pip install rows[excel]
    import rows
    table = rows.import_from_xlsx("students.xlsx")

    5.3 大规模数据处理效率低

    问题:读取超大CSV文件时,内存占用过高,程序运行缓慢。
    解决方案:rows库不适合处理超大规模数据,建议结合pandas库使用,先用rows读取数据转换为字典列表,再传入pandas的DataFrame

    import rows
    import pandas as pd
    
    table = rows.import_from_csv("large_students.csv")
    df = pd.DataFrame(rows.export_to_dicts(table))
    # 使用pandas进行高效处理

    六、rows库相关资源

    • PyPI地址:https://pypi.org/project/rows
    • Github地址:https://github.com/turicas/rows
    • 官方文档地址:https://rows.readthedocs.io/

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具:Camelot库——轻松提取PDF表格数据的完整指南

    Python实用工具:Camelot库——轻松提取PDF表格数据的完整指南

    一、Camelot库核心概述

    Camelot是一款专为从PDF文件中精确提取表格数据而生的Python库,它能将PDF里的表格转换为Pandas DataFrame或CSV、JSON等格式,极大降低了PDF表格数据处理的门槛。其工作原理是通过两种核心算法(Lattice和Stream)识别表格:Lattice适用于有清晰边框线的表格,通过检测线条来定位单元格;Stream适用于无边框表格,依靠文本的位置和间距来划分单元格。

    该库的优点十分突出:提取精度高、支持自定义配置、输出格式灵活、完全免费开源;缺点则是对扫描版PDF(图片型PDF)无效,仅支持文本型PDF,且对复杂嵌套表格的处理能力有限。Camelot采用MIT License开源协议,允许开发者自由使用、修改和分发,无商业使用限制。

    二、Camelot库安装步骤

    Camelot的安装分为基础安装和依赖补充两个部分,因为它依赖于Ghostscript等第三方工具,不同操作系统的安装流程略有差异,以下是详细的安装指南。

    2.1 安装Ghostscript依赖

    Ghostscript是Camelot识别PDF表格的核心依赖,必须优先安装。

    1. Windows系统
      访问Ghostscript官方下载地址(https://www.ghostscript.com/releases/gsdnld.html),下载对应版本的安装包,按照安装向导完成安装。安装完成后,需要将Ghostscript的可执行文件路径添加到系统环境变量中,例如默认路径为C:\Program Files\gs\gs10.02.1\bin
    2. macOS系统
      使用Homebrew包管理器执行以下命令安装:
      bash brew install ghostscript
    3. Linux系统(Ubuntu/Debian)
      执行apt-get命令安装:
      bash sudo apt-get install ghostscript

    2.2 安装Camelot库

    完成Ghostscript安装后,通过pip命令即可安装Camelot库,建议使用Python3.6及以上版本:

    pip install camelot-py[cv]

    这里的[cv]表示安装包含OpenCV依赖的完整版,OpenCV有助于提升表格识别的准确率。安装完成后,可以在Python环境中执行以下代码验证是否安装成功:

    import camelot
    print(camelot.__version__)

    如果代码能正常输出Camelot的版本号,说明安装成功。

    三、Camelot库核心用法与代码实例

    Camelot的核心操作流程是读取PDF文件→配置提取参数→提取表格→导出/处理数据,下面将详细讲解两种核心提取算法的使用方法,并结合代码实例进行演示。

    3.1 核心概念:Lattice与Stream算法

    在使用Camelot提取表格前,需要先明确PDF表格的类型,从而选择对应的算法:

    • Lattice算法:默认算法,适用于有明确边框线的表格,例如Excel导出的PDF表格、财务报表等。该算法通过检测表格的竖线和横线来确定单元格的边界,提取精度极高。
    • Stream算法:适用于无边框线的表格,例如纯文本排版的表格、网页导出的无框PDF表格等。该算法通过分析文本块的位置、间距和对齐方式,来推断表格的结构。

    3.2 基础用法:提取单页PDF表格

    首先准备一个测试用的PDF文件(例如test_table.pdf),该文件的第1页包含一个有边框的表格。下面的代码将演示如何使用Lattice算法提取该表格。

    3.2.1 代码实例:Lattice算法提取有边框表格

    import camelot
    
    # 读取PDF文件,指定提取第1页的表格,使用Lattice算法
    tables = camelot.read_pdf(
        'test_table.pdf',  # PDF文件路径
        pages='1',         # 指定提取的页码,支持多页如'1,3,5'或范围'1-5'
        flavor='lattice'   # 指定提取算法为lattice
    )
    
    # 打印提取到的表格数量
    print(f"提取到的表格数量:{len(tables)}")
    
    # 查看第一个表格的基本信息
    print("第一个表格的基本信息:")
    print(tables[0].parsing_report)  # 输出解析报告,包含精度、页数等信息
    
    # 将表格转换为Pandas DataFrame
    df = tables[0].df
    print("\n表格数据(DataFrame格式):")
    print(df)
    
    # 将表格导出为CSV文件
    tables[0].to_csv('extracted_table.csv')
    print("\n表格已导出为extracted_table.csv")
    
    # 将表格导出为JSON文件
    tables[0].to_json('extracted_table.json')
    print("表格已导出为extracted_table.json")

    3.2.2 代码说明

    • camelot.read_pdf()是核心函数,用于读取PDF并提取表格,返回一个TableList对象,包含所有提取到的表格。
    • pages参数用于指定提取的页码,支持单页、多页和页码范围,例如pages='1-3'表示提取第1到3页的表格。
    • flavor参数指定算法类型,lattice为默认值,适用于有边框表格。
    • tables[0].parsing_report会输出解析报告,包含accuracy(提取精度)、whitespace(空白占比)、page(页码)等信息,精度越高说明提取效果越好。
    • tables[0].df将表格转换为Pandas DataFrame,方便后续的数据清洗和分析。
    • to_csv()to_json()方法可以将表格导出为对应的文件格式,便于分享和存储。

    3.2.3 代码实例:Stream算法提取无边框表格

    如果需要提取的PDF表格没有边框线,就需要使用stream算法,同时可以通过table_regions参数指定表格所在的区域,提升提取精度。

    import camelot
    
    # 读取PDF文件,使用Stream算法提取无边框表格
    tables = camelot.read_pdf(
        'test_no_border_table.pdf',
        pages='1',
        flavor='stream',
        table_regions=['20, 700, 500, 300']  # 指定表格的坐标区域:x1, y1, x2, y2
    )
    
    # 输出解析报告
    print("解析报告:")
    print(tables[0].parsing_report)
    
    # 查看表格数据
    df = tables[0].df
    print("\n无边框表格数据:")
    print(df)
    
    # 导出为Excel文件(需要安装openpyxl库)
    tables[0].to_excel('extracted_no_border_table.xlsx', index=False)
    print("\n无边框表格已导出为extracted_no_border_table.xlsx")

    3.2.4 代码说明

    • flavor='stream'指定使用Stream算法,适用于无边框表格。
    • table_regions参数的作用是限定表格的提取区域,坐标格式为[x1, y1, x2, y2],其中(x1, y1)是区域的左上角坐标,(x2, y2)是右下角坐标。该参数可以避免PDF中的其他文本干扰表格提取,大幅提升准确率。
    • to_excel()方法可以将表格导出为Excel文件,使用前需要安装openpyxl库,执行pip install openpyxl即可。

    3.3 高级用法:自定义提取参数

    Camelot提供了丰富的自定义参数,用于处理复杂的PDF表格,例如合并单元格、调整列间距、过滤空白行等。下面的代码将演示如何使用这些参数优化提取效果。

    3.3.1 代码实例:处理合并单元格与空白行

    import camelot
    
    # 读取包含合并单元格的PDF表格
    tables = camelot.read_pdf(
        'test_merge_cells.pdf',
        pages='1',
        flavor='lattice',
        strip_text='\n',  # 去除单元格内的换行符
        suppress_stdout=True  # 抑制控制台输出的冗余信息
    )
    
    # 查看原始提取的表格数据
    print("原始提取数据(含合并单元格):")
    print(tables[0].df)
    
    # 处理合并单元格:通过DataFrame的fillna方法填充合并单元格的内容
    df = tables[0].df
    # 向前填充空值,适用于垂直合并的单元格
    df = df.fillna(method='ffill', axis=0)
    # 向左填充空值,适用于水平合并的单元格
    df = df.fillna(method='ffill', axis=1)
    
    print("\n处理合并单元格后的数据:")
    print(df)
    
    # 过滤空白行:删除所有元素均为空的行
    df = df.dropna(how='all')
    print("\n过滤空白行后的数据:")
    print(df)

    3.3.2 代码说明

    • strip_text='\n'参数用于去除单元格内的换行符,使文本内容更整洁。
    • suppress_stdout=True参数可以抑制Camelot在控制台输出的冗余日志信息,让输出更简洁。
    • 合并单元格在提取后会表现为NaN值,通过Pandas的fillna()方法,使用ffill(向前填充)策略,可以快速填充合并单元格的内容,还原表格的真实结构。
    • dropna(how='all')方法用于删除所有元素均为空的行,适用于清理包含大量空白行的表格数据。

    3.3.3 代码实例:提取多页PDF中的所有表格

    如果PDF文件包含多个页面,且每个页面都有表格,可以通过pages='all'参数提取所有页面的表格,并批量导出。

    import camelot
    import os
    
    # 创建输出目录
    output_dir = 'multi_page_tables'
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # 提取多页PDF中的所有表格
    tables = camelot.read_pdf(
        'multi_page_test.pdf',
        pages='all',
        flavor='lattice'
    )
    
    print(f"共提取到 {len(tables)} 个表格")
    
    # 批量导出所有表格为CSV文件
    for i, table in enumerate(tables):
        table.to_csv(os.path.join(output_dir, f'table_{i+1}.csv'))
        print(f"表格 {i+1} 已导出到 {output_dir}/table_{i+1}.csv")

    3.3.4 代码说明

    • pages='all'参数表示提取PDF文件中所有页面的表格,无需手动指定页码。
    • 通过enumerate()遍历TableList对象中的每个表格,批量导出为CSV文件,并按序号命名,方便管理。
    • 使用os.makedirs()创建输出目录,避免因目录不存在导致的导出失败。

    四、实际应用案例:PDF财务报表数据提取与分析

    下面将结合一个实际的应用场景——提取PDF格式的财务报表中的利润表数据,并进行简单的数据分析,演示Camelot库在实际工作中的使用价值。

    4.1 案例背景

    假设我们有一份名为2024_financial_report.pdf的PDF文件,其中第3页是公司的利润表,表格为有边框格式,包含“项目”“2023年”“2024年”三列数据。我们需要提取该表格数据,并分析2024年相较于2023年的营收变化情况。

    4.2 代码实例:数据提取与分析

    import camelot
    import pandas as pd
    import matplotlib.pyplot as plt
    
    # 步骤1:提取PDF中的利润表数据
    tables = camelot.read_pdf(
        '2024_financial_report.pdf',
        pages='3',
        flavor='lattice',
        strip_text='\n'
    )
    
    # 步骤2:转换为DataFrame并清洗数据
    profit_df = tables[0].df
    # 设置列名:假设表格第一行是表头
    profit_df.columns = profit_df.iloc[0]
    profit_df = profit_df.drop(0, axis=0)
    # 重置索引
    profit_df = profit_df.reset_index(drop=True)
    # 过滤掉空行
    profit_df = profit_df.dropna(how='all')
    
    print("清洗后的利润表数据:")
    print(profit_df)
    
    # 步骤3:数据类型转换(将金额列转换为数值类型)
    # 假设金额列名为“2023年”和“2024年”
    profit_df['2023年'] = pd.to_numeric(profit_df['2023年'], errors='coerce')
    profit_df['2024年'] = pd.to_numeric(profit_df['2024年'], errors='coerce')
    
    # 步骤4:分析营收变化——筛选“营业收入”行
    revenue_row = profit_df[profit_df['项目'].str.contains('营业收入', na=False)]
    if not revenue_row.empty:
        revenue_2023 = revenue_row['2023年'].values[0]
        revenue_2024 = revenue_row['2024年'].values[0]
        revenue_growth = (revenue_2024 - revenue_2023) / revenue_2023 * 100
    
        print(f"\n2023年营业收入:{revenue_2023:.2f} 万元")
        print(f"2024年营业收入:{revenue_2024:.2f} 万元")
        print(f"营业收入增长率:{revenue_growth:.2f}%")
    
        # 步骤5:可视化营收变化
        plt.rcParams['font.sans-serif'] = ['SimHei']  # 解决中文显示问题
        plt.figure(figsize=(8, 5))
        years = ['2023年', '2024年']
        revenues = [revenue_2023, revenue_2024]
        plt.bar(years, revenues, color=['#3498db', '#e74c3c'])
        plt.title('2023-2024年营业收入对比')
        plt.ylabel('营业收入(万元)')
        for i, v in enumerate(revenues):
            plt.text(i, v + 100, f'{v:.2f}', ha='center')
        plt.savefig('revenue_comparison.png', dpi=300, bbox_inches='tight')
        plt.show()
    else:
        print("\n未找到营业收入相关数据")

    4.3 案例说明

    1. 数据提取:使用Lattice算法提取PDF第3页的利润表数据,通过strip_text='\n'清理单元格内的换行符。
    2. 数据清洗:将表格的第一行设为列名,删除表头行和空行,确保数据结构整洁。
    3. 类型转换:将金额列从字符串类型转换为数值类型,以便进行数学计算,errors='coerce'参数可以将无法转换的值设为NaN
    4. 数据分析:通过筛选包含“营业收入”的行,计算2024年相较于2023年的营收增长率。
    5. 数据可视化:使用Matplotlib绘制柱状图,直观展示两年的营业收入对比情况,并解决中文显示问题。

    这个案例充分体现了Camelot库在实际工作中的价值——从PDF中快速提取结构化数据,结合Pandas和Matplotlib完成数据分析与可视化,大大提升了工作效率。

    五、Camelot库常见问题与解决方案

    在使用Camelot的过程中,可能会遇到一些常见问题,下面列出了这些问题的解决方案:

    1. 问题1:提取到的表格为空或不完整
      • 解决方案:检查PDF是否为文本型PDF(扫描版PDF无法提取);使用table_regions参数指定表格区域;尝试切换latticestream算法;调整edge_tol参数(边缘容差)或row_tol参数(行容差)。
    2. 问题2:报错“Ghostscript not found”
      • 解决方案:确认Ghostscript已正确安装,并将其路径添加到系统环境变量中;重启Python环境后重试。
    3. 问题3:合并单元格处理不彻底
      • 解决方案:提取数据后,使用Pandas的fillna()方法手动填充NaN值;对于复杂的合并单元格,可以结合df.replace()方法进行处理。
    4. 问题4:多页PDF提取效率低
      • 解决方案:避免使用pages='all'提取不必要的页面,手动指定需要提取的页码;关闭suppress_stdout=False查看详细日志,定位耗时较长的页面。

    六、相关资源链接

    • Pypi地址:https://pypi.org/project/camelot-py
    • Github地址:https://github.com/camelot-dev/camelot
    • 官方文档地址:https://camelot-py.readthedocs.io/en/master/

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具:deepdish库使用教程

    Python实用工具:deepdish库使用教程

    import deepdish as dd
    import numpy as np
    from sklearn.linear_model import SGDClassifier

    初始化随机梯度下降分类器(支持增量训练)

    model = SGDClassifier(loss=”log_loss”, max_iter=100, random_state=42, warm_start=True)

    分块加载训练集数据,每块 1000 个样本

    chunk_size = 1000
    print(“开始分块训练模型…”)
    for chunk in dd.iterate(“image_classification_dataset.h5″, group=”train.features”, chunks=(chunk_size, 784)):
    # 获取对应块的标签
    chunk_labels = dd.io.load(“image_classification_dataset.h5″, group=”train.labels”)[chunk.index[0]:chunk.index[0]+chunk_size]
    # 增量训练模型
    model.partial_fit(chunk, chunk_labels, classes=np.arange(10))

    在测试集上评估模型

    test_data = dd.io.load(“image_classification_dataset.h5″, group=”test”)
    test_pred = model.predict(test_data[“features”])
    test_accuracy = accuracy_score(test_data[“labels”], test_pred)
    print(f”分块训练后测试集准确率: {test_accuracy:.4f}”)

    关注我,每天分享一个实用的Python自动化工具。

  • Python实用工具img2dataset:大规模图像数据集高效构建指南

    Python实用工具img2dataset:大规模图像数据集高效构建指南

    一、img2dataset库核心概述

    img2dataset是一款专为大规模图像数据集构建设计的Python工具库,其核心用途是从图像URL列表中批量下载、处理并存储图像数据,广泛应用于计算机视觉领域的模型训练数据准备工作。该库的工作原理是通过多线程/多进程并行处理URL队列,支持断点续传、图像格式转换、分辨率调整等功能,同时能够生成配套的元数据文件,便于后续的数据管理与模型训练。

    在优缺点方面,img2dataset的优势十分突出:一是并行处理机制大幅提升下载效率,能够轻松应对百万级以上的URL列表;二是支持多种输出格式(如webdataset、files、parquet等),适配不同的训练框架需求;三是内置图像过滤功能,可自动剔除损坏、低分辨率的无效图像。其缺点主要在于对网络环境要求较高,大规模下载时容易受带宽限制,且部分高级功能需要依赖额外的第三方库。该库采用Apache License 2.0开源协议,允许商用与二次开发,完全满足开发者的使用需求。

    二、img2dataset安装方法

    img2dataset支持通过Python包管理工具pip直接安装,同时也可以从GitHub源码编译安装,两种方式分别适用于不同的使用场景,以下是详细的安装步骤。

    2.1 pip快速安装

    这是最简便的安装方式,适用于大多数用户,只需在命令行中执行以下命令即可完成安装:

    pip install img2dataset

    安装完成后,可通过以下Python代码验证是否安装成功:

    import img2dataset
    print(f"img2dataset版本:{img2dataset.__version__}")

    运行上述代码,如果控制台输出对应的版本号,说明安装成功;若出现ModuleNotFoundError,则需要检查pip环境是否配置正确,或尝试升级pip后重新安装。

    2.2 源码编译安装

    如果需要使用最新的开发版本,或者对源码进行自定义修改,可以选择从GitHub克隆源码并编译安装,步骤如下:

    1. 克隆GitHub仓库
    git clone https://github.com/rom1504/img2dataset.git
    cd img2dataset
    1. 安装依赖并编译
    pip install -r requirements.txt
    pip install -e .

    这种安装方式的优势在于可以随时通过git pull获取最新的功能更新,适合对功能有定制化需求的开发者。

    三、img2dataset基础使用教程

    img2dataset的使用方式分为命令行调用Python脚本调用两种,其中脚本调用的灵活性更高,便于嵌入到自动化数据处理流程中。本节将以Python脚本调用为主,结合实例讲解核心功能的使用方法。

    3.1 核心参数说明

    在使用img2dataset之前,需要先了解其核心参数的含义,这些参数决定了数据下载与处理的行为,关键参数如下表所示:

    | 参数名称 | 数据类型 | 作用说明 | 默认值 |
    |-|-|-|–|
    | url_list | str | 存储图像URL的文件路径或文本内容 | 无(必填) |
    | output_format | str | 输出格式,可选webdataset/files/parquet等 | webdataset |
    | output_folder | str | 输出文件的存储目录 | dataset |
    | thread_count | int | 并行下载的线程数 | 256 |
    | image_size | int | 图像缩放后的目标分辨率 | 256 |
    | resize_only_if_bigger | bool | 是否仅当原图大于目标分辨率时才缩放 | True |
    | skip_reencode | bool | 是否跳过图像重新编码 | True |
    | save_additional_columns | list | 需要保存的额外元数据列 | [] |

    3.2 从URL列表下载图像(基础实例)

    本实例将演示如何从一个包含图像URL的文本文件中批量下载图像,并保存为webdataset格式。

    步骤1:准备URL列表文件

    首先创建一个名为urls.txt的文本文件,每行存储一个图像URL和对应的元数据(如标签),格式如下:

    https://example.com/image1.jpg label1
    https://example.com/image2.jpg label2
    https://example.com/image3.jpg label3

    其中,URL与元数据之间用空格分隔,元数据可以根据需求添加多列。

    步骤2:编写Python下载脚本

    创建名为download_images.py的Python文件,代码如下:

    from img2dataset import download
    
    # 配置下载参数
    params = {
        "url_list": "urls.txt",  # URL列表文件路径
        "output_folder": "my_image_dataset",  # 输出目录
        "output_format": "webdataset",  # 输出格式
        "thread_count": 128,  # 并行线程数,根据机器性能调整
        "image_size": 512,  # 图像缩放至512x512
        "resize_only_if_bigger": True,  # 仅缩放大于512的图像
        "skip_reencode": False,  # 重新编码为JPEG格式
        "save_additional_columns": ["label"],  # 保存标签列作为元数据
        "number_sample_per_shard": 1000,  # 每个分片存储1000张图像
        "retries": 3,  # 下载失败时重试次数
    }
    
    # 执行下载任务
    download(**params)

    步骤3:运行脚本并查看结果

    在命令行中执行以下命令运行脚本:

    python download_images.py

    脚本运行后,会在当前目录下生成my_image_dataset文件夹,结构如下:

    my_image_dataset/
    ├── 00000.tar
    ├── 00001.tar
    └── ...

    每个.tar文件是一个数据分片,包含1000张图像及其元数据,可直接用于PyTorch、TensorFlow等框架的模型训练。

    3.3 直接使用URL列表字符串(进阶实例)

    除了从文件读取URL列表,还可以直接将URL列表以字符串的形式传入参数,适用于动态生成URL的场景,代码示例如下:

    from img2dataset import download
    
    # 动态生成URL列表字符串
    url_str = """https://example.com/img1.jpg cat
    https://example.com/img2.jpg dog
    https://example.com/img3.jpg bird
    """
    
    # 配置参数
    params = {
        "url_list": url_str,  # 直接传入URL字符串
        "output_folder": "dynamic_dataset",
        "output_format": "files",  # 以单个文件形式存储
        "image_size": 256,
        "thread_count": 64,
    }
    
    # 执行下载
    download(**params)

    该脚本运行后,dynamic_dataset文件夹下会按类别生成子文件夹,并存储对应的图像文件,适合需要人工查看图像的场景。

    3.4 图像过滤与质量控制

    img2dataset内置了图像质量过滤功能,可以自动剔除无效图像,例如损坏的文件、分辨率过低的图像等。以下是添加过滤条件的脚本示例:

    from img2dataset import download
    
    params = {
        "url_list": "urls.txt",
        "output_folder": "filtered_dataset",
        "output_format": "parquet",
        "image_size": 384,
        "min_image_size": 128,  # 剔除宽度或高度小于128的图像
        "max_image_area": 1000000,  # 剔除面积超过100万像素的图像
        "timeout": 10,  # 下载超时时间(秒)
        "verify_hash": False,  # 是否验证图像哈希值
        "skip_downloaded": True,  # 跳过已下载的图像(断点续传)
    }
    
    download(**params)

    通过设置min_image_sizemax_image_area参数,可以精准控制保留的图像质量,避免低质量数据影响模型训练效果。

    四、img2dataset高级应用案例

    本节将结合实际应用场景,讲解img2dataset的高级用法,包括与其他数据处理库的结合、大规模分布式下载等。

    4.1 与Pandas结合处理元数据

    在实际项目中,图像的元数据通常存储在CSV文件中,我们可以使用Pandas读取CSV文件,提取URL和元数据,再传递给img2dataset进行下载。以下是完整的案例代码:

    import pandas as pd
    from img2dataset import download
    
    # 1. 使用Pandas读取CSV元数据文件
    df = pd.read_csv("metadata.csv")
    # 假设CSV包含列:url, label, category
    print(f"元数据文件共包含 {len(df)} 条记录")
    
    # 2. 将DataFrame转换为img2dataset支持的URL字符串格式
    url_list = []
    for idx, row in df.iterrows():
        url = row["url"]
        label = row["label"]
        category = row["category"]
        # 格式:URL 标签 类别
        url_list.append(f"{url} {label} {category}")
    url_str = "\n".join(url_list)
    
    # 3. 配置下载参数
    params = {
        "url_list": url_str,
        "output_folder": "pandas_dataset",
        "output_format": "webdataset",
        "thread_count": 256,
        "image_size": 512,
        "save_additional_columns": ["label", "category"],  # 保存多列元数据
    }
    
    # 4. 执行下载
    download(**params)

    该案例适用于元数据较为复杂的场景,通过Pandas可以灵活地筛选、清洗元数据,再传递给img2dataset进行批量下载。

    4.2 分布式大规模数据集下载

    当需要处理千万级以上的URL列表时,单台机器的性能可能无法满足需求,此时可以使用img2dataset的分布式下载功能,借助多台机器并行处理任务。核心思路是将URL列表分割为多个分片,分配给不同的机器分别下载,最后合并结果。

    步骤1:分割URL列表

    使用以下Python代码将大型URL文件分割为多个小文件:

    def split_url_file(input_file, chunk_size=100000):
        """
        将URL文件分割为多个分片
        :param input_file: 输入URL文件路径
        :param chunk_size: 每个分片的记录数
        """
        with open(input_file, "r", encoding="utf-8") as f:
            lines = f.readlines()
    
        total_chunks = (len(lines) + chunk_size - 1) // chunk_size
        for i in range(total_chunks):
            start = i * chunk_size
            end = min((i+1)*chunk_size, len(lines))
            chunk_lines = lines[start:end]
            with open(f"urls_chunk_{i}.txt", "w", encoding="utf-8") as f_out:
                f_out.writelines(chunk_lines)
        print(f"分割完成,共生成 {total_chunks} 个分片")
    
    # 分割URL文件,每个分片10万条记录
    split_url_file("large_urls.txt", chunk_size=100000)

    步骤2:多机器并行下载

    将分割后的URL分片文件分别发送到不同的机器,每台机器运行以下下载脚本:

    from img2dataset import download
    
    # 替换为对应的分片文件名
    chunk_file = "urls_chunk_0.txt"
    
    params = {
        "url_list": chunk_file,
        "output_folder": f"dataset_chunk_0",
        "output_format": "webdataset",
        "thread_count": 256,
        "image_size": 512,
        "distributor": "multiprocessing",  # 使用多进程分发任务
    }
    
    download(**params)

    步骤3:合并下载结果

    所有机器下载完成后,将生成的数据集分片复制到同一目录下,即可得到完整的大规模图像数据集。

    五、img2dataset常见问题与解决方案

    在使用img2dataset的过程中,可能会遇到各种问题,以下是一些常见问题及其解决方案:

    5.1 下载速度慢

    • 原因:线程数设置过低,或网络带宽不足。
    • 解决方案:适当增加thread_count参数的值(根据机器CPU核心数调整,建议设置为CPU核心数的4-8倍);使用高速网络,或配置代理服务器。

    5.2 大量图像下载失败

    • 原因:URL无效、目标服务器拒绝访问,或下载超时。
    • 解决方案:增加retries参数的值,提高重试次数;设置合理的timeout参数;下载完成后查看日志文件,剔除无效URL。

    5.3 内存占用过高

    • 原因:并行线程数过多,导致内存资源耗尽。
    • 解决方案:降低thread_count参数的值;使用distributor="multiprocessing"参数,采用多进程替代多线程,减少内存占用。

    六、相关资源链接

    • Pypi地址:https://pypi.org/project/img2dataset
    • Github地址:https://github.com/rom1504/img2dataset
    • 官方文档地址:https://github.com/rom1504/img2dataset/blob/main/docs/README.md

    关注我,每天分享一个实用的Python自动化工具。