Transactional AI v0.2:具备全栈可观测性的生产级 AI 工作流框架

作者
  • avatar
    姓名
    Nino
    职业
    Senior Tech Editor

构建可靠的 AI 代理(AI Agents)是一项公认的挑战。本周早些时候,Transactional AI v0.1 的发布旨在解决一个核心痛点:AI 代理在执行过程中由于部分失败导致系统处于“半完成”的损坏状态。虽然初始反馈非常积极,但专业开发者很快提出了企业级需求:如何跨多个 Worker 运行?能否使用 PostgreSQL 代替文件存储?如何监控生产环境中的失败?

今天,Transactional AI v0.2 正式发布,通过引入分布式锁、PostgreSQL 支持、重试策略和全方位的可观测性,回答了这些问题。在使用 n1n.ai 等高性能 API 聚合平台时,系统的稳定性直接决定了业务的成败。如果你的代理在使用 OpenAI o3 生成报告、通过 Stripe 收款后,却在发送通知时失败,系统就会陷入不一致。Transactional AI 确保在这种情况下,已执行的步骤能够自动回滚,保持数据完整性。

分布式环境下的安全性:分布式锁

在分布式系统中,竞态条件(Race Condition)是最大的敌人。假设两个 Worker 同时收到了同一个事务 ID,如果没有协调机制,它们可能会同时执行逻辑,导致重复扣费或数据污染。

Transactional AI v0.2 引入了基于 Redis 的分布式锁机制。通过使用 Redis 的 SET NX PX 原子指令,框架确保在任何给定时间内,只有一个 Worker 能够处理特定的事务。这对于在 Kubernetes 环境中水平扩展的应用至关重要。

import { Transaction, RedisStorage, RedisLock } from 'transactional-ai'

const connection = 'redis://localhost:6379'
const storage = new RedisStorage(connection)
const lock = new RedisLock(connection) // 🔒 原子分布式锁

const tx = new Transaction('order-123', storage, {
  lock: lock,
  lockTTL: 30000, // 如果 Worker 崩溃,30秒后自动释放锁
})

await tx.run()

从文件到数据库:PostgreSQL 存储适配器

虽然文件存储在本地开发时非常方便,但生产环境需要关系型数据库的 ACID 保证。新版本引入了 PostgresStorage 适配器,允许开发者维护完整的审计日志。无论是 Claude 3.5 Sonnet 的调用记录,还是复杂的 RAG(检索增强生成)流程,所有状态都持久化在数据库中。

数据库模式设计充分考虑了性能,将执行历史存储在 JSONB 列中,方便开发者通过 SQL 轻松查询失败的事务或分析步骤耗时。

CREATE TABLE transactions (
  id VARCHAR(255) PRIMARY KEY,
  status VARCHAR(50) NOT NULL,
  step_stack JSONB NOT NULL,  -- 完整的执行历史堆栈
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

应对不稳定的 API:智能重试策略

LLM API 偶尔会出现波动。无论是提供商的 500 错误还是速率限制,工作流都不应立即崩溃。通过 n1n.ai 接入稳定的 API 资源是第一步,而本地的重试逻辑则是双重保险。

在 v0.2 中,你可以为每个步骤定义精细的重试策略。例如,在调用 DeepSeek-V3 时,如果遇到临时网络问题,系统会自动进行指数退避重试,只有在多次尝试均失败后才会触发回滚。

await t.step('call-openai', {
  do: async () => await openai.createCompletion({...}),
  undo: async () => { /* 撤销逻辑 */ },
  retry: {
    attempts: 3,      // 最多重试 3 次
    backoffMs: 1000   // 每次重试间隔 1 秒
  }
});

全栈可观测性:12 个生命周期钩子

监控是生产系统的生命线。v0.2.1 引入了 12 个事件钩子,涵盖了从 onTransactionStartonStepTimeout 的所有关键节点。这些钩子采用了“安全设计”——即使事件处理器本身报错,也不会影响主事务的执行。

你可以轻松地将这些数据对接到 Datadog、Prometheus 或 Slack。例如,监控 OpenAI 接口的响应时间,并在超时时发送告警。

事件名称描述应用场景
onStepComplete步骤成功完成时触发性能指标统计
onStepFailed步骤失败时触发错误告警通知
onStepTimeout步骤超时时触发SLA 合规监控
onTransactionComplete整个 Saga 流程结束业务完成度分析

防止僵尸进程:步骤级超时保护

在处理 AI 请求时,最糟糕的情况是某个 API 调用无限期挂起。Transactional AI 现在支持为每个步骤设置超时时间。如果你通过 n1n.ai 调用的服务在 30 秒内没有响应,框架将强制终止该操作,触发超时事件并启动补偿(Undo)流程。

这对于成本控制和资源释放至关重要。长时间运行的 AI 进程会占用大量的内存和 Worker 槽位,通过强制超时,你可以确保系统的高吞吐量和可预测性。

测试驱动开发:内存存储与模拟锁

为了支持高效的 CI/CD 流水线,v0.2 提供了 MemoryStorageMockLock。这意味着在运行自动化测试时,你不需要启动 Redis 或 Postgres 实例。目前测试用例已从 11 个增加到 21 个,覆盖了并发锁请求、嵌套失败等极端场景。

性能基准测试

我们在 10,000 个并发事务的压力下测试了不同的存储组合:

配置方案吞吐量延迟 (p95)
FileStorage + NoOpLock500 tx/s20ms
RedisStorage + NoOpLock2,500 tx/s8ms
RedisStorage + RedisLock2,000 tx/s12ms
PostgresStorage + RedisLock1,200 tx/s18ms

对于大多数企业级应用,PostgresStorage 配合 RedisLock 是最佳选择,兼顾了数据一致性和高并发安全性。

总结与展望

Transactional AI v0.2 是 Node.js AI 生态系统的一个重要里程碑。通过将 Saga 模式引入 AI 代理开发,我们让开发者能够构建不仅智能、而且强健且可观测的系统。无论你是在使用 LangChain 进行编排,还是通过 n1n.ai 构建自定义代理,这个库都能为你的生产环境提供必要的安全保障。

立即在 n1n.ai 获取免费 API 密钥。