Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fightBoxing/daft-platform

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

2 Commits

Repository files navigation

Daft 多模态数据处理平台

基于 Daft 引擎构建的多模态数据处理低代码平台,提供 Web SQL 工作台、Schema 浏览器、函数文档、AI UDF 等功能。

功能概览

模块 说明
SQL 工作台 Web 端 SQL 编辑器,支持自动补全、语法高亮、执行历史
Schema 浏览器 Catalog → Schema → Table 三级树形元数据浏览
函数文档 219+ 内置函数分类查看、搜索、自动补全
AI UDF 16 个预置 AI 函数:图像识别、OCR、情感分析、PII 脱敏等
元数据服务 支持 Apache Gravitino(分布式)和 SQLite(本地降级)双后端
DAG 编排 可视化数据处理流程编排(实验性)

项目结构

daft-platform/ # 项目根目录
├── README.md
├── requirements.txt
├── Makefile # make dev / make test / make run
├── .gitignore
└── daft_platform/ # Python 包
 ├── app.py # FastAPI 应用入口
 ├── config.py # 配置管理(环境变量)
 ├── models/ # Pydantic 请求/响应模型
 │ ├── sql.py # SQL 执行相关
 │ ├── datasource.py # 数据源/元数据相关
 │ └── function.py # 函数注册表相关
 ├── routers/ # API 路由层
 │ ├── sql.py # /api/sql/*
 │ ├── datasource.py # /api/datasource/*
 │ ├── function.py # /api/function/*
 │ └── settings.py # /api/settings/*
 ├── services/ # 业务逻辑层
 │ ├── session_manager.py # Daft Session 单例管理
 │ ├── sql_executor.py # SQL 执行 + 错误翻译
 │ ├── sql_history.py # 执行历史(SQLite)
 │ ├── function_registry.py # 函数注册表
 │ ├── builtin_udfs.py # 16 个预置 AI UDF
 │ ├── metadata_service.py # 元数据抽象接口 + 工厂函数
 │ ├── gravitino_metadata.py # Gravitino REST API 实现
 │ └── local_metadata.py # SQLite 本地降级实现
 ├── templates/ # Jinja2 前端页面
 │ ├── index.html # SQL 工作台主页
 │ └── dag.html # DAG 编排页
 ├── static/ # 静态资源(JS/CSS)
 ├── tests/ # 单元测试(pytest)
 ├── test_data/ # 测试用图片(dog_*.jpg)
 ├── test_data.csv # 测试用 CSV
 ├── deploy/ # 部署配置
 │ └── gravitino-pod.yaml # Gravitino K8s 部署
 └── data/ # 运行时数据(.gitignore)

技术栈

  • 后端: Python 3.10+, FastAPI, Uvicorn, Pydantic v2
  • 计算引擎: Daft (Rust + Python)
  • 元数据: Apache Gravitino / SQLite (降级)
  • 前端: Jinja2 模板 + 原生 JavaScript
  • 测试: pytest

快速开始

1. 前置条件

  • Python 3.10+
  • Daft 引擎(需从源码编译)
# 编译安装 Daft 引擎
git clone https://github.com/Eventual-Inc/Daft.git && cd Daft
make .venv && make build
source .venv/bin/activate

2. 克隆本项目并安装依赖

git clone https://github.com/fightBoxing/daft-platform.git
cd daft-platform
pip install -r requirements.txt
# 可选:AI UDF 额外依赖
pip install easyocr # OCR 文字识别
pip install imagehash # 图像感知哈希
pip install face_recognition # 人脸检测(或 opencv-python)

3. 启动服务

# 开发模式(自动重载)
make dev
# 生产模式
make run
# 或直接用 uvicorn
uvicorn daft_platform.app:app --host 0.0.0.0 --port 8080

启动后访问:


配置

所有配置通过环境变量管理,前缀 DAFT_PLATFORM_:

