2026-05-21
把 6 次 OOM 的入库任务改到 52 秒跑完
一次 Parquet 到 Postgres 的重写:从批到流,从 Python 到 Rust。
问题
我最近处理了一个数据工程问题:一批已经清洗好的 Parquet 文件在对象存储里, 需要定期装进 Postgres,给后面的 MLE 流程使用。
痛点是大文件入库。其中一张表大约 517 万行。Python 版本在 4GB 内存的
serverless 函数里跑了 6 次,6 次都 OOM。部署也慢:Python 数据栈带着
pyarrow、pandas、数据库驱动等依赖,本地上传一次能慢到几十分钟。
当时有几个选择:
| 方案 | 能不能救 OOM | 能不能解决部署慢 | 取舍 |
|---|---|---|---|
| 4GB 升到 8GB | 可能暂时能 | 不能 | 最快,但只是把爆炸点往后推 |
| Python 改 streaming | 能 | 不能 | 改动小,仍然背着重依赖 |
| Rust 重写 | 能 | 能 | 成本更高,但一次解决运行和部署两个痛点 |
最后选 Rust,不是因为 Python 不能做,而是因为这个函数同时卡在内存和部署两个地方。 只修一个,另一个问题还会继续卡住迭代。
这次重写后的结果是:
| 指标 | 旧 Python 版本 | 新 Rust 版本 |
|---|---|---|
| 大表行数 | 5,174,400 行 | 5,174,400 行 |
| 运行结果 | 6 次尝试全部 4GB OOM | 一次跑通,约 52 秒 |
| 部署 zip | 约 77.8MB | 约 2.5MB |
| 本地上传 | 约 40 分钟 | 约 3.5 秒 |
| 运行依赖 | Python 数据栈 + 依赖包 | 单个静态 binary |
这些数字容易被理解成“Rust 比 Python 快”。我更愿意拆成两件事看:
- 执行方式:从“一次性把整批数据放进内存”改成“边读边写”。
- 部署单元:从一坨 Python 数据科学依赖,改成一个很小的静态 binary。
前者解决 OOM。后者解决部署慢、依赖重,只是恰好合并到了一次修改里。
旧链路为什么会爆
旧版逻辑大概是这样:
tables = [pq.read_table(file) for file in files]
df = pa.concat_tables(tables).to_pandas()
cursor.copy_from(StringIO(df.to_csv()), table_name)
问题在于,它不是“读一点,写一点”。它先把所有 Parquet 文件读成 Arrow table, 再合并成一张大表,再转成 pandas DataFrame,再把整张表转成一大段 CSV 字符串, 最后才交给 Postgres。
形象地说:
你要把一车沙子从 A 运到 B。旧做法不是用传送带,而是先把沙子倒进客厅, 再装进另一个桶,再倒成一整张塑料布,最后一次性拖到 B。
中间每一步都在制造一个新的大副本:
- Parquet 解码后的 Arrow 数据。
- 合并后的大 Arrow table。
- pandas DataFrame。
- CSV 字符串。
517 万行数据本身未必有多大。问题是这些副本会同时存在。4GB 内存不是一个仓库, 只是一个小盒子。盒子里同时塞几份大表,很快就会 OOM(Out of memory)。
批处理 vs 流式执行
看到 streaming,很容易以为这是把批处理改成流处理。
不是。
这里仍然是一个批任务。输入是一批已经确定的 Parquet 文件,输出是一张 Postgres 表。 它不是 Kafka 那种永不结束的实时流,也不需要每来一条事件就更新一次下游。
保留批处理的业务边界,用流式执行降低内存峰值。
批处理和流式执行不是一对反义词。它们回答两个问题。
批处理回答:这件事以什么为单位提交?
比如“这一批文件全部成功,目标表才算更新”。这就是批处理边界。
流式执行回答:程序内部怎么搬运数据?
比如“一次只拿一小批记录,转完就写走,不把整张表放进内存”。这就是执行方式。
这次改造后的链路可以简化成这样:
对象存储里的 Parquet 文件
-> 一批一批解码成 Arrow RecordBatch
-> 每批转成 CSV bytes
-> 通过一个有容量上限的队列送出去
-> Postgres COPY FROM STDIN 边收边写
几个术语先翻译一下:
Parquet 是列式存储格式。同一列的数据放在一起,压缩效率高,扫描部分列也快。
Arrow 可以理解成内存里的标准表格格式。很多数据工具都能读写它, 用它可以少做很多类型转换。
RecordBatch 是一小批 Arrow 数据。重点是“小批”:程序一次只拿住一段, 不再拿住整张表。
Streaming 是让数据像水一样流过程序。程序只握住当前这一小段,处理完就放走。
Backpressure 直译是“反压”。意思是:下游写数据库变慢,上游就自动慢下来。 中间队列有容量上限,不允许生产端无限堆数据。
Postgres COPY 是 Postgres 的批量导入通道。它比一行一行 INSERT 快得多,
少了大量 SQL 解析、网络往返和每行协议开销。批量装表时,COPY 通常应该是默认选择。
变化不在某一行代码,而在整个姿势:
旧链路是“先把所有东西变成一个巨大对象,再写入数据库”。 新链路是“让数据保持流动,下游吃多少,上游喂多少”。
Streaming 会不会让失败重试更难?
会。它要求你更认真地设计边界。
批处理的好处是边界很清楚:整批数据准备好,然后一次性写入。失败了就整批重来。 缺点也明显:如果“准备好”意味着把整批数据都放进内存,内存峰值会很高。
Streaming 的好处是内存稳定,但失败可能发生在中间:已经读了一部分,写了一部分, 然后网络断了、函数超时了、数据库连接没了。没有一致性边界,就可能留下半张表、 重复写,或者让下游读到不完整数据。
所以 streaming 不是“边读边写就完事了”。它需要几件配套设计:
- 事务边界。 这次用的是
BEGIN -> TRUNCATE -> COPY -> COMMIT。如果 COPY 中途失败, transaction 回滚,旧表不会被半成品替换。 - 幂等入口。 同一个表、同一批输入重复跑,结果应该一样。失败后可以安全重试。
- 明确的失败点。 文件过大、schema 不匹配、网络失败、COPY 失败,都要显式报错, 而不是静默跳过。
- 必要时引入 staging table。 对更复杂的场景,可以先 COPY 到临时表或 staging table, 校验行数和质量后再原子切换到目标表。
- 不要把有限批任务误写成无限流系统。 这里的 streaming 是 bounded streaming: 输入有限、输出有限、提交边界仍然是整批。
所以我更喜欢说“流式执行”,而不是只说“流处理”。这能避免把一个 bounded batch job 误解成实时流系统。
Python vs Rust
另一个常见问题是:难道 Python 就不能 streaming 吗?
能。
如果只看 OOM 这个问题,Python 也有一条很合理的修法:用 pyarrow 的 batch reader
一批一批读 Parquet,再用 psycopg2 或 asyncpg 的 COPY 通道一批一批写入 Postgres。
这样也能避免 to_pandas() 和 to_csv() 把整张表同时放进内存。
所以结论不是“Python 不适合数据工程”。Python 很适合。
这里的判断点是:这个函数除了 OOM,还有部署包大、上传慢、serverless 运行时依赖重的问题。
Python streaming 可以解决内存峰值,但不会自动解决 pyarrow + pandas 带来的部署成本。
Rust 在这里的价值,是把两件事一起解决了:
- 执行方式变成 streaming。
- 部署单元变成一个很小的 binary。
换句话说:
- 如果你的痛点只是内存峰值,先改 Python streaming 很可能是最经济的解。
- 如果你的痛点同时包括内存、依赖体积、部署速度、serverless 冷启动和运行时边界, Rust 才开始变得值得。
不要把“批 vs 流”和“Python vs Rust”搅在一起。前者是数据执行模型, 后者是运行时和交付模型。
Rust 在这里到底贡献了什么
Rust 在这个案例里的价值更具体。
第一,它让部署单元变小。Python 版本需要带上沉重的数据处理依赖。Rust 版本可以编译成一个 Linux 上直接运行的 binary。这次验证里的实际数字是:部署 zip 从约 77.8MB 降到约 2.5MB, 本地上传从约 40 分钟降到几秒。最终运行的 bootstrap binary 约 6.5MB。
第二,它让内存边界更清楚。Rust 的所有权模型会迫使你想清楚:这块数据是谁持有, 什么时候释放,能不能被复制。对很多业务逻辑来说这会显得麻烦;但对数据搬运链路, 它反而帮你把“隐形的大副本”暴露出来。
第三,它的异步生态适合这类管道。读取对象存储、转换数据、写 Postgres,本质上是 IO
和 CPU 混在一起的生产-消费模型。Rust 里的 tokio、channel、streaming sink
这些工具,可以把这个模型表达得很直接。
速度不是来自“Rust”这个名字,而是来自流式执行。
如果把 Rust 写成“先读全量,再拼全量,再写全量”,它一样会把内存吃爆。 如果把 Python 写成 streaming,它也可以解决很大一部分问题。 这次选择 Rust,是因为它在这个环境里顺手解决了部署包和运行时依赖的问题。
一些具体踩坑
内部实现不展开太多,只说几个有普遍意义的坑。
一,不要假设云函数的调用方式像普通 HTTP 客户端。
这个 custom runtime 收到的请求没有正常设置 Content-Type: application/json。
如果直接用框架里的 JSON extractor,会得到 415。最后改成读取 raw bytes,再手动 parse JSON。
serverless 平台常常有自己的运行时协议。框架默认值不一定适配。
二,自签云 API 要有 ground truth 测试。
Rust 里没有我愿意直接押注的官方 OSS SDK,所以这次自己实现了 OSS V4 签名。 这类代码最怕“看起来对,线上 403”。解决办法不是靠信心,而是用 Python 官方/成熟 SDK 生成一组签名样本,再让 Rust 单测逐字节对齐。
自己实现协议可以,但必须先找一个可信实现做锚点。
三,部署体验是架构的一部分。
很多团队讨论性能时只看运行时间。但如果每次部署都要等几十分钟,工程师就会减少部署、 减少验证、减少小步迭代。最后系统会变得更脆。
把部署包变小,不只是省时间,也是让团队重新敢于频繁验证。
可以带走的经验
这次值得复用的不是“用 Rust 重写 Python”,而是几个判断方式。
第一,遇到 OOM,先画出数据在内存里的形状。不要只看输入文件大小,要看中间会产生几份副本。 很多 OOM 不是数据本身离谱,而是程序把同一批数据换了三种形态同时拿在手上。
第二,数据搬运任务优先考虑流式执行。业务语义允许的话,就让数据边读边写, 用 batch size 和 backpressure 控制内存。
第三,保留批处理的提交边界。流式执行不等于放弃事务、一致性和幂等。对有限输入来说, 最舒服的形状往往是 bounded streaming batch。
第四,入 Postgres 时优先想 COPY,不要默认一行行 INSERT。INSERT 适合业务写入, COPY 适合批量装载。
第五,语言选择要和部署环境一起看。问题只是内存峰值,Python streaming 可能足够。 如果还卡在依赖体积、冷启动、部署速度、运行时边界,编译型语言的收益会更明显。
第六,优化的目标不是“更快”,而是“更可解释”。52 秒跑完很开心; 但更重要的是,你知道它为什么不会爆、什么时候会慢、哪里可以限流、哪里可以重试。
这是一次性能修复里最值得留下来的东西。