diff --git a/document/content/docs/upgrading/4-12/4122.mdx b/document/content/docs/upgrading/4-12/4122.mdx index b4ed6d6c0..a83ebdb67 100644 --- a/document/content/docs/upgrading/4-12/4122.mdx +++ b/document/content/docs/upgrading/4-12/4122.mdx @@ -31,6 +31,7 @@ description: 'FastGPT V4.12.2 更新说明' 7. 无法完全关闭系统套餐,会存在空对象默认值,导致鉴权异常。 8. 工作流,添加团队应用,搜索无效。 9. 应用版本,ref 字段错误,导致无法正常使用。 +10. Oceanbase 批量插入时,未正确返回插入的 id。 ## 🔨 工具更新 diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index f8c6fe58f..c2c4e7c52 100644 --- a/document/data/doc-last-modified.json +++ b/document/data/doc-last-modified.json @@ -104,7 +104,7 @@ "document/content/docs/upgrading/4-11/4111.mdx": "2025-08-07T22:49:09+08:00", "document/content/docs/upgrading/4-12/4120.mdx": "2025-08-12T22:45:19+08:00", "document/content/docs/upgrading/4-12/4121.mdx": "2025-08-15T22:53:06+08:00", - "document/content/docs/upgrading/4-12/4122.mdx": "2025-08-26T14:35:39+08:00", + "document/content/docs/upgrading/4-12/4122.mdx": "2025-08-26T15:13:44+08:00", "document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00", diff --git a/packages/service/common/vectorDB/oceanbase/controller.ts b/packages/service/common/vectorDB/oceanbase/controller.ts index 322bbb9ee..20dd65b8e 100644 --- a/packages/service/common/vectorDB/oceanbase/controller.ts +++ b/packages/service/common/vectorDB/oceanbase/controller.ts @@ -6,6 +6,7 @@ import mysql, { } from 'mysql2/promise'; import { addLog } from '../../system/log'; import { OCEANBASE_ADDRESS } from '../constants'; +import { delay } from '@fastgpt/global/common/system/utils'; export const getClient = async (): Promise => { if (!OCEANBASE_ADDRESS) { @@ -27,9 +28,22 @@ export const getClient = async (): Promise => { keepAliveInitialDelay: 0 }); - addLog.info(`oceanbase connected`); + try { + // Test the connection with a simple query instead of calling connect() + await global.obClient.query('SELECT 1'); + addLog.info(`oceanbase connected`); + return global.obClient; + } catch (error) { + addLog.error(`oceanbase connect error`, error); - return global.obClient; + global.obClient?.end(); + global.obClient = null; + + await delay(1000); + addLog.info(`Retry connect oceanbase`); + + return getClient(); + } }; type WhereProps = (string | [string, string | number])[]; @@ -118,9 +132,9 @@ class ObClass { `; const client = await getClient(); - return client - .query<({ count: number } & RowDataPacket)[]>(sql) - .then(([rows]) => Number(rows[0]?.count || 0)); + return client.query<({ count: number } & RowDataPacket)[]>(sql).then(([res]) => { + return res[0]?.['COUNT(*)'] || 0; + }); } async delete(table: string, props: DeleteProps) { const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`; @@ -140,24 +154,73 @@ class ObClass { const client = await getClient(); return client.query(sql); } + /** + * 批量插入数据并获取自增 ID + * 在 OceanBase 多副本环境下使用 LAST_INSERT_ID() 获取准确的自增 ID + * + * 原理说明: + * 1. OceanBase 的 LAST_INSERT_ID() 返回当前会话最后一次插入操作的第一个自增 ID + * 2. 批量插入时,ID 是连续的:first_id, first_id+1, first_id+2, ... + * 3. 这种方法在多副本环境下是可靠的,因为每个连接会话是独立的 + */ async insert(table: string, props: InsertProps) { if (props.values.length === 0) { return { rowCount: 0, - rows: [] + insertIds: [] }; } const fields = props.values[0].map((item) => item.key).join(','); const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(props.values)}`; - const client = await getClient(); - return client.query(sql).then(([result]) => { + // 获取专用连接而不是从连接池获取 + const connection = await (await getClient()).getConnection(); + + try { + const result = await connection.query(sql); + + if (result[0].affectedRows > 0) { + // 在同一个连接上获取LAST_INSERT_ID,确保会话一致性 + const [lastIdResult] = await connection.query( + 'SELECT LAST_INSERT_ID() as firstId' + ); + const firstId = lastIdResult[0]?.firstId; + + if (firstId && typeof firstId === 'number') { + const count = result[0].affectedRows; + // Generate consecutive IDs: firstId, firstId+1, firstId+2, ... + const ids = Array.from({ length: count }, (_, i) => String(firstId + i)); + + return { + rowCount: result[0].affectedRows, + insertIds: ids + }; + } + + // Fallback: try to use insertId from ResultSetHeader if LAST_INSERT_ID() fails + if (result[0].insertId) { + const startId = result[0].insertId; + const count = result[0].affectedRows; + const ids = Array.from({ length: count }, (_, i) => String(startId + i)); + + return { + rowCount: result[0].affectedRows, + insertIds: ids + }; + } + } + return { - rowCount: result.affectedRows, - rows: [{ id: String(result.insertId) }] + rowCount: result[0].affectedRows || 0, + insertIds: [] }; - }); + } catch (error) { + addLog.error(`OceanBase batch insert error: ${error}`); + throw error; + } finally { + connection.release(); // 释放连接回连接池 + } } async query(sql: string) { const client = await getClient(); diff --git a/packages/service/common/vectorDB/oceanbase/index.ts b/packages/service/common/vectorDB/oceanbase/index.ts index 79592a413..bd55a2f2c 100644 --- a/packages/service/common/vectorDB/oceanbase/index.ts +++ b/packages/service/common/vectorDB/oceanbase/index.ts @@ -1,6 +1,5 @@ /* oceanbase vector crud */ import { DatasetVectorTableName } from '../constants'; -import { delay, retryFn } from '@fastgpt/global/common/system/utils'; import { ObClient } from './controller'; import { type RowDataPacket } from 'mysql2/promise'; import { @@ -26,7 +25,6 @@ export class ObVectorCtrl { createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); `); - await ObClient.query( `CREATE VECTOR INDEX IF NOT EXISTS vector_index ON ${DatasetVectorTableName}(vector) WITH (distance=inner_product, type=hnsw, m=32, ef_construction=128);` ); @@ -42,6 +40,7 @@ export class ObVectorCtrl { addLog.error('init oceanbase error', error); } }; + insert = async (props: InsertVectorControllerProps): Promise<{ insertIds: string[] }> => { const { teamId, datasetId, collectionId, vectors } = props; @@ -52,7 +51,7 @@ export class ObVectorCtrl { { key: 'collection_id', value: String(collectionId) } ]); - const { rowCount, rows } = await ObClient.insert(DatasetVectorTableName, { + const { rowCount, insertIds } = await ObClient.insert(DatasetVectorTableName, { values }); @@ -61,7 +60,7 @@ export class ObVectorCtrl { } return { - insertIds: rows.map((row) => row.id) + insertIds }; }; delete = async (props: DelDatasetVectorCtrlProps): Promise => {