环境变量 默认值 说明
DAFT_PLATFORM_DEBUG true 调试模式
DAFT_PLATFORM_MAX_PREVIEW_ROWS 50 预览最大行数
DAFT_PLATFORM_MAX_RESULT_ROWS 10000 查询结果最大行数
DAFT_PLATFORM_SQL_TIMEOUT_SECONDS 300 SQL 执行超时(秒)
DAFT_PLATFORM_GRAVITINO_ENABLED true 是否启用 Gravitino 元数据服务
DAFT_PLATFORM_GRAVITINO_URI http://localhost:30090 Gravitino 服务地址
DAFT_PLATFORM_GRAVITINO_METALAKE daft_platform Gravitino metalake 名称
DAFT_PLATFORM_GRAVITINO_TIMEOUT 3 Gravitino 连接超时(秒)

AI / LLM 配置

AI UDF(如 classify_imagetext_summary)需要 LLM API:

环境变量 说明
OPENAI_API_KEY OpenAI / 兼容 API 密钥
OPENAI_BASE_URL API 地址(默认 https://api.openai.com/v1)
OPENAI_MODEL 模型名(默认 gpt-4o-mini)

也可在 Web 界面右上角 ⚙️ 设置中配置,保存到本地数据库。

元数据服务降级

启动时自动检测 Gravitino 可用性:

  • Gravitino 可用 → 使用 Gravitino REST API 管理元数据
  • Gravitino 不可用 → 自动降级到本地 SQLite(零外部依赖)

禁用 Gravitino 直接使用 SQLite:

export DAFT_PLATFORM_GRAVITINO_ENABLED=false

部署 Gravitino(可选)

# K8s 部署
kubectl apply -f daft_platform/deploy/gravitino-pod.yaml
# Docker 部署
docker run -d --name gravitino -p 30090:8090 apache/gravitino:latest

API 接口

SQL 执行

# 执行 SQL
curl -X POST http://localhost:8080/api/sql/execute \
 -H 'Content-Type: application/json' \
 -d '{"sql": "SELECT * FROM my_table LIMIT 10", "limit": 1000}'
# SQL 安全验证
curl -X POST http://localhost:8080/api/sql/validate \
 -H 'Content-Type: application/json' \
 -d '{"sql": "SELECT 1"}'
# 查看执行历史
curl http://localhost:8080/api/sql/history?limit=10

数据源管理

# 注册 JSON 数据表
curl -X POST http://localhost:8080/api/datasource/register \
 -H 'Content-Type: application/json' \
 -d '{
 "name": "users",
 "source_type": "json",
 "data": {"id": [1, 2, 3], "name": ["Alice", "Bob", "Carol"]}
 }'
# 注册文件表(CSV/Parquet/JSON)
curl -X POST http://localhost:8080/api/datasource/register \
 -H 'Content-Type: application/json' \
 -d '{"name": "sales", "source_type": "file", "path": "/data/sales.parquet"}'
# 注册 HuggingFace 数据集
curl -X POST http://localhost:8080/api/datasource/register \
 -H 'Content-Type: application/json' \
 -d '{"name": "mnist", "source_type": "huggingface", "dataset": "mnist", "limit": 1000}'
# 注册 Glob 路径(批量文件)
curl -X POST http://localhost:8080/api/datasource/register \
 -H 'Content-Type: application/json' \
 -d '{"name": "images", "source_type": "glob", "glob_pattern": "/data/photos/**/*.jpg"}'
# 查看元数据树
curl http://localhost:8080/api/datasource/tree

函数查询

# 列出所有函数
curl http://localhost:8080/api/function/list
# 按分类筛选
curl http://localhost:8080/api/function/list?category=ai
# 搜索函数
curl http://localhost:8080/api/function/search?q=image
# SQL 自动补全
curl http://localhost:8080/api/function/completions?prefix=image_

SQL 语法指南

Daft SQL 兼容标准 SQL 语法,额外支持多模态数据处理函数。

基础查询

-- 注册测试数据后查询
SELECT * FROM test_data WHERE score > 85 ORDER BY score DESC;
-- 聚合统计
SELECT category, COUNT(*) AS cnt, AVG(score) AS avg_score
FROM test_data
GROUP BY category
ORDER BY avg_score DESC;
-- CTE(公用表表达式)
WITH top_students AS (
 SELECT * FROM test_data WHERE score > 90
)
SELECT category, COUNT(*) FROM top_students GROUP BY category;
-- 窗口函数
SELECT name, score,
 RANK() OVER (ORDER BY score DESC) AS rank,
 ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) AS cat_rank
