support openGauss vector store (#4819)

Dechao Sun
2025-05-28 10:49:06 +08:00
committed by GitHub
parent b4ecfb0b79
commit 63028dacb2
9 changed files with 674 additions and 1 deletion


@@ -0,0 +1,218 @@
# The default database username and password only take effect on the first run
# If you change the credentials, remember to update both the database and the project's connection parameters, not just one of them
# This config file is only for quick start and testing. For production use, be sure to change the credentials and tune the knowledge base parameters, shared memory, and so on.
# If you cannot access Docker Hub or GitHub, you can use the Aliyun mirrors (Aliyun has no ARM images)
version: '3.3'
services:
# db
gs:
image: opengauss/opengauss:7.0.0-RC1 # docker hub
container_name: gs
restart: always
# ports: # not recommended to expose in production
# - 5432:5432
networks:
- fastgpt
environment:
# These settings only take effect on the first run. Changing them and restarting the container has no effect; delete the persisted data and restart for the changes to apply
- GS_USER=username
- GS_PASSWORD=password
- GS_DB=postgres
volumes:
- ./opengauss/data:/var/lib/opengauss/data
healthcheck:
test: ['CMD-SHELL', 'netstat -lntp | grep tcp6 > /dev/null 2>&1']
interval: 10s
timeout: 10s
retries: 10
mongo:
image: mongo:5.0.18 # dockerhub
# image: registry.cn-hangzhou.aliyuncs.com/fastgpt/mongo:5.0.18 # Aliyun
# image: mongo:4.4.29 # use when the CPU does not support AVX
container_name: mongo
restart: always
# ports:
# - 27017:27017
networks:
- fastgpt
command: mongod --keyFile /data/mongodb.key --replSet rs0
environment:
- MONGO_INITDB_ROOT_USERNAME=myusername
- MONGO_INITDB_ROOT_PASSWORD=mypassword
volumes:
- ./mongo/data:/data/db
entrypoint:
- bash
- -c
- |
openssl rand -base64 128 > /data/mongodb.key
chmod 400 /data/mongodb.key
chown 999:999 /data/mongodb.key
echo 'const isInited = rs.status().ok === 1
if(!isInited){
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "mongo:27017" }
]
})
}' > /data/initReplicaSet.js
# Start the MongoDB service
exec docker-entrypoint.sh "$$@" &
# Wait for MongoDB to start
until mongo -u myusername -p mypassword --authenticationDatabase admin --eval "print('waited for connection')"; do
echo "Waiting for MongoDB to start..."
sleep 2
done
# Run the replica set initialization script
mongo -u myusername -p mypassword --authenticationDatabase admin /data/initReplicaSet.js
# Wait for the MongoDB server process started by docker-entrypoint.sh
wait $$!
redis:
image: redis:7.2-alpine
container_name: redis
# ports:
# - 6379:6379
networks:
- fastgpt
restart: always
command: |
redis-server --requirepass mypassword --loglevel warning --maxclients 10000 --appendonly yes --save 60 10 --maxmemory 4gb --maxmemory-policy noeviction
healthcheck:
test: ['CMD', 'redis-cli', '-a', 'mypassword', 'ping']
interval: 10s
timeout: 3s
retries: 3
start_period: 30s
volumes:
- ./redis/data:/data
# fastgpt
sandbox:
container_name: sandbox
image: ghcr.io/labring/fastgpt-sandbox:v4.9.7-fix2 # git
# image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-sandbox:v4.9.7-fix2 # Aliyun
networks:
- fastgpt
restart: always
fastgpt-mcp-server:
container_name: fastgpt-mcp-server
image: ghcr.io/labring/fastgpt-mcp_server:v4.9.7-fix2 # git
# image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-mcp_server:v4.9.7-fix2 # Aliyun
ports:
- 3005:3000
networks:
- fastgpt
restart: always
environment:
- FASTGPT_ENDPOINT=http://fastgpt:3000
fastgpt:
container_name: fastgpt
image: ghcr.io/labring/fastgpt:v4.9.7-fix2 # git
# image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt:v4.9.7-fix2 # Aliyun
# image: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/ghcr.io/labring/fastgpt:v4.8.4-linuxarm64 # openGauss performs better on the ARM architecture
ports:
- 3000:3000
networks:
- fastgpt
depends_on:
- mongo
- gs
- sandbox
restart: always
environment:
# Externally reachable frontend address, used to complete file resource URLs, e.g. https://fastgpt.cn (must not be localhost). Optional: if unset, images sent to the model use a relative path instead of a full URL, and the model may forge the Host.
- FE_DOMAIN=
# Password for the root account (username: root). To change the root password, edit this variable and restart.
- DEFAULT_ROOT_PSW=1234
# AI Proxy endpoint; when configured, it is used preferentially
- AIPROXY_API_ENDPOINT=http://aiproxy:3000
# AI Proxy admin token; must match the ADMIN_KEY environment variable in AI Proxy
- AIPROXY_API_TOKEN=aiproxy
# Maximum number of database connections
- DB_MAX_LINK=30
# Login credential signing key
- TOKEN_KEY=any
# Root key, often used for initialization requests during upgrades
- ROOT_KEY=root_key
# File access token encryption key
- FILE_TOKEN_KEY=filetoken
# MongoDB connection string (username: myusername, password: mypassword)
- MONGODB_URI=mongodb://myusername:mypassword@mongo:27017/fastgpt?authSource=admin
# openGauss connection string
- OPENGAUSS_URL=opengauss://gaussdb:Huawei12%23%24@gs:9999/test
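# Note: special characters in the connection password must be percent-encoded in the URL above;
# Huawei12%23%24 decodes to Huawei12#$ ('#' = %23, '$' = %24)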
# Redis connection string
- REDIS_URL=redis://default:mypassword@redis:6379
# Sandbox address
- SANDBOX_URL=http://sandbox:3000
# Log level: debug, info, warn, error
- LOG_LEVEL=info
- STORE_LOG_LEVEL=warn
# Maximum number of workflow runs
- WORKFLOW_MAX_RUN_TIMES=1000
# Maximum input length for batch execution nodes
- WORKFLOW_MAX_LOOP_TIMES=100
# Custom CORS origins; when unset, all origins are allowed (separate multiple domains with commas)
- ALLOWED_ORIGINS=
# Whether to enable IP rate limiting (disabled by default)
- USE_IP_LIMIT=false
# Chat file expiration time, in days
- CHAT_FILE_EXPIRE_TIME=7
volumes:
- ./config.json:/app/data/config.json
# AI Proxy
aiproxy:
image: ghcr.io/labring/aiproxy:v0.1.7
# image: registry.cn-hangzhou.aliyuncs.com/labring/aiproxy:v0.1.7 # Aliyun
container_name: aiproxy
restart: unless-stopped
depends_on:
aiproxy_pg:
condition: service_healthy
networks:
- fastgpt
environment:
# Must match AIPROXY_API_TOKEN in the fastgpt service
- ADMIN_KEY=aiproxy
# Retention time for detailed error logs (hours)
- LOG_DETAIL_STORAGE_HOURS=1
# Database connection string
- SQL_DSN=postgres://postgres:aiproxy@aiproxy_pg:5432/aiproxy
# Maximum number of retries
- RETRY_TIMES=3
# Billing is not needed
- BILLING_ENABLED=false
# Strict model validation is not needed
- DISABLE_MODEL_CONFIG=true
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:3000/api/status']
interval: 5s
timeout: 5s
retries: 10
aiproxy_pg:
image: pgvector/pgvector:0.8.0-pg15 # docker hub
# image: registry.cn-hangzhou.aliyuncs.com/fastgpt/pgvector:v0.8.0-pg15 # Aliyun
restart: unless-stopped
container_name: aiproxy_pg
volumes:
- ./aiproxy_pg:/var/lib/postgresql/data
networks:
- fastgpt
environment:
TZ: Asia/Shanghai
POSTGRES_USER: postgres
POSTGRES_DB: aiproxy
POSTGRES_PASSWORD: aiproxy
healthcheck:
test: ['CMD', 'pg_isready', '-U', 'postgres', '-d', 'aiproxy']
interval: 5s
timeout: 5s
retries: 10
networks:
fastgpt:

env.d.ts

@@ -15,6 +15,7 @@ declare global {
MONGODB_LOG_URI?: string;
PG_URL: string;
OCEANBASE_URL: string;
OPENGAUSS_URL: string;
MILVUS_ADDRESS: string;
MILVUS_TOKEN: string;
SANDBOX_URL: string;


@@ -124,6 +124,13 @@ export type PgSearchRawType = {
collection_id: string;
score: number;
};
export type GsSearchRawType = {
id: string;
collection_id: string;
score: number;
};
export type PushDatasetDataChunkProps = {
q: string; // embedding content
a?: string; // bonus content


@@ -3,5 +3,6 @@ export const DatasetVectorTableName = 'modeldata';
export const PG_ADDRESS = process.env.PG_URL;
export const OCEANBASE_ADDRESS = process.env.OCEANBASE_URL;
export const OPENGAUSS_ADDRESS = process.env.OPENGAUSS_URL;
export const MILVUS_ADDRESS = process.env.MILVUS_ADDRESS;
export const MILVUS_TOKEN = process.env.MILVUS_TOKEN;


@@ -1,10 +1,11 @@
/* vector crud */
import { PgVectorCtrl } from './pg';
import { ObVectorCtrl } from './oceanbase';
import { GsVectorCtrl } from './opengauss';
import { getVectorsByText } from '../../core/ai/embedding';
import { type DelDatasetVectorCtrlProps, type InsertVectorProps } from './controller.d';
import { type EmbeddingModelItemType } from '@fastgpt/global/core/ai/model.d';
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants';
import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS, OPENGAUSS_ADDRESS } from './constants';
import { MilvusCtrl } from './milvus';
import { setRedisCache, getRedisCache, delRedisCache, CacheKeyEnum } from '../redis/cache';
import { throttle } from 'lodash';
@@ -14,6 +15,7 @@ const getVectorObj = () => {
if (PG_ADDRESS) return new PgVectorCtrl();
if (OCEANBASE_ADDRESS) return new ObVectorCtrl();
if (MILVUS_ADDRESS) return new MilvusCtrl();
if (OPENGAUSS_ADDRESS) return new GsVectorCtrl();
return new PgVectorCtrl();
};
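The controller is selected by whichever connection variable is present, and the checks run in order, so PG_ADDRESS still wins when several are set. To exercise the new openGauss path, leave the other stores unset and provide only OPENGAUSS_URL; a minimal env sketch (credentials and host are placeholders):

PG_URL=
OCEANBASE_URL=
MILVUS_ADDRESS=
OPENGAUSS_URL=opengauss://username:password@host:port/postgres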


@@ -0,0 +1,188 @@
import { delay } from '@fastgpt/global/common/system/utils';
import { addLog } from '../../system/log';
import { Pool } from 'pg';
import type { QueryResultRow } from 'pg';
import { OPENGAUSS_ADDRESS } from '../constants';
export const connectGs = async (): Promise<Pool> => {
if (global.gsClient) {
return global.gsClient;
}
global.gsClient = new Pool({
connectionString: OPENGAUSS_ADDRESS,
max: Number(process.env.DB_MAX_LINK || 20),
min: 10,
keepAlive: true,
idleTimeoutMillis: 600000,
connectionTimeoutMillis: 20000,
query_timeout: 30000,
statement_timeout: 40000,
idle_in_transaction_session_timeout: 60000
});
global.gsClient.on('error', async (err) => {
addLog.error(`openGauss error`, err);
global.gsClient?.end();
global.gsClient = null;
await delay(1000);
addLog.info(`Retry connect openGauss`);
connectGs();
});
try {
await global.gsClient.connect();
console.log('openGauss connected');
return global.gsClient;
} catch (error) {
addLog.error(`openGauss connect error`, error);
global.gsClient?.end();
global.gsClient = null;
await delay(1000);
addLog.info(`Retry connect openGauss`);
return connectGs();
}
};
type WhereProps = (string | [string, string | number])[];
type GetProps = {
fields?: string[];
where?: WhereProps;
order?: { field: string; mode: 'DESC' | 'ASC' | string }[];
limit?: number;
offset?: number;
};
type DeleteProps = {
where: WhereProps;
};
type ValuesProps = { key: string; value?: string | number }[];
type UpdateProps = {
values: ValuesProps;
where: WhereProps;
};
type InsertProps = {
values: ValuesProps[];
};
class GsClass {
private getWhereStr(where?: WhereProps) {
return where
? `WHERE ${where
.map((item) => {
if (typeof item === 'string') {
return item;
}
const val = typeof item[1] === 'number' ? item[1] : `'${String(item[1])}'`;
return `${item[0]}=${val}`;
})
.join(' ')}`
: '';
}
private getUpdateValStr(values: ValuesProps) {
return values
.map((item) => {
const val =
typeof item.value === 'number'
? item.value
: `'${String(item.value).replace(/\'/g, '"')}'`;
return `${item.key}=${val}`;
})
.join(',');
}
private getInsertValStr(values: ValuesProps[]) {
return values
.map(
(items) =>
`(${items
.map((item) =>
typeof item.value === 'number'
? item.value
: `'${String(item.value).replace(/\'/g, '"')}'`
)
.join(',')})`
)
.join(',');
}
async select<T extends QueryResultRow = any>(table: string, props: GetProps) {
const sql = `SELECT ${
!props.fields || props.fields?.length === 0 ? '*' : props.fields?.join(',')
}
FROM ${table}
${this.getWhereStr(props.where)}
${
props.order
? `ORDER BY ${props.order.map((item) => `${item.field} ${item.mode}`).join(',')}`
: ''
}
LIMIT ${props.limit || 10} OFFSET ${props.offset || 0}
`;
const gs = await connectGs();
return gs.query<T>(sql);
}
async count(table: string, props: GetProps) {
const sql = `SELECT COUNT(${props?.fields?.[0] || '*'})
FROM ${table}
${this.getWhereStr(props.where)}
`;
const gs = await connectGs();
return gs.query(sql).then((res) => Number(res.rows[0]?.count || 0));
}
async delete(table: string, props: DeleteProps) {
const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`;
const gs = await connectGs();
return gs.query(sql);
}
async update(table: string, props: UpdateProps) {
if (props.values.length === 0) {
return {
rowCount: 0
};
}
const sql = `UPDATE ${table} SET ${this.getUpdateValStr(props.values)} ${this.getWhereStr(
props.where
)}`;
const gs = await connectGs();
return gs.query(sql);
}
async insert(table: string, props: InsertProps) {
if (props.values.length === 0) {
return {
rowCount: 0,
rows: []
};
}
const fields = props.values[0].map((item) => item.key).join(',');
const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(
props.values
)} RETURNING id`;
const gs = await connectGs();
return gs.query<{ id: string }>(sql);
}
async query<T extends QueryResultRow = any>(sql: string) {
const gs = await connectGs();
const start = Date.now();
return gs.query<T>(sql).then((res) => {
const time = Date.now() - start;
if (time > 300) {
addLog.warn(`gs query time: ${time}ms, sql: ${sql}`);
}
return res;
});
}
}
export const GsClient = new GsClass();
export const Gs = global.gsClient;
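For orientation, GsClass composes SQL strings from plain arrays: a where clause is a list of [field, value] tuples interleaved with literal connector strings such as 'and', which are joined into the statement verbatim. A hypothetical usage sketch against the modeldata table (values are illustrative):

import { GsClient } from './controller';

// Connectors must be supplied explicitly between each [field, value] pair.
const { rows } = await GsClient.select<{ id: string; collection_id: string }>('modeldata', {
  fields: ['id', 'collection_id'],
  where: [['team_id', 'team1'], 'and', ['dataset_id', 'ds1']],
  order: [{ field: 'id', mode: 'DESC' }],
  limit: 20
});

// count() reuses the same where format and returns a plain number.
const total = await GsClient.count('modeldata', {
  where: [['team_id', 'team1']]
});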


@@ -0,0 +1,253 @@
/* openGauss vector crud */
import { DatasetVectorTableName } from '../constants';
import { delay } from '@fastgpt/global/common/system/utils';
import { GsClient, connectGs } from './controller';
import { GsSearchRawType } from '@fastgpt/global/core/dataset/api';
import type {
DelDatasetVectorCtrlProps,
EmbeddingRecallCtrlProps,
EmbeddingRecallResponse,
InsertVectorControllerProps
} from '../controller.d';
import dayjs from 'dayjs';
import { addLog } from '../../system/log';
export class GsVectorCtrl {
constructor() {}
init = async () => {
try {
await connectGs();
await GsClient.query(`
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
id BIGSERIAL PRIMARY KEY,
vector VECTOR(1536) NOT NULL,
team_id VARCHAR(50) NOT NULL,
dataset_id VARCHAR(50) NOT NULL,
collection_id VARCHAR(50) NOT NULL,
createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`);
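// Note: the vector column above is fixed at 1536 dimensions, so only embedding
// models that output 1536-dim vectors fit this table. The HNSW index created next
// uses the inner-product opclass (vector_ip_ops), matching the inner_product
// ordering used by embRecall below.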
await GsClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);`
);
await GsClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
);
await GsClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);`
);
addLog.info('init openGauss successful');
} catch (error) {
addLog.error('init openGauss error', error);
}
};
insert = async (props: InsertVectorControllerProps): Promise<{ insertId: string }> => {
const { teamId, datasetId, collectionId, vector, retry = 3 } = props;
try {
const { rowCount, rows } = await GsClient.insert(DatasetVectorTableName, {
values: [
[
{ key: 'vector', value: `[${vector}]` },
{ key: 'team_id', value: String(teamId) },
{ key: 'dataset_id', value: String(datasetId) },
{ key: 'collection_id', value: String(collectionId) }
]
]
});
if (rowCount === 0) {
return Promise.reject('insertDatasetData: no insert');
}
return {
insertId: rows[0].id
};
} catch (error) {
if (retry <= 0) {
return Promise.reject(error);
}
await delay(500);
return this.insert({
...props,
retry: retry - 1
});
}
};
delete = async (props: DelDatasetVectorCtrlProps): Promise<any> => {
const { teamId, retry = 2 } = props;
const teamIdWhere = `team_id='${String(teamId)}' AND`;
const where = await (() => {
if ('id' in props && props.id) return `${teamIdWhere} id=${props.id}`;
if ('datasetIds' in props && props.datasetIds) {
const datasetIdWhere = `dataset_id IN (${props.datasetIds
.map((id) => `'${String(id)}'`)
.join(',')})`;
if ('collectionIds' in props && props.collectionIds) {
return `${teamIdWhere} ${datasetIdWhere} AND collection_id IN (${props.collectionIds
.map((id) => `'${String(id)}'`)
.join(',')})`;
}
return `${teamIdWhere} ${datasetIdWhere}`;
}
if ('idList' in props && Array.isArray(props.idList)) {
if (props.idList.length === 0) return;
return `${teamIdWhere} id IN (${props.idList.map((id) => String(id)).join(',')})`;
}
return Promise.reject('deleteDatasetData: no where');
})();
if (!where) return;
try {
await GsClient.delete(DatasetVectorTableName, {
where: [where]
});
} catch (error) {
if (retry <= 0) {
return Promise.reject(error);
}
await delay(500);
return this.delete({
...props,
retry: retry - 1
});
}
};
embRecall = async (props: EmbeddingRecallCtrlProps): Promise<EmbeddingRecallResponse> => {
const {
teamId,
datasetIds,
vector,
limit,
forbidCollectionIdList,
filterCollectionIdList,
retry = 2
} = props;
// Get forbid collection
const formatForbidCollectionIdList = (() => {
if (!filterCollectionIdList) return forbidCollectionIdList;
const list = forbidCollectionIdList
.map((id) => String(id))
.filter((id) => !filterCollectionIdList.includes(id));
return list;
})();
const forbidCollectionSql =
formatForbidCollectionIdList.length > 0
? `AND collection_id NOT IN (${formatForbidCollectionIdList.map((id) => `'${id}'`).join(',')})`
: '';
// Filter by collectionId
const formatFilterCollectionId = (() => {
if (!filterCollectionIdList) return;
return filterCollectionIdList
.map((id) => String(id))
.filter((id) => !forbidCollectionIdList.includes(id));
})();
const filterCollectionIdSql = formatFilterCollectionId
? `AND collection_id IN (${formatFilterCollectionId.map((id) => `'${id}'`).join(',')})`
: '';
// Empty data
if (formatFilterCollectionId && formatFilterCollectionId.length === 0) {
return { results: [] };
}
try {
// Assumption: openGauss's HNSW search-scope GUC is named hnsw_ef_search; the
// original statement used an OceanBase-style ob_hnsw_ef_search name, which
// openGauss would reject as an unrecognized parameter.
const results: any = await GsClient.query(
`BEGIN;
SET hnsw_ef_search = ${global.systemEnv?.hnswEfSearch || 100};
SELECT id, collection_id, inner_product(vector, [${vector}]) AS score
FROM ${DatasetVectorTableName}
WHERE team_id='${teamId}'
AND dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})
${filterCollectionIdSql}
${forbidCollectionSql}
ORDER BY score desc APPROXIMATE LIMIT ${limit};
COMMIT;`
);
// The SELECT is the third of the four statements (BEGIN, SET, SELECT, COMMIT),
// so its result sits at index 2 of the per-statement result array, not index 3.
const rows = results?.[2]?.rows as GsSearchRawType[];
if (!Array.isArray(rows)) {
return {
results: []
};
}
return {
results: rows.map((item) => ({
id: String(item.id),
collectionId: item.collection_id,
score: item.score * -1
}))
};
} catch (error) {
if (retry <= 0) {
return Promise.reject(error);
}
return this.embRecall({
...props,
retry: retry - 1
});
}
};
getVectorDataByTime = async (start: Date, end: Date) => {
const { rows } = await GsClient.query<{
id: string;
team_id: string;
dataset_id: string;
}>(`SELECT id, team_id, dataset_id
FROM ${DatasetVectorTableName}
WHERE createtime BETWEEN '${dayjs(start).format('YYYY-MM-DD HH:mm:ss')}' AND '${dayjs(
end
).format('YYYY-MM-DD HH:mm:ss')}';
`);
return rows.map((item) => ({
id: String(item.id),
teamId: item.team_id,
datasetId: item.dataset_id
}));
};
getVectorCountByTeamId = async (teamId: string) => {
const total = await GsClient.count(DatasetVectorTableName, {
where: [['team_id', String(teamId)]]
});
return total;
};
getVectorCountByDatasetId = async (teamId: string, datasetId: string) => {
const total = await GsClient.count(DatasetVectorTableName, {
where: [['team_id', String(teamId)], 'and', ['dataset_id', String(datasetId)]]
});
return total;
};
getVectorCountByCollectionId = async (
teamId: string,
datasetId: string,
collectionId: string
) => {
const total = await GsClient.count(DatasetVectorTableName, {
where: [
['team_id', String(teamId)],
'and',
['dataset_id', String(datasetId)],
'and',
['collection_id', String(collectionId)]
]
});
return total;
};
}
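A hypothetical end-to-end sketch of the new controller (IDs and embeddings are placeholders; note that embRecall negates the returned inner-product score, mirroring the pg controller's convention):

const ctrl = new GsVectorCtrl();
await ctrl.init(); // creates the table, HNSW index, and btree indexes if missing

const { insertId } = await ctrl.insert({
  teamId: 'team1',
  datasetId: 'ds1',
  collectionId: 'col1',
  vector: embedding // number[], 1536 dimensions to match the table definition
});

const { results } = await ctrl.embRecall({
  teamId: 'team1',
  datasetIds: ['ds1'],
  vector: queryEmbedding,
  limit: 10,
  forbidCollectionIdList: [],
  filterCollectionIdList: undefined
});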


@@ -6,6 +6,7 @@ declare global {
var pgClient: Pool | null;
var obClient: MysqlPool | null;
var milvusClient: MilvusClient | null;
var gsClient: Pool | null;
}
export type EmbeddingRecallItemType = {


@@ -29,6 +29,8 @@ MONGODB_LOG_URI=mongodb://username:password@0.0.0.0:27017/fastgpt?authSource=adm
PG_URL=postgresql://username:password@host:port/postgres
# OceanBase vector store connection string
OCEANBASE_URL=
# openGauss vector store connection string
OPENGAUSS_URL=
# Milvus vector store connection string
MILVUS_ADDRESS=
MILVUS_TOKEN=
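As a quick sanity check after setting OPENGAUSS_URL, the presence of the vector extension that init() installs can be probed through the same client (a sketch; assumes openGauss exposes the standard PostgreSQL pg_extension catalog):

const { rows } = await GsClient.query<{ extname: string }>(
  "SELECT extname FROM pg_extension WHERE extname = 'vector'"
);
console.log(rows.length ? 'vector extension installed' : 'vector extension missing');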