Skip to content

Instantly share code, notes, and snippets.

@failable
Created November 3, 2025 03:11
Show Gist options
  • Select an option

  • Save failable/2f046a2ef35698a7d0fc36e95195fcd8 to your computer and use it in GitHub Desktop.

Select an option

Save failable/2f046a2ef35698a7d0fc36e95195fcd8 to your computer and use it in GitHub Desktop.
spec

FastestAI Dataflow 系统规范

概述

FastestAI Dataflow 是一个基于 DataFrame 范式的业务数据工作流自动化系统,属于 MaybeAI 平台的核心组件。该系统实现了完整的"获取 → 分析 → 行动"业务循环,允许用户用自然语言描述数据需求,自动生成可执行的数据处理工作流。

核心理念

字段模板模式(Field Template Pattern)

Dataflow 的核心是表格构建工作流,遵循字段模板模式:

  • 基于用户意图确定最终结果表结构
  • 识别需要填充的列(字段)
  • 使用 map 操作系统性地逐列逐行填充结果表
  • 每个 map 操作使用 MCP 工具和字段模板工具填充特定字段

数据类型系统

基础数据类型

  • Scalar: 单一值(字符串、数字、布尔值等)
  • Series: 一维数据数组(类似 pandas Series)
  • DataFrame: 二维表格数据,具有命名列

数据类型标记法

  • scalar:data_name - 单一值
  • series:data_name - 一维数据
  • dataframe:data_name - 表格数据

数据格式化语法

使用 ${} 语法在字符串中格式化数据:

  • ${scalar:data_name}
  • ${series:data_name}
  • ${dataframe:data_name}

核心操作类型

1. 工具调用操作(Tool Call Operations)

直接调用工具进行数据转换:

{
  "type": "tool_call",
  "tool": {
    "id": "tool_name",
    "arguments": [...]
  }
}
  • 输入: null、scalar、series 或 dataframe
  • 输出: 任意数据类型(取决于工具)

2. 映射操作(Map Operations)

将工具应用到 series 中的每个项目或 dataframe 中的每一行:

{
  "type": "map",
  "tool": {
    "id": "tool_name",
    "arguments": [
      {
        "name": "param_name",
        "type": "map_item",
        "value": "dataframe:source_data.column_name"
      }
    ]
  }
}

这是字段模板模式的核心机制,用于逐列逐行填充结果表。

3. 应用操作(Apply Operations)

应用自定义函数(通常是 LLM 调用)进行数据聚合或转换:

{
  "type": "apply",
  "function": {
    "type": "llm_call",
    "prompt": "分析以下数据: ${dataframe:input_data}"
  }
}

重要限制: LLM 调用只能用于文本分析和生成单一文本值,不能调用其他 MCP 工具或生成结构化输出。

4. 条件分支操作(SwitchFlow)

基于输入数据评估条件进行路由:

{
  "type": "switch",
  "cases": [
    {
      "conditions": {
        "type": "and",
        "conditions": [
          {
            "operator": "scalar_equal",
            "scalar_equal": {
              "input": "scalar:user_type",
              "value": "premium"
            }
          }
        ]
      },
      "next_flow": "premium_processing_flow"
    }
  ],
  "default_flow": "standard_processing_flow"
}

支持多种条件操作符:相等、大小比较、字符串匹配等。

DataFrame 操作工具

系统提供全面的 DataFrame 操作工具:

  • 数据初始化: 从结构化数据创建 DataFrame
  • 数据过滤: 使用各种操作符应用单个或多个条件
  • 公式应用: 应用类似 Excel 的公式到 DataFrame 列
  • 数据合并:
    • 垂直连接多个 DataFrame
    • 使用指定连接键合并 DataFrame
    • 智能模糊合并(使用 LLM 分析)
  • 数据排序: 按一个或多个列排序

参数类型系统

预定义参数(Predefined Arguments)

设计时确定的静态值:

{
  "name": "pages",
  "type": "predefined", 
  "value": 10,
  "description": "您想检索多少个结果?"
}

可以引用变量:"value": "variable:scalar:target_city"

输出引用参数(From Output Arguments)

引用先前输出的参数:

{
  "name": "queries",
  "type": "from_output",
  "value": "dataframe:generated_queries"
}

映射项参数(Map Item Arguments)

从映射操作中正在处理的数据提取的值:

{
  "name": "query",
  "type": "map_item", 
  "value": "dataframe:twitter_keywords.keyword"
}