FROM test_data;

字符串函数

SELECT lower(name), upper(city) FROM test_data;
SELECT * FROM test_data WHERE contains(city, '');
SELECT * FROM test_data WHERE starts_with(name, '');
SELECT length(name) AS name_len FROM test_data;
SELECT replace(city, '北京', 'Beijing') FROM test_data;
SELECT split(name, '') FROM test_data; -- 按字符拆分

数值函数

SELECT abs(score - 85) AS diff FROM test_data;
SELECT round(score, 0) AS rounded FROM test_data;
SELECT ceil(score), floor(score) FROM test_data;
SELECT clip(score, 80, 95) AS clipped FROM test_data;

时间函数

SELECT current_date(), current_timestamp();
SELECT year(to_date('2024年03月15日', '%Y-%m-%d'));
SELECT date_trunc('month', current_timestamp());

列表函数

SELECT explode(split(name, '')) FROM test_data; -- 展开为多行
SELECT list_count(split(city, '')) FROM test_data;

多模态数据处理 SQL

以下为 Daft 特色的多模态数据处理能力,可在 SQL 中直接使用。

图像处理

-- 1. 注册图片数据集(Glob 方式扫描目录)
-- 在 Web 界面中:数据源 → 注册 → Glob → 路径填 /path/to/images/**/*.jpg
-- 2. 解码图片并调整尺寸
SELECT path, image_decode(path) AS img FROM images;
SELECT path, image_resize(image_decode(path), 224, 224) AS resized FROM images;
-- 3. 图像裁剪
SELECT path, image_crop(image_decode(path), 0, 0, 100, 100) AS cropped FROM images;

AI 视觉识别(需配置 LLM API Key)

-- 图像内容识别:调用 Vision API 返回中文描述
SELECT path, classify_image(path) AS description FROM images;
-- 图像元信息:宽高、格式、文件大小
SELECT path, image_info(path) AS info FROM images;
-- 主色调提取:返回 HEX 颜色值
SELECT path, dominant_color(path) AS color FROM images;
-- 图像感知哈希:用于去重/相似度比较
SELECT path, image_phash(path) AS phash FROM images;
-- 人脸检测:返回人脸数量
SELECT path, face_count(path) AS faces FROM images;
-- EXIF 元数据:相机型号、GPS、拍摄时间等
SELECT path, exif_extract(path) AS exif FROM images;
-- OCR 文字识别(支持中英文)
SELECT path, ocr_extract(path) AS text FROM document_images;

完整图像分析流水线示例

-- 假设已注册 images 表(Glob: /data/photos/**/*.jpg)
-- 分析每张图片:识别内容 + 提取信息 + 主色调 + 人脸
SELECT
 path,
 classify_image(path) AS description,
 image_info(path) AS info,
 dominant_color(path) AS color,
 face_count(path) AS faces,
 image_phash(path) AS phash
FROM images
LIMIT 20;
-- 筛选包含人脸的图片
SELECT path, face_count(path) AS faces
FROM images
WHERE face_count(path) > 0;
-- 图像去重(基于感知哈希)
SELECT phash, COUNT(*) AS cnt, MIN(path) AS sample
FROM (SELECT path, image_phash(path) AS phash FROM images)
GROUP BY phash
HAVING cnt > 1;

文本分析

-- 情感分析:返回 positive / negative / neutral
SELECT name, sentiment(name) AS mood FROM test_data;
-- 语言检测:返回 zh / en
SELECT name, detect_language(name) AS lang FROM test_data;
-- 关键词提取(TF 频率统计)
SELECT extract_keywords(description) AS keywords FROM articles;
-- 文本摘要(调用 LLM,50 字以内)
SELECT text_summary(content) AS summary FROM articles;
-- PII 脱敏(自动识别手机号/身份证/邮箱/银行卡)
SELECT mask_pii('联系方式: 13812345678, 邮箱: test@example.com') AS masked;
-- 输出: "联系方式: 138****5678, 邮箱: te***@example.com"
-- 文本相似度(Jaccard bigram)
SELECT text_similarity('你好世界', '你好中国') AS similarity;

