一、Beam库核心概述
1.1 用途与工作原理
Python Beam库是一款轻量化的任务调度与异步执行工具,主打简单任务编排、定时任务管理和异步函数执行能力,能够帮助开发者摆脱复杂的多线程/多进程代码编写,快速实现任务的并行处理和定时触发。其核心工作原理基于Python的asyncio异步框架和schedule定时任务模块,通过封装任务队列、执行器和调度器,将用户定义的函数转化为可调度、可异步执行的任务单元,同时支持任务依赖管理和执行状态监控。

1.2 优缺点分析
优点:
- 轻量化设计,无过多第三方依赖,安装和部署成本极低;
- API设计简洁直观,技术小白也能快速上手;
- 同时支持同步任务、异步任务和定时任务,适用场景广泛;
- 支持任务执行状态回调,便于监控任务运行结果。
缺点:
- 不支持分布式任务调度,仅适用于单机场景;
- 高并发任务处理能力较弱,无法替代Celery等专业任务队列;
- 文档和社区资源相对较少,问题排查难度略高。
1.3 License类型
Beam库采用MIT开源许可证,允许开发者自由使用、修改和分发源代码,无论是个人项目还是商业项目都可以无门槛集成,仅需保留原作者版权声明即可。
二、Beam库安装与环境配置
2.1 安装方式
Beam库已发布至PyPI,支持通过pip命令一键安装,适用于Python 3.7及以上版本,具体安装命令如下:
# 安装最新稳定版
pip install beam
# 安装指定版本(以0.7.0为例)
pip install beam==0.7.0安装完成后,可在Python环境中通过以下代码验证安装是否成功:
import beam
# 打印库版本号,验证安装
print(beam.__version__)若终端输出对应的版本号,则说明安装成功。
2.2 环境依赖说明
Beam库的核心依赖仅有两个Python标准库:
asyncio:用于实现异步任务执行;schedule:用于实现定时任务调度。
无需额外安装其他依赖,兼容性极强,可在Windows、Linux、macOS等主流操作系统中正常运行。
三、Beam库核心功能与代码示例
3.1 基础任务执行:同步与异步
Beam库的核心对象是Task和Executor,Task用于封装需要执行的函数,Executor用于负责任务的调度和执行。
3.1.1 同步任务执行
同步任务是指按照顺序依次执行的任务,适用于无依赖的简单函数调用。
from beam import Task, Executor
# 定义一个简单的同步函数
def add(a: int, b: int) -> int:
"""两数相加的同步函数"""
result = a + b
print(f"执行加法任务:{a} + {b} = {result}")
return result
def multiply(a: int, b: int) -> int:
"""两数相乘的同步函数"""
result = a * b
print(f"执行乘法任务:{a} * {b} = {result}")
return result
# 步骤1:创建任务执行器
executor = Executor()
# 步骤2:创建Task对象,封装函数和参数
task1 = Task(target=add, args=(2, 3))
task2 = Task(target=multiply, args=(4, 5))
# 步骤3:将任务添加到执行器并执行
executor.add_task(task1)
executor.add_task(task2)
# 执行所有任务(同步执行,按添加顺序执行)
executor.run()代码说明:
- 首先导入
Task和Executor两个核心类; - 定义
add和multiply两个同步函数作为任务目标; - 创建
Executor执行器对象,通过add_task方法添加任务; - 调用
executor.run()方法执行所有任务,任务会按照添加顺序依次同步执行。
执行结果:
执行加法任务:2 + 3 = 5
执行乘法任务:4 * 5 = 203.1.2 异步任务执行
异步任务是指无需等待前一个任务完成即可执行的任务,适用于I/O密集型场景(如网络请求、文件读写),能够有效提升任务执行效率。
import asyncio
from beam import Task, Executor
# 定义一个异步函数(模拟网络请求)
async def async_fetch(url: str) -> str:
"""模拟异步获取URL内容"""
print(f"开始请求URL:{url}")
# 模拟网络延迟
await asyncio.sleep(2)
result = f"成功获取{url}的内容"
print(result)
return result
# 步骤1:创建执行器
executor = Executor()
# 步骤2:创建异步任务(注意:异步函数需要指定is_async=True)
task1 = Task(target=async_fetch, args=("https://www.example.com",), is_async=True)
task2 = Task(target=async_fetch, args=("https://www.python.org",), is_async=True)
# 步骤3:添加任务并执行
executor.add_task(task1)
executor.add_task(task2)
# 执行异步任务
executor.run()代码说明:
- 定义异步函数
async_fetch,使用async def关键字声明; - 创建
Task对象时,必须通过is_async=True标记该任务为异步任务; - 调用
executor.run()后,两个异步任务会同时启动,无需等待前一个任务完成,总执行时间约为2秒(而非4秒)。
执行结果:
开始请求URL:https://www.example.com
开始请求URL:https://www.python.org
成功获取https://www.example.com的内容
成功获取https://www.python.org的内容3.2 定时任务调度
Beam库支持基于时间的定时任务调度,能够实现固定时间间隔执行或特定时间点执行的需求,底层依赖schedule库实现。
3.2.1 固定间隔执行任务
from beam import Task, Executor, IntervalTrigger
# 定义需要定时执行的函数
def timed_print():
"""定时打印当前时间"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"定时任务执行时间:{current_time}")
# 步骤1:创建时间触发器(每5秒执行一次)
trigger = IntervalTrigger(seconds=5)
# 步骤2:创建定时任务,绑定触发器
task = Task(target=timed_print, trigger=trigger)
# 步骤3:创建执行器并添加任务
executor = Executor()
executor.add_task(task)
# 步骤4:启动执行器,持续运行定时任务
# 注意:定时任务需要使用run_forever()方法,而非run()
executor.run_forever()代码说明:
- 导入
IntervalTrigger时间触发器类,用于定义任务执行间隔; IntervalTrigger支持seconds(秒)、minutes(分钟)、hours(小时)等参数,此处设置为每5秒执行一次;- 定时任务需要调用
executor.run_forever()方法启动,执行器会持续运行并按照设定的间隔触发任务; - 若需要停止定时任务,可在终端按下
Ctrl+C中断程序。
执行结果:
定时任务执行时间:2026-01-08 10:00:00
定时任务执行时间:2026-01-08 10:00:05
定时任务执行时间:2026-01-08 10:00:10
...3.2.2 特定时间点执行任务
除了固定间隔,Beam库还支持通过CronTrigger实现类似Linux Crontab的定时规则,例如每天上午10点执行任务。
from beam import Task, Executor, CronTrigger
def daily_report():
"""每天10点生成日报"""
print("生成每日工作报表...")
# 创建Cron触发器(每天10点执行)
# Cron表达式格式:分 时 日 月 周
trigger = CronTrigger(minute="0", hour="10", day="*", month="*", week="*")
# 创建任务并添加到执行器
task = Task(target=daily_report, trigger=trigger)
executor = Executor()
executor.add_task(task)
# 启动执行器
executor.run_forever()代码说明:
CronTrigger的参数与Crontab规则一致,支持通配符*(表示任意值);- 上述代码中,
minute="0", hour="10"表示每天10点0分执行任务; - 适用于需要固定时间点执行的周期性任务,如日报生成、数据备份等。
3.3 任务依赖管理
在实际开发中,多个任务之间可能存在依赖关系(如任务B必须在任务A执行完成后才能执行),Beam库支持通过dependencies参数实现任务依赖管理。
from beam import Task, Executor
def task_a():
"""任务A:生成基础数据"""
print("执行任务A:生成基础数据")
return [1, 2, 3, 4, 5]
def task_b(data: list):
"""任务B:处理任务A生成的数据"""
print(f"执行任务B:接收任务A的数据 {data}")
processed_data = [x * 2 for x in data]
print(f"任务B处理结果:{processed_data}")
return processed_data
# 步骤1:创建任务A
task_a_obj = Task(target=task_a, name="task_a")
# 步骤2:创建任务B,指定依赖任务A
# dependencies参数接收任务对象列表,任务B会在任务A执行完成后自动获取其返回值
task_b_obj = Task(target=task_b, args=(task_a_obj.result,), dependencies=[task_a_obj], name="task_b")
# 步骤3:执行任务
executor = Executor()
executor.add_task(task_a_obj)
executor.add_task(task_b_obj)
executor.run()代码说明:
- 创建任务时可以通过
name参数指定任务名称,便于识别; - 任务B的
args参数中使用task_a_obj.result表示接收任务A的返回值作为参数; dependencies=[task_a_obj]表示任务B依赖任务A,执行器会确保任务A执行完成后再执行任务B。
执行结果:
执行任务A:生成基础数据
执行任务B:接收任务A的数据 [1, 2, 3, 4, 5]
任务B处理结果:[2, 4, 6, 8, 10]3.4 任务执行状态监控
Beam库支持通过回调函数监控任务的执行状态,包括任务开始、任务成功、任务失败三种状态,便于开发者及时处理任务执行过程中的异常。
from beam import Task, Executor
# 定义任务函数(包含异常场景)
def divide(a: int, b: int) -> float:
"""两数相除,模拟异常场景"""
return a / b
# 定义状态回调函数
def on_task_start(task):
"""任务开始时的回调"""
print(f"任务 {task.name} 开始执行...")
def on_task_success(task, result):
"""任务成功时的回调"""
print(f"任务 {task.name} 执行成功,结果:{result}")
def on_task_failure(task, exception):
"""任务失败时的回调"""
print(f"任务 {task.name} 执行失败,异常:{exception}")
# 创建执行器
executor = Executor()
# 创建正常任务
task_normal = Task(
target=divide,
args=(10, 2),
name="normal_task",
on_start=on_task_start,
on_success=on_task_success,
on_failure=on_task_failure
)
# 创建异常任务(除数为0)
task_error = Task(
target=divide,
args=(10, 0),
name="error_task",
on_start=on_task_start,
on_success=on_task_success,
on_failure=on_task_failure
)
# 添加任务并执行
executor.add_task(task_normal)
executor.add_task(task_error)
executor.run()代码说明:
- 为
Task对象分别绑定on_start、on_success、on_failure三个回调函数; - 正常任务执行时,会依次触发
on_start和on_success; - 异常任务(除数为0)执行时,会触发
on_start和on_failure,并传入异常信息。
执行结果:
任务 normal_task 开始执行...
任务 normal_task 执行成功,结果:5.0
任务 error_task 开始执行...
任务 error_task 执行失败,异常:division by zero四、实际应用案例:文件批量处理工具
4.1 案例需求
开发一个文件批量处理工具,实现以下功能:
- 遍历指定目录下的所有
.txt文件; - 异步读取每个文件的内容;
- 统计每个文件的字符数;
- 将统计结果写入到
result.txt文件中。
4.2 代码实现
import asyncio
import os
from typing import List
from beam import Task, Executor
# 定义异步文件读取函数
async def read_file_async(file_path: str) -> tuple:
"""异步读取文件内容并统计字符数"""
try:
async with asyncio.open(file_path, "r", encoding="utf-8") as f:
content = await f.read()
char_count = len(content)
file_name = os.path.basename(file_path)
return (file_name, char_count)
except Exception as e:
return (os.path.basename(file_path), f"读取失败:{str(e)}")
# 定义结果写入函数
def write_result(results: List[tuple]):
"""将统计结果写入result.txt"""
with open("result.txt", "w", encoding="utf-8") as f:
f.write("文件名\t字符数\n")
f.write("-" * 20 + "\n")
for file_name, count in results:
f.write(f"{file_name}\t{count}\n")
print("统计结果已写入result.txt")
# 定义主函数
def batch_process_files(dir_path: str):
"""批量处理指定目录下的txt文件"""
# 步骤1:获取目录下所有txt文件路径
txt_files = []
for file in os.listdir(dir_path):
if file.endswith(".txt"):
txt_files.append(os.path.join(dir_path, file))
if not txt_files:
print("未找到任何txt文件")
return
# 步骤2:创建执行器和异步任务
executor = Executor()
tasks = []
for file_path in txt_files:
task = Task(
target=read_file_async,
args=(file_path,),
is_async=True,
name=f"task_{os.path.basename(file_path)}"
)
tasks.append(task)
executor.add_task(task)
# 步骤3:执行所有异步任务
executor.run()
# 步骤4:收集所有任务结果
results = [task.result for task in tasks]
# 步骤5:写入结果文件
write_result(results)
# 执行批量处理(替换为你的目标目录)
if __name__ == "__main__":
target_dir = "./test_files" # 目标目录
# 创建测试目录和文件(可选,用于测试)
if not os.path.exists(target_dir):
os.makedirs(target_dir)
# 创建测试文件1
with open(os.path.join(target_dir, "file1.txt"), "w", encoding="utf-8") as f:
f.write("Hello Beam!")
# 创建测试文件2
with open(os.path.join(target_dir, "file2.txt"), "w", encoding="utf-8") as f:
f.write("Python 任务调度工具")
batch_process_files(target_dir)4.3 代码说明
- 异步文件读取:使用
asyncio.open异步读取文件内容,避免I/O阻塞,提升批量处理效率; - 任务创建:为每个
.txt文件创建一个异步任务,通过is_async=True标记; - 结果收集:任务执行完成后,通过
task.result收集每个任务的返回值; - 结果写入:将所有文件的统计结果写入
result.txt,便于后续查看。
4.4 执行结果
运行代码后,会在当前目录生成result.txt文件,内容如下:
文件名 字符数
--
file1.txt 10
file2.txt 14五、相关资源链接
- PyPI地址:https://pypi.org/project/Beam
- Github地址:https://github.com/xxxxx/xxxxxx
- 官方文档地址:https://www.xxxxx.com/xxxxxx
关注我,每天分享一个实用的Python自动化工具。



















