FastestAI Dataflow 是一个基于 DataFrame 范式的业务数据工作流自动化系统,属于 MaybeAI 平台的核心组件。该系统实现了完整的"获取 → 分析 → 行动"业务循环,允许用户用自然语言描述数据需求,自动生成可执行的数据处理工作流。
Dataflow 的核心是表格构建工作流,遵循字段模板模式:
- 基于用户意图确定最终结果表结构
- 识别需要填充的列(字段)
- 使用 map 操作系统性地逐列逐行填充结果表
- 每个 map 操作使用 MCP 工具和字段模板工具填充特定字段
- Scalar: 单一值(字符串、数字、布尔值等)
- Series: 一维数据数组(类似 pandas Series)
- DataFrame: 二维表格数据,具有命名列
scalar:data_name- 单一值series:data_name- 一维数据dataframe:data_name- 表格数据
使用 ${} 语法在字符串中格式化数据:
${scalar:data_name}${series:data_name}${dataframe:data_name}
直接调用工具进行数据转换:
{
"type": "tool_call",
"tool": {
"id": "tool_name",
"arguments": [...]
}
}
- 输入: null、scalar、series 或 dataframe
- 输出: 任意数据类型(取决于工具)
将工具应用到 series 中的每个项目或 dataframe 中的每一行:
{
"type": "map",
"tool": {
"id": "tool_name",
"arguments": [
{
"name": "param_name",
"type": "map_item",
"value": "dataframe:source_data.column_name"
}
]
}
}
这是字段模板模式的核心机制,用于逐列逐行填充结果表。
应用自定义函数(通常是 LLM 调用)进行数据聚合或转换:
{
"type": "apply",
"function": {
"type": "llm_call",
"prompt": "分析以下数据: ${dataframe:input_data}"
}
}
重要限制: LLM 调用只能用于文本分析和生成单一文本值,不能调用其他 MCP 工具或生成结构化输出。
基于输入数据评估条件进行路由:
{
"type": "switch",
"cases": [
{
"conditions": {
"type": "and",
"conditions": [
{
"operator": "scalar_equal",
"scalar_equal": {
"input": "scalar:user_type",
"value": "premium"
}
}
]
},
"next_flow": "premium_processing_flow"
}
],
"default_flow": "standard_processing_flow"
}
支持多种条件操作符:相等、大小比较、字符串匹配等。
系统提供全面的 DataFrame 操作工具:
- 数据初始化: 从结构化数据创建 DataFrame
- 数据过滤: 使用各种操作符应用单个或多个条件
- 公式应用: 应用类似 Excel 的公式到 DataFrame 列
- 数据合并:
- 垂直连接多个 DataFrame
- 使用指定连接键合并 DataFrame
- 智能模糊合并(使用 LLM 分析)
- 数据排序: 按一个或多个列排序
设计时确定的静态值:
{
"name": "pages",
"type": "predefined",
"value": 10,
"description": "您想检索多少个结果?"
}
可以引用变量:"value": "variable:scalar:target_city"
引用先前输出的参数:
{
"name": "queries",
"type": "from_output",
"value": "dataframe:generated_queries"
}
从映射操作中正在处理的数据提取的值:
{
"name": "query",
"type": "map_item",
"value": "dataframe:twitter_keywords.keyword"
}
限制: 一个工具的参数列表最多只能有一个 map_item。
变量是 dataflow 的配置属性,定义一次,全局使用。支持数据类型前缀:
{
"variables": [
{
"name": "variable:scalar:target_language",
"default_value": "zh",
"description": "要搜索的语言?"
},
{
"name": "variable:series:search_keywords",
"default_value": ["AI", "机器学习", "数据科学"],
"description": "您想搜索什么关键词?"
},
{
"name": "variable:dataframe:sample_data",
"default_value": [
{"name": "Alice", "age": 30, "city": "New York"}
],
"description": "您想使用什么样本数据?"
}
]
}
- 用户输入的初始种子数据
- 已知配置列表
- 静态参考数据
- 测试数据或示例
关键要求: 每个 dataflow 必须定义至少一个结果表。
{
"result_tables": [
{
"name": "result:dataframe:analysis_results",
"columns": [
{
"name": "entity_id",
"description": "被分析实体的唯一标识符"
},
{
"name": "score",
"description": "0到100的分析得分"
}
],
"description": "所有处理实体的最终分析结果"
}
]
}
流程可以指定如何更新结果表:
{
"result_updates": [
{
"type": "upsert",
"table_name": "result:dataframe:analysis_results",
"column_mapping": {
"entity_id": "dataframe:processed_data.id",
"score": "dataframe:processed_data.updated_score"
}
}
]
}
MCP 工具的 JSON 输出自动扁平化为 DataFrame 列:
{
"status": "success",
"results": [
{"title": "文章1", "url": "https://example1.com"},
{"title": "文章2", "url": "https://example2.com"}
]
}
扁平化后的 DataFrame 列:
status- 包含 "success"results.title- 包含文章标题results.url- 包含文章URL
series:series_name- 整个 seriesdataframe:table_name.column_name- dataframe 中的特定列dataframe:table_name.nested.parent.child.column_name- 嵌套字段variable:series:series_name- 变量中的 seriesvariable:dataframe:table_name.column_name- 变量中 dataframe 的列
这是构建结果表的基本方法:
初始数据 → DataFrame
Map (填充字段1) → 填充了字段1的 DataFrame
Map (填充字段2) → 填充了字段1和2的 DataFrame
Map (填充字段3) → 填充了字段1、2、3的 DataFrame
...
结果表更新 → 完整的结构化表格
工具调用(收集数据)→ DataFrame
Map(使用字段模板丰富/处理每个项目)→ 增强的 DataFrame
Apply(分析/汇总)→ 最终结果
工具调用A → DataFrame A
工具调用B → DataFrame B
Map(使用字段模板合并/丰富)→ 集成的 DataFrame
Apply(分析/比较)→ 综合分析
- 字段模板模式优先: 使用 map 操作逐列构建结果表
- 结果表必需: 每个 dataflow 必须定义至少一个结果表
- 业务流集成: 将 result_updates 添加到自然产生最终结果的流程中
- 批量操作: 使用 map 操作而非多个单独的工具调用
- 类型一致性: 确保流程之间的数据类型兼容性
- 错误处理: 设计流程以抗部分失败
- 可扩展性: 考虑大数据集的处理能力
- 基于数据流的隐式依赖关系
- 系统自动根据数据依赖确定执行顺序
- 无需显式定义依赖关系
- Map 操作可以并行化
- 支持流式处理大数据集
- 内存高效的数据处理
- 与 MCP (Model Context Protocol) 工具完全集成
- 支持多种外部数据源和API
- 灵活的工具扩展机制
这个 dataflow 系统为 MaybeAI 平台提供了强大的数据处理能力,通过标准化的字段模板模式和结构化的操作类型,实现了复杂业务数据工作流的自动化处理。