音频 & 文件处理

-- 音频时长(秒,支持 mp3/wav/flac)
SELECT path, audio_duration(path) AS duration_sec FROM audio_files;
-- 文件 MD5 哈希
SELECT path, file_md5(path) AS md5 FROM files;
-- 文件转 Base64
SELECT path, to_base64(path) AS b64 FROM files;

向量距离计算

-- 余弦相似度(用于 Embedding 比较)
SELECT cosine_similarity(emb1, emb2) AS sim FROM embedding_pairs;
-- 欧氏距离
SELECT euclidean_distance(emb1, emb2) AS dist FROM embedding_pairs;
-- 点积
SELECT dot_product(emb1, emb2) AS dp FROM embedding_pairs;

URL & 网络

-- 从 URL 下载内容
SELECT url_download(url_col) AS content FROM web_sources;
-- 解析 URL 各部分
SELECT url_parse(url_col) AS parsed FROM web_sources;
-- 猜测文件 MIME 类型
SELECT guess_mime_type(path) AS mime FROM files;

内置 AI UDF 完整列表

函数名 输入 输出 说明 额外依赖
classify_image(path) 文件路径 String LLM Vision API 图像识别 LLM API Key
image_info(path) 文件路径 Struct 宽高/格式/文件大小 Pillow
dominant_color(path) 文件路径 String 主色调 HEX 值 Pillow
image_phash(path) 文件路径 String 感知哈希(去重用) imagehash / 降级 MD5
face_count(path) 文件路径 Int32 人脸数量 face_recognition / OpenCV
exif_extract(path) 文件路径 String EXIF 元数据 JSON Pillow
ocr_extract(path) 文件路径 String OCR 文字识别 easyocr / pytesseract
sentiment(text) 文本 String 情感分析
detect_language(text) 文本 String 语言检测 (zh/en)
extract_keywords(text) 文本 String 关键词提取
text_summary(text) 文本 String LLM 文本摘要 LLM API Key
mask_pii(text) 文本 String PII 脱敏
text_similarity(a, b) 两段文本 Float64 Jaccard 相似度
audio_duration(path) 文件路径 Float64 音频时长(秒) mutagen / wave
file_md5(path) 文件路径 String 文件 MD5 哈希
to_base64(path) 文件路径 String 文件转 Base64

标记 "无" 的函数无需额外安装,开箱即用。


测试

# 运行全部测试
make test
# 运行单个测试文件
python -m pytest daft_platform/tests/test_sql_executor.py -v
# 运行单个测试方法
python -m pytest daft_platform/tests/test_routers.py::test_register_and_list_table -v

测试覆盖:

测试文件 覆盖模块
test_models.py Pydantic 模型验证、health 端点
test_sql_executor.py SQL 安全验证、错误翻译
test_sql_history.py 执行历史 CRUD
test_function_registry.py 函数列表/搜索/补全
test_metadata_service.py LocalMetadataService 完整 CRUD
test_session_manager.py Session 单例、表注册
test_routers.py API 路由集成测试

安全机制

SQL 安全验证

执行前自动拦截危险操作:

DROP TABLE → ❌ 拦截
DELETE FROM → ❌ 拦截
TRUNCATE → ❌ 拦截
ALTER TABLE → ❌ 拦截
SELECT / WITH → ✅ 允许

错误翻译

Daft 引擎的 Rust 层错误自动翻译为中文友好提示:

原始错误 翻译 建议
column 'foo' not found 列 'foo' 不存在于当前表中 请检查列名拼写
table 'bar' not found 表 'bar' 不存在 请先注册该表
Type mismatch 数据类型不匹配 使用 CAST() 转换
Division by zero 除零错误 添加 CASE WHEN 判断
Out of memory 内存不足 添加 LIMIT 限制

已知问题

详见 KNOWN_ISSUES.md,包含开发过程中的踩坑记录和解决方案。

License

本项目基于 Daft 引擎开发,遵循 Apache 2.0 License。

About

Multimodal data processing platform built on Daft engine — SQL workbench, AI UDFs, metadata management

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

Contributors

AltStyle によって変換されたページ (->オリジナル) /