Files
FastGPT/packages/service/common/vectorDB/oceanbase/controller.ts
T
Archer 64f70a41c1 feat: vector integrationTest;feat: ob quantization (#6366)
* feat(vectordb): add OceanBase HNSW quantization (HNSW_SQ/HNSW_BQ) (#6348)

Support OceanBase vector index quantization via VECTOR_VQ_LEVEL:
- 32 (default): hnsw + inner_product
- 8: hnsw_sq + inner_product (2-3x memory savings)
- 1: hnsw_bq + cosine (~15x memory savings)

HNSW_BQ requires cosine distance per OceanBase docs.
Tested on OceanBase 4.3.5.5 (BP5).

Closes #6202

* feat: add test inclusion for vectorDB tests in vitest configuration (#6358)

* feat: add test inclusion for vectorDB tests in vitest configuration

* refactor: update vectorDB README and setup for environment configuration

- Enhanced README to clarify the use of factory pattern for vectorDB integration tests.
- Updated instructions for setting up environment variables from a local file.
- Removed obsolete PG integration test file and adjusted test execution instructions.
- Improved structure explanation for shared test data and factory functions.

* perf: integrationTest

* feat: vector integration

---------

Co-authored-by: ZHANG Yixin <hi.yixinz@gmail.com>
Co-authored-by: Jingchao <alswlx@gmail.com>
2026-02-02 18:48:25 +08:00

243 lines
7.1 KiB
TypeScript

import mysql, {
type Pool,
type QueryResult,
type RowDataPacket,
type ResultSetHeader
} from 'mysql2/promise';
import { addLog } from '../../system/log';
import { OCEANBASE_ADDRESS, SEEKDB_ADDRESS } from '../constants';
import { delay } from '@fastgpt/global/common/system/utils';
/**
 * WHERE clause parts, rendered in order and joined with single spaces.
 * A plain string is emitted verbatim (e.g. `'AND'` or a raw condition);
 * a `[column, value]` pair is rendered as `column=value`.
 */
type WhereProps = Array<string | [column: string, value: string | number]>;

/** Options accepted by `select` and `count`. */
type GetProps = {
  /** Columns to read. */
  fields?: Array<string>;
  /** Optional filter. */
  where?: WhereProps;
  /** ORDER BY entries, rendered as `field mode` and joined with commas. */
  order?: Array<{ field: string; mode: 'DESC' | 'ASC' | string }>;
  /** Maximum number of rows. */
  limit?: number;
  /** Number of rows to skip. */
  offset?: number;
};

/** Options accepted by `delete`. */
type DeleteProps = {
  where: WhereProps;
};

/** Column/value pairs describing one row (insert) or one SET list (update). */
type ValuesProps = Array<{ key: string; value?: string | number }>;

/** Options accepted by `update`. */
type UpdateProps = {
  values: ValuesProps;
  where: WhereProps;
};

/** Options accepted by `insert`: one `ValuesProps` entry per row. */
type InsertProps = {
  values: Array<ValuesProps>;
};
/**
 * Small SQL helper for OceanBase / SeekDB built on a mysql2 connection pool.
 *
 * Table names, column names, ORDER BY modes and plain-string `where` fragments are
 * interpolated into the SQL text verbatim, so they must come from trusted code.
 * Values (where / update / insert) are rendered as escaped SQL literals.
 */
export class ObClass {
  controllerType: 'oceanbase' | 'seekdb';

  constructor({ type }: { type: 'oceanbase' | 'seekdb' }) {
    this.controllerType = type;
  }

  /**
   * Return the connection pool, creating it on first use.
   *
   * The pool is cached in `global.obClient`; once it exists it is returned as-is,
   * whichever controller type asks for it. A new pool is probed with `SELECT 1`;
   * if the probe fails the pool is closed, and creation is retried after one second
   * (retries continue until a probe succeeds).
   *
   * @returns the shared pool; rejects with a string message when no address is configured.
   */
  private async getClient(): Promise<Pool> {
    const address = this.controllerType === 'oceanbase' ? OCEANBASE_ADDRESS : SEEKDB_ADDRESS;
    if (!address) {
      return Promise.reject('OCEANBASE_ADDRESS || SEEKDB_ADDRESS is not set');
    }

    if (global.obClient) {
      return global.obClient;
    }

    global.obClient = mysql.createPool({
      uri: address,
      waitForConnections: true,
      connectionLimit: Number(process.env.DB_MAX_LINK || 20),
      connectTimeout: 20000,
      idleTimeout: 60000,
      queueLimit: 0,
      enableKeepAlive: true,
      keepAliveInitialDelay: 0
    });

    try {
      // Test the connection with a simple query instead of calling connect()
      await global.obClient.query('SELECT 1');
      addLog.info(`[${this.controllerType}] connect`);
      return global.obClient;
    } catch (error) {
      addLog.error(`[${this.controllerType}] connect error`, error);
      global.obClient?.end();
      global.obClient = null;
      await delay(1000);
      addLog.info(`[${this.controllerType}] retry connect`);
      return this.getClient();
    }
  }

  /**
   * Render a single value as a SQL literal.
   * Numbers are emitted unchanged; everything else is stringified and escaped/quoted
   * by mysql2's `escape`, so quotes and backslashes cannot terminate the literal.
   */
  private toSqlValue(value?: string | number) {
    return typeof value === 'number' ? value : mysql.escape(String(value));
  }

  /**
   * Build the `WHERE ...` clause.
   *
   * String items are emitted verbatim, `[column, value]` pairs become `column=<literal>`,
   * and all parts are joined with a single space.
   *
   * @returns an empty string only when `where` is missing. An empty array still yields
   * `WHERE ` (invalid SQL) on purpose, so an accidental empty filter never turns a
   * DELETE/UPDATE into an unconditional one.
   */
  private getWhereStr(where?: WhereProps) {
    if (!where) {
      return '';
    }
    const parts = where.map((item) =>
      typeof item === 'string' ? item : `${item[0]}=${this.toSqlValue(item[1])}`
    );
    return `WHERE ${parts.join(' ')}`;
  }

  /** Build the comma-separated `key=<literal>` list of an UPDATE ... SET clause. */
  private getUpdateValStr(values: ValuesProps) {
    return values.map((item) => `${item.key}=${this.toSqlValue(item.value)}`).join(',');
  }

  /** Build the `(v1,v2,...),(v1,v2,...)` tuple list of a multi-row INSERT. */
  private getInsertValStr(values: ValuesProps[]) {
    return values
      .map((row) => `(${row.map((item) => this.toSqlValue(item.value)).join(',')})`)
      .join(',');
  }

  /**
   * Run a SELECT.
   *
   * @param table table name (interpolated verbatim)
   * @param props columns (default `*`), filter, ordering, `limit` (default 10) and `offset` (default 0)
   * @returns the raw mysql2 `[rows, fields]` result
   */
  async select<T extends QueryResult = any>(table: string, props: GetProps) {
    const columns = !props.fields || props.fields.length === 0 ? '*' : props.fields.join(',');
    const orderBy = props.order
      ? `ORDER BY ${props.order.map((item) => `${item.field} ${item.mode}`).join(',')}`
      : '';
    const sql = `SELECT ${columns}
      FROM ${table}
      ${this.getWhereStr(props.where)}
      ${orderBy}
      LIMIT ${props.limit || 10} OFFSET ${props.offset || 0}
    `;

    const client = await this.getClient();
    return client.query<T>(sql);
  }

  /**
   * Count rows matching the filter.
   *
   * @param table table name (interpolated verbatim)
   * @param props `fields[0]` selects the counted column (default `*`); `where` filters rows
   * @returns the count, or 0 when the query returns no row
   */
  async count(table: string, props: GetProps) {
    // Alias the aggregate so the result can be read regardless of which column is counted.
    const sql = `SELECT COUNT(${props?.fields?.[0] || '*'}) AS total
      FROM ${table}
      ${this.getWhereStr(props.where)}
    `;

    const client = await this.getClient();
    const [rows] = await client.query<({ total: number } & RowDataPacket)[]>(sql);
    return rows[0]?.total || 0;
  }

  /**
   * Run a DELETE with the given filter.
   *
   * @returns the raw mysql2 query result
   */
  async delete(table: string, props: DeleteProps) {
    const sql = `DELETE FROM ${table} ${this.getWhereStr(props.where)}`;
    const client = await this.getClient();
    return client.query(sql);
  }

  /**
   * Run an UPDATE with the given SET values and filter.
   *
   * @returns `{ rowCount: 0 }` without touching the database when there is nothing to set,
   * otherwise the raw mysql2 query result
   */
  async update(table: string, props: UpdateProps) {
    if (props.values.length === 0) {
      return {
        rowCount: 0
      };
    }

    const sql = `UPDATE ${table} SET ${this.getUpdateValStr(props.values)} ${this.getWhereStr(
      props.where
    )}`;
    const client = await this.getClient();
    return client.query(sql);
  }

  /**
   * Insert rows in one statement and return their auto-increment IDs.
   *
   * How the IDs are obtained:
   * 1. `LAST_INSERT_ID()` is read on the same connection that ran the INSERT, so it
   *    reflects this session's insert; it is taken as the ID of the first inserted row.
   * 2. The IDs of a batch are assumed to be consecutive: firstId, firstId+1, firstId+2, ...
   * 3. If `LAST_INSERT_ID()` does not yield a non-zero number, the `insertId` of the
   *    INSERT result is used as the first ID instead.
   *
   * The column list is taken from the first row; every row is expected to list the same
   * keys in the same order.
   *
   * @returns `rowCount` (affected rows) and `insertIds` (as strings; empty when no first
   * ID could be determined or nothing was inserted)
   * @throws rethrows any query error after logging it
   */
  async insert(table: string, props: InsertProps) {
    if (props.values.length === 0) {
      return {
        rowCount: 0,
        insertIds: []
      };
    }

    const fields = props.values[0].map((item) => item.key).join(',');
    const sql = `INSERT INTO ${table} (${fields}) VALUES ${this.getInsertValStr(props.values)}`;

    // Hold one dedicated connection so both statements run in the same session.
    const connection = await (await this.getClient()).getConnection();

    try {
      const [header] = await connection.query<ResultSetHeader>(sql);
      const rowCount = header.affectedRows || 0;

      if (rowCount > 0) {
        // Read LAST_INSERT_ID on the same connection to keep session consistency.
        const [lastIdRows] = await connection.query<RowDataPacket[]>(
          'SELECT LAST_INSERT_ID() as firstId'
        );
        const lastInsertId = lastIdRows[0]?.firstId;

        // Fallback: use insertId from ResultSetHeader if LAST_INSERT_ID() is unusable.
        const firstId =
          lastInsertId && typeof lastInsertId === 'number' ? lastInsertId : header.insertId;

        if (firstId) {
          return {
            rowCount,
            // Generate consecutive IDs: firstId, firstId+1, firstId+2, ...
            insertIds: Array.from({ length: rowCount }, (_, i) => String(firstId + i))
          };
        }
      }

      return {
        rowCount,
        insertIds: []
      };
    } catch (error) {
      addLog.error(`[${this.controllerType}] batch insert error`, error);
      throw error;
    } finally {
      connection.release(); // Return the connection to the pool
    }
  }

  /**
   * Run an arbitrary SQL statement and warn when it takes longer than 300 ms.
   *
   * @param sql complete SQL text, executed as-is
   * @returns the raw mysql2 query result
   */
  async query<T extends QueryResult = any>(sql: string) {
    const client = await this.getClient();
    const start = Date.now();

    const res = await client.query<T>(sql);

    const time = Date.now() - start;
    if (time > 300) {
      addLog.warn(`[${this.controllerType}] query time: ${time}ms, sql: ${sql}`);
    }
    return res;
  }
}