限制: 一个工具的参数列表最多只能有一个 map_item

变量管理系统

变量是 dataflow 的配置属性,定义一次,全局使用。支持数据类型前缀:

{
  "variables": [
    {
      "name": "variable:scalar:target_language",
      "default_value": "zh", 
      "description": "要搜索的语言?"
    },
    {
      "name": "variable:series:search_keywords",
      "default_value": ["AI", "机器学习", "数据科学"],
      "description": "您想搜索什么关键词?"
    },
    {
      "name": "variable:dataframe:sample_data",
      "default_value": [
        {"name": "Alice", "age": 30, "city": "New York"}
      ],
      "description": "您想使用什么样本数据?"
    }
  ]
}

变量使用场景

  • 用户输入的初始种子数据
  • 已知配置列表
  • 静态参考数据
  • 测试数据或示例

结果表系统

结果表定义

关键要求: 每个 dataflow 必须定义至少一个结果表。

{
  "result_tables": [
    {
      "name": "result:dataframe:analysis_results",
      "columns": [
        {
          "name": "entity_id",
          "description": "被分析实体的唯一标识符"
        },
        {
          "name": "score", 
          "description": "0到100的分析得分"
        }
      ],
      "description": "所有处理实体的最终分析结果"
    }
  ]
}

结果更新

流程可以指定如何更新结果表:

{
  "result_updates": [
    {
      "type": "upsert",
      "table_name": "result:dataframe:analysis_results",
      "column_mapping": {
        "entity_id": "dataframe:processed_data.id",
        "score": "dataframe:processed_data.updated_score"
      }
    }
  ]
}

数据转换与引用语法

JSON 到 DataFrame 的转换

MCP 工具的 JSON 输出自动扁平化为 DataFrame 列:

{
  "status": "success",
  "results": [
    {"title": "文章1", "url": "https://example1.com"},
    {"title": "文章2", "url": "https://example2.com"}
  ]
}

扁平化后的 DataFrame 列:

  • status - 包含 "success"
  • results.title - 包含文章标题
  • results.url - 包含文章URL

列引用语法

  • series:series_name - 整个 series
  • dataframe:table_name.column_name - dataframe 中的特定列
  • dataframe:table_name.nested.parent.child.column_name - 嵌套字段
  • variable:series:series_name - 变量中的 series
  • variable:dataframe:table_name.column_name - 变量中 dataframe 的列

设计模式与最佳实践

核心模式:字段模板模式

这是构建结果表的基本方法:

初始数据 → DataFrame
Map (填充字段1) → 填充了字段1的 DataFrame  
Map (填充字段2) → 填充了字段1和2的 DataFrame
Map (填充字段3) → 填充了字段1、2、3的 DataFrame
...
结果表更新 → 完整的结构化表格

常见工作流模式

模式1:数据收集与处理

工具调用(收集数据)→ DataFrame
Map(使用字段模板丰富/处理每个项目)→ 增强的 DataFrame
Apply(分析/汇总)→ 最终结果

模式2:多源数据集成

工具调用A → DataFrame A
工具调用B → DataFrame B  
Map(使用字段模板合并/丰富)→ 集成的 DataFrame
Apply(分析/比较)→ 综合分析

最佳实践原则

  1. 字段模板模式优先: 使用 map 操作逐列构建结果表
  2. 结果表必需: 每个 dataflow 必须定义至少一个结果表
  3. 业务流集成: 将 result_updates 添加到自然产生最终结果的流程中
  4. 批量操作: 使用 map 操作而非多个单独的工具调用
  5. 类型一致性: 确保流程之间的数据类型兼容性
  6. 错误处理: 设计流程以抗部分失败
  7. 可扩展性: 考虑大数据集的处理能力

系统特性

依赖管理

  • 基于数据流的隐式依赖关系
  • 系统自动根据数据依赖确定执行顺序
  • 无需显式定义依赖关系

执行特性

  • Map 操作可以并行化
  • 支持流式处理大数据集
  • 内存高效的数据处理

集成能力

  • 与 MCP (Model Context Protocol) 工具完全集成
  • 支持多种外部数据源和API
  • 灵活的工具扩展机制

这个 dataflow 系统为 MaybeAI 平台提供了强大的数据处理能力,通过标准化的字段模板模式和结构化的操作类型,实现了复杂业务数据工作流的自动化处理。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment