Dewei Zhai

2026-05-21

把 6 次 OOM 的入库任务改到 52 秒跑完

一次 Parquet 到 Postgres 的重写:从批到流,从 Python 到 Rust。

问题

我最近处理了一个数据工程问题:一批已经清洗好的 Parquet 文件在对象存储里, 需要定期装进 Postgres,给后面的 MLE 流程使用。

痛点是大文件入库。其中一张表大约 517 万行。Python 版本在 4GB 内存的 serverless 函数里跑了 6 次,6 次都 OOM。部署也慢:Python 数据栈带着 pyarrowpandas、数据库驱动等依赖,本地上传一次能慢到几十分钟。

当时有几个选择:

方案能不能救 OOM能不能解决部署慢取舍
4GB 升到 8GB可能暂时能不能最快,但只是把爆炸点往后推
Python 改 streaming不能改动小,仍然背着重依赖
Rust 重写成本更高,但一次解决运行和部署两个痛点

最后选 Rust,不是因为 Python 不能做,而是因为这个函数同时卡在内存和部署两个地方。 只修一个,另一个问题还会继续卡住迭代。

这次重写后的结果是:

指标旧 Python 版本新 Rust 版本
大表行数5,174,400 行5,174,400 行
运行结果6 次尝试全部 4GB OOM一次跑通,约 52 秒
部署 zip约 77.8MB约 2.5MB
本地上传约 40 分钟约 3.5 秒
运行依赖Python 数据栈 + 依赖包单个静态 binary

这些数字容易被理解成“Rust 比 Python 快”。我更愿意拆成两件事看:

  • 执行方式:从“一次性把整批数据放进内存”改成“边读边写”。
  • 部署单元:从一坨 Python 数据科学依赖,改成一个很小的静态 binary。

前者解决 OOM。后者解决部署慢、依赖重,只是恰好合并到了一次修改里。

旧链路为什么会爆

旧版逻辑大概是这样:

tables = [pq.read_table(file) for file in files]
df = pa.concat_tables(tables).to_pandas()
cursor.copy_from(StringIO(df.to_csv()), table_name)

问题在于,它不是“读一点,写一点”。它先把所有 Parquet 文件读成 Arrow table, 再合并成一张大表,再转成 pandas DataFrame,再把整张表转成一大段 CSV 字符串, 最后才交给 Postgres。

形象地说:

你要把一车沙子从 A 运到 B。旧做法不是用传送带,而是先把沙子倒进客厅, 再装进另一个桶,再倒成一整张塑料布,最后一次性拖到 B。

中间每一步都在制造一个新的大副本:

  • Parquet 解码后的 Arrow 数据。
  • 合并后的大 Arrow table。
  • pandas DataFrame。
  • CSV 字符串。

517 万行数据本身未必有多大。问题是这些副本会同时存在。4GB 内存不是一个仓库, 只是一个小盒子。盒子里同时塞几份大表,很快就会 OOM(Out of memory)。

批处理 vs 流式执行

看到 streaming,很容易以为这是把批处理改成流处理。

不是。

这里仍然是一个批任务。输入是一批已经确定的 Parquet 文件,输出是一张 Postgres 表。 它不是 Kafka 那种永不结束的实时流,也不需要每来一条事件就更新一次下游。

保留批处理的业务边界,用流式执行降低内存峰值。

批处理和流式执行不是一对反义词。它们回答两个问题。

批处理回答:这件事以什么为单位提交?

比如“这一批文件全部成功,目标表才算更新”。这就是批处理边界。

流式执行回答:程序内部怎么搬运数据?

比如“一次只拿一小批记录,转完就写走,不把整张表放进内存”。这就是执行方式。

这次改造后的链路可以简化成这样:

对象存储里的 Parquet 文件
  -> 一批一批解码成 Arrow RecordBatch
  -> 每批转成 CSV bytes
  -> 通过一个有容量上限的队列送出去
  -> Postgres COPY FROM STDIN 边收边写

几个术语先翻译一下:

Parquet 是列式存储格式。同一列的数据放在一起,压缩效率高,扫描部分列也快。

Arrow 可以理解成内存里的标准表格格式。很多数据工具都能读写它, 用它可以少做很多类型转换。

RecordBatch 是一小批 Arrow 数据。重点是“小批”:程序一次只拿住一段, 不再拿住整张表。

Streaming 是让数据像水一样流过程序。程序只握住当前这一小段,处理完就放走。

Backpressure 直译是“反压”。意思是:下游写数据库变慢,上游就自动慢下来。 中间队列有容量上限,不允许生产端无限堆数据。

Postgres COPY 是 Postgres 的批量导入通道。它比一行一行 INSERT 快得多, 少了大量 SQL 解析、网络往返和每行协议开销。批量装表时,COPY 通常应该是默认选择。

变化不在某一行代码,而在整个姿势:

旧链路是“先把所有东西变成一个巨大对象,再写入数据库”。 新链路是“让数据保持流动,下游吃多少,上游喂多少”。

Streaming 会不会让失败重试更难?

会。它要求你更认真地设计边界。

批处理的好处是边界很清楚:整批数据准备好,然后一次性写入。失败了就整批重来。 缺点也明显:如果“准备好”意味着把整批数据都放进内存,内存峰值会很高。

Streaming 的好处是内存稳定,但失败可能发生在中间:已经读了一部分,写了一部分, 然后网络断了、函数超时了、数据库连接没了。没有一致性边界,就可能留下半张表、 重复写,或者让下游读到不完整数据。

