fix: oceanbase insert (#5539)

This commit is contained in:
Archer
2025-08-26 17:29:42 +08:00
committed by GitHub
parent 4939271abb
commit 93e9cb675d
4 changed files with 79 additions and 16 deletions

View File

@@ -31,6 +31,7 @@ description: 'FastGPT V4.12.2 更新说明'
7. 无法完全关闭系统套餐,会存在空对象默认值,导致鉴权异常。
8. 工作流,添加团队应用,搜索无效。
9. 应用版本ref 字段错误,导致无法正常使用。
10. Oceanbase 批量插入时,未正确返回插入的 id。
## 🔨 工具更新

View File

@@ -104,7 +104,7 @@
"document/content/docs/upgrading/4-11/4111.mdx": "2025-08-07T22:49:09+08:00",
"document/content/docs/upgrading/4-12/4120.mdx": "2025-08-12T22:45:19+08:00",
"document/content/docs/upgrading/4-12/4121.mdx": "2025-08-15T22:53:06+08:00",
"document/content/docs/upgrading/4-12/4122.mdx": "2025-08-26T14:35:39+08:00",
"document/content/docs/upgrading/4-12/4122.mdx": "2025-08-26T15:13:44+08:00",
"document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00",
"document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00",
"document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00",

View File

@@ -6,6 +6,7 @@ import mysql, {
} from 'mysql2/promise';
import { addLog } from '../../system/log';
import { OCEANBASE_ADDRESS } from '../constants';
import { delay } from '@fastgpt/global/common/system/utils';
export const getClient = async (): Promise<Pool> => {
if (!OCEANBASE_ADDRESS) {
@@ -27,9 +28,22 @@ export const getClient = async (): Promise<Pool> => {
keepAliveInitialDelay: 0
});
addLog.info(`oceanbase connected`);
try {
// Test the connection with a simple query instead of calling connect()
await global.obClient.query('SELECT 1');
addLog.info(`oceanbase connected`);
return global.obClient;
} catch (error) {
addLog.error(`oceanbase connect error`, error);
return global.obClient;
global.obClient?.end();
global.obClient = null;
await delay(1000);
addLog.info(`Retry connect oceanbase`);
return getClient();
}
};
type WhereProps = (string | [string, string | number])[];
@@ -118,9 +132,9 @@ class ObClass {
`;
const client = await getClient();
return client
.query<({ count: number } & RowDataPacket)[]>(sql)
.then(([rows]) => Number(rows[0]?.count || 0));
return client.query<({ count: number } & RowDataPacket)[]>(sql).then(([res]) => {
return res[0]?.['COUNT(*)'] || 0;
});
}
async delete(table: string, props: DeleteProps) {
const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`;
@@ -140,24 +154,73 @@ class ObClass {
const client = await getClient();
return client.query(sql);
}
/**
* 批量插入数据并获取自增 ID
* 在 OceanBase 多副本环境下使用 LAST_INSERT_ID() 获取准确的自增 ID
*
* 原理说明:
* 1. OceanBase 的 LAST_INSERT_ID() 返回当前会话最后一次插入操作的第一个自增 ID
* 2. 批量插入时ID 是连续的first_id, first_id+1, first_id+2, ...
* 3. 这种方法在多副本环境下是可靠的,因为每个连接会话是独立的
*/
async insert(table: string, props: InsertProps) {
if (props.values.length === 0) {
return {
rowCount: 0,
rows: []
insertIds: []
};
}
const fields = props.values[0].map((item) => item.key).join(',');
const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(props.values)}`;
const client = await getClient();
return client.query<ResultSetHeader>(sql).then(([result]) => {
// 获取专用连接而不是从连接池获取
const connection = await (await getClient()).getConnection();
try {
const result = await connection.query<ResultSetHeader>(sql);
if (result[0].affectedRows > 0) {
// 在同一个连接上获取LAST_INSERT_ID确保会话一致性
const [lastIdResult] = await connection.query<RowDataPacket[]>(
'SELECT LAST_INSERT_ID() as firstId'
);
const firstId = lastIdResult[0]?.firstId;
if (firstId && typeof firstId === 'number') {
const count = result[0].affectedRows;
// Generate consecutive IDs: firstId, firstId+1, firstId+2, ...
const ids = Array.from({ length: count }, (_, i) => String(firstId + i));
return {
rowCount: result[0].affectedRows,
insertIds: ids
};
}
// Fallback: try to use insertId from ResultSetHeader if LAST_INSERT_ID() fails
if (result[0].insertId) {
const startId = result[0].insertId;
const count = result[0].affectedRows;
const ids = Array.from({ length: count }, (_, i) => String(startId + i));
return {
rowCount: result[0].affectedRows,
insertIds: ids
};
}
}
return {
rowCount: result.affectedRows,
rows: [{ id: String(result.insertId) }]
rowCount: result[0].affectedRows || 0,
insertIds: []
};
});
} catch (error) {
addLog.error(`OceanBase batch insert error: ${error}`);
throw error;
} finally {
connection.release(); // 释放连接回连接池
}
}
async query<T extends QueryResult = any>(sql: string) {
const client = await getClient();

View File

@@ -1,6 +1,5 @@
/* oceanbase vector crud */
import { DatasetVectorTableName } from '../constants';
import { delay, retryFn } from '@fastgpt/global/common/system/utils';
import { ObClient } from './controller';
import { type RowDataPacket } from 'mysql2/promise';
import {
@@ -26,7 +25,6 @@ export class ObVectorCtrl {
createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`);
await ObClient.query(
`CREATE VECTOR INDEX IF NOT EXISTS vector_index ON ${DatasetVectorTableName}(vector) WITH (distance=inner_product, type=hnsw, m=32, ef_construction=128);`
);
@@ -42,6 +40,7 @@ export class ObVectorCtrl {
addLog.error('init oceanbase error', error);
}
};
insert = async (props: InsertVectorControllerProps): Promise<{ insertIds: string[] }> => {
const { teamId, datasetId, collectionId, vectors } = props;
@@ -52,7 +51,7 @@ export class ObVectorCtrl {
{ key: 'collection_id', value: String(collectionId) }
]);
const { rowCount, rows } = await ObClient.insert(DatasetVectorTableName, {
const { rowCount, insertIds } = await ObClient.insert(DatasetVectorTableName, {
values
});
@@ -61,7 +60,7 @@ export class ObVectorCtrl {
}
return {
insertIds: rows.map((row) => row.id)
insertIds
};
};
delete = async (props: DelDatasetVectorCtrlProps): Promise<any> => {