所以 streaming 不是“边读边写就完事了”。它需要几件配套设计:

  • 事务边界。 这次用的是 BEGIN -> TRUNCATE -> COPY -> COMMIT。如果 COPY 中途失败, transaction 回滚,旧表不会被半成品替换。
  • 幂等入口。 同一个表、同一批输入重复跑,结果应该一样。失败后可以安全重试。
  • 明确的失败点。 文件过大、schema 不匹配、网络失败、COPY 失败,都要显式报错, 而不是静默跳过。
  • 必要时引入 staging table。 对更复杂的场景,可以先 COPY 到临时表或 staging table, 校验行数和质量后再原子切换到目标表。
  • 不要把有限批任务误写成无限流系统。 这里的 streaming 是 bounded streaming: 输入有限、输出有限、提交边界仍然是整批。

所以我更喜欢说“流式执行”,而不是只说“流处理”。这能避免把一个 bounded batch job 误解成实时流系统。

Python vs Rust

另一个常见问题是:难道 Python 就不能 streaming 吗?

能。

如果只看 OOM 这个问题,Python 也有一条很合理的修法:用 pyarrow 的 batch reader 一批一批读 Parquet,再用 psycopg2asyncpg 的 COPY 通道一批一批写入 Postgres。 这样也能避免 to_pandas()to_csv() 把整张表同时放进内存。

所以结论不是“Python 不适合数据工程”。Python 很适合。

这里的判断点是:这个函数除了 OOM,还有部署包大、上传慢、serverless 运行时依赖重的问题。 Python streaming 可以解决内存峰值,但不会自动解决 pyarrow + pandas 带来的部署成本。

Rust 在这里的价值,是把两件事一起解决了:

  • 执行方式变成 streaming。
  • 部署单元变成一个很小的 binary。

换句话说:

  • 如果你的痛点只是内存峰值,先改 Python streaming 很可能是最经济的解。
  • 如果你的痛点同时包括内存、依赖体积、部署速度、serverless 冷启动和运行时边界, Rust 才开始变得值得。

不要把“批 vs 流”和“Python vs Rust”搅在一起。前者是数据执行模型, 后者是运行时和交付模型。

Rust 在这里到底贡献了什么

Rust 在这个案例里的价值更具体。

第一,它让部署单元变小。Python 版本需要带上沉重的数据处理依赖。Rust 版本可以编译成一个 Linux 上直接运行的 binary。这次验证里的实际数字是:部署 zip 从约 77.8MB 降到约 2.5MB, 本地上传从约 40 分钟降到几秒。最终运行的 bootstrap binary 约 6.5MB。

第二,它让内存边界更清楚。Rust 的所有权模型会迫使你想清楚:这块数据是谁持有, 什么时候释放,能不能被复制。对很多业务逻辑来说这会显得麻烦;但对数据搬运链路, 它反而帮你把“隐形的大副本”暴露出来。

第三,它的异步生态适合这类管道。读取对象存储、转换数据、写 Postgres,本质上是 IO 和 CPU 混在一起的生产-消费模型。Rust 里的 tokio、channel、streaming sink 这些工具,可以把这个模型表达得很直接。

速度不是来自“Rust”这个名字,而是来自流式执行。

如果把 Rust 写成“先读全量,再拼全量,再写全量”,它一样会把内存吃爆。 如果把 Python 写成 streaming,它也可以解决很大一部分问题。 这次选择 Rust,是因为它在这个环境里顺手解决了部署包和运行时依赖的问题。

一些具体踩坑

内部实现不展开太多,只说几个有普遍意义的坑。

一,不要假设云函数的调用方式像普通 HTTP 客户端。

这个 custom runtime 收到的请求没有正常设置 Content-Type: application/json。 如果直接用框架里的 JSON extractor,会得到 415。最后改成读取 raw bytes,再手动 parse JSON。

serverless 平台常常有自己的运行时协议。框架默认值不一定适配。

二,自签云 API 要有 ground truth 测试。

Rust 里没有我愿意直接押注的官方 OSS SDK,所以这次自己实现了 OSS V4 签名。 这类代码最怕“看起来对,线上 403”。解决办法不是靠信心,而是用 Python 官方/成熟 SDK 生成一组签名样本,再让 Rust 单测逐字节对齐。

自己实现协议可以,但必须先找一个可信实现做锚点。

三,部署体验是架构的一部分。

很多团队讨论性能时只看运行时间。但如果每次部署都要等几十分钟,工程师就会减少部署、 减少验证、减少小步迭代。最后系统会变得更脆。

把部署包变小,不只是省时间,也是让团队重新敢于频繁验证。

可以带走的经验

这次值得复用的不是“用 Rust 重写 Python”,而是几个判断方式。

第一,遇到 OOM,先画出数据在内存里的形状。不要只看输入文件大小,要看中间会产生几份副本。 很多 OOM 不是数据本身离谱,而是程序把同一批数据换了三种形态同时拿在手上。

第二,数据搬运任务优先考虑流式执行。业务语义允许的话,就让数据边读边写, 用 batch size 和 backpressure 控制内存。

第三,保留批处理的提交边界。流式执行不等于放弃事务、一致性和幂等。对有限输入来说, 最舒服的形状往往是 bounded streaming batch。

第四,入 Postgres 时优先想 COPY,不要默认一行行 INSERT。INSERT 适合业务写入, COPY 适合批量装载。

第五,语言选择要和部署环境一起看。问题只是内存峰值,Python streaming 可能足够。 如果还卡在依赖体积、冷启动、部署速度、运行时边界,编译型语言的收益会更明显。

第六,优化的目标不是“更快”,而是“更可解释”。52 秒跑完很开心; 但更重要的是,你知道它为什么不会爆、什么时候会慢、哪里可以限流、哪里可以重试。

这是一次性能修复里最值得留下来的东西。


想聊聊?和我的助理辩一辩,或者给我留个言