This commit is contained in:
Archer
2023-11-09 09:46:57 +08:00
committed by GitHub
parent 661ee79943
commit 8bb5588305
402 changed files with 9899 additions and 5967 deletions

View File

@@ -1,17 +1,16 @@
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
import { pushQABill } from '@/service/common/bill/push';
import { pushQABill } from '@/service/support/wallet/bill/push';
import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constant';
import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode';
import { sendInform } from '@/pages/api/user/inform/send';
import { authBalanceByUid } from '@fastgpt/service/support/user/auth';
import { sendOneInform } from '../support/user/inform/api';
import { getAIApi } from '@fastgpt/service/core/ai/config';
import type { ChatCompletionRequestMessage } from '@fastgpt/global/core/ai/type.d';
import { addLog } from '../utils/tools';
import type { ChatMessageItemType } from '@fastgpt/global/core/ai/type.d';
import { addLog } from '@fastgpt/service/common/mongo/controller';
import { splitText2Chunks } from '@/global/common/string/tools';
import { replaceVariable } from '@/global/common/string/tools';
import { Prompt_AgentQA } from '@/global/core/prompt/agent';
import { pushDataToDatasetCollection } from '@/pages/api/core/dataset/data/pushData';
import { getErrText } from '@fastgpt/global/common/error/utils';
import { authTeamBalance } from '../support/permission/auth/bill';
const reduceQueue = () => {
global.qaQueueLen = global.qaQueueLen > 0 ? global.qaQueueLen - 1 : 0;
@@ -21,45 +20,92 @@ export async function generateQA(): Promise<any> {
if (global.qaQueueLen >= global.systemEnv.qaMaxProcess) return;
global.qaQueueLen++;
let trainingId = '';
let userId = '';
// get training data
// Claim one pending QA training record: pick a record in QA mode whose lock is
// older than 10 minutes, stamp it with a fresh lockTime, and report the outcome
// as { data, text }, { done: true } or { error: true }.
const {
  data,
  text,
  done = false,
  error = false
} = await (async () => {
  try {
    const claimQuery = MongoDatasetTraining.findOneAndUpdate(
      {
        mode: TrainingModeEnum.qa,
        lockTime: { $lte: new Date(Date.now() - 10 * 60 * 1000) }
      },
      {
        lockTime: new Date()
      }
    ).select({
      _id: 1,
      userId: 1,
      teamId: 1,
      tmbId: 1,
      datasetId: 1,
      datasetCollectionId: 1,
      q: 1,
      model: 1,
      billId: 1,
      prompt: 1
    });
    const claimed = await claimQuery;
    const data = claimed?.toJSON();

    // A record was claimed: hand it back together with the text to split into QA pairs.
    if (data) {
      return {
        data,
        text: data.q
      };
    }

    // Nothing matched the filter, so there is no task to pick up.
    return {
      done: true
    };
  } catch (error) {
    console.log(`Get Training Data error`, error);
    return {
      error: true
    };
  }
})();
// No QA task could be claimed: release this worker slot and stop.
if (done) {
  reduceQueue();
  // Check the QA counter (the one reduceQueue() decrements) and use the QA label;
  // the vector queue counter and its label belong to the index worker.
  global.qaQueueLen <= 0 && console.log(`【QA】任务完成`);
  return;
}
// The lookup failed or produced no record: release the slot and try the next task.
if (error || !data) {
  reduceQueue();
  return generateQA();
}
// auth balance
try {
  await authTeamBalance(data.teamId);
} catch (error) {
  // Balance check failed: notify the team member and park the task.
  try {
    sendOneInform({
      type: 'system',
      title: '索引生成任务中止',
      content:
        '由于账号余额不足,索引生成任务中止,重新充值后将会继续。暂停的任务将在 7 天后被删除。',
      tmbId: data.tmbId
    });
    console.log('余额不足,暂停向量生成任务');
    // findById() takes a projection as its second argument and writes nothing.
    // findByIdAndUpdate() is required so the far-future lockTime is persisted and
    // the `lockTime <= now - 10min` claim filter stops matching this record.
    await MongoDatasetTraining.findByIdAndUpdate(data._id, {
      lockTime: new Date('2999/5/5')
    });
  } catch (lockError) {
    // Best effort: a failure here must not stop the worker, but should be visible.
    console.log(`Lock Training Data error`, lockError);
  }
  reduceQueue();
  return generateQA();
}
try {
const data = await MongoDatasetTraining.findOneAndUpdate(
{
mode: TrainingModeEnum.qa,
lockTime: { $lte: new Date(Date.now() - 10 * 60 * 1000) }
},
{
lockTime: new Date()
}
).select({
_id: 1,
userId: 1,
datasetCollectionId: 1,
q: 1,
model: 1,
prompt: 1,
billId: 1
});
// task preemption
if (!data) {
reduceQueue();
global.qaQueueLen <= 0 && console.log(`【QA】任务完成`);
return;
}
trainingId = data._id;
userId = String(data.userId);
await authBalanceByUid(userId);
const startTime = Date.now();
// request LLM to get QA
const text = data.q;
const messages: ChatCompletionRequestMessage[] = [
const messages: ChatMessageItemType[] = [
{
role: 'user',
content: data.prompt
@@ -84,7 +130,8 @@ export async function generateQA(): Promise<any> {
// get vector and insert
await pushDataToDatasetCollection({
userId,
teamId: data.teamId,
tmbId: data.tmbId,
collectionId: data.datasetCollectionId,
data: qaArr,
mode: TrainingModeEnum.index,
@@ -97,10 +144,11 @@ export async function generateQA(): Promise<any> {
console.log(`split result length: `, qaArr.length);
console.log('生成QA成功time:', `${(Date.now() - startTime) / 1000}s`);
// 计费
// add bill
if (qaArr.length > 0) {
pushQABill({
userId: data.userId,
teamId: data.teamId,
tmbId: data.tmbId,
totalTokens,
billId: data.billId
});
@@ -114,36 +162,30 @@ export async function generateQA(): Promise<any> {
reduceQueue();
// log
if (err?.response) {
console.log('openai error: 生成QA错误');
console.log(err.response?.status, err.response?.statusText, err.response?.data);
// Record the response details carried by the error (status, status text, body).
addLog.info('openai error: 生成QA错误', {
  status: err.response?.status,
  statusText: err.response?.statusText,
  data: err.response?.data
});
} else {
console.log(err);
addLog.error(getErrText(err, '生成 QA 错误'));
}
// message error or openai account error
if (err?.message === 'invalid message format') {
await MongoDatasetTraining.findByIdAndRemove(trainingId);
}
// 账号余额不足,删除任务
if (userId && err === ERROR_ENUM.insufficientQuota) {
sendInform({
type: 'system',
title: 'QA 任务中止',
content:
'由于账号余额不足,索引生成任务中止,重新充值后将会继续。暂停的任务将在 7 天后被删除。',
userId
if (
err?.message === 'invalid message format' ||
err.response?.data?.error?.type === 'invalid_request_error' ||
err?.code === 500
) {
addLog.info('invalid message format', {
text
});
console.log('余额不足,暂停向量生成任务');
await MongoDatasetTraining.updateMany(
{
userId
},
{
lockTime: new Date('2999/5/5')
}
);
try {
await MongoDatasetTraining.findByIdAndUpdate(data._id, {
lockTime: new Date('2998/5/5')
});
} catch (error) {}
return generateQA();
}

View File

@@ -1,11 +1,11 @@
import { insertData2Dataset } from '../core/dataset/data/utils';
import { getVector } from '@/pages/api/openapi/plugin/vector';
import { insertData2Dataset } from '@/service/core/dataset/data/controller';
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode';
import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constant';
import { sendInform } from '@/pages/api/user/inform/send';
import { addLog } from '../utils/tools';
import { sendOneInform } from '../support/user/inform/api';
import { addLog } from '@fastgpt/service/common/mongo/controller';
import { getErrText } from '@fastgpt/global/common/error/utils';
import { authTeamBalance } from '@/service/support/permission/auth/bill';
import { pushGenerateVectorBill } from '@/service/support/wallet/bill/push';
const reduceQueue = () => {
global.vectorQueueLen = global.vectorQueueLen > 0 ? global.vectorQueueLen - 1 : 0;
@@ -16,68 +16,114 @@ export async function generateVector(): Promise<any> {
if (global.vectorQueueLen >= global.systemEnv.vectorMaxProcess) return;
global.vectorQueueLen++;
let trainingId = '';
let userId = '';
let dataItems: {
q: string;
a: string;
} = {
q: '',
a: ''
};
// get training data
// Claim one pending index training record: pick a record in index mode whose
// lock is older than 1 minute, stamp it with a fresh lockTime, and report the
// outcome as { data, dataItem }, { done: true } or { error: true }.
const {
  data,
  dataItem,
  done = false,
  error = false
} = await (async () => {
  // Replace the control characters \x00-\x08 with spaces.
  const blankControlChars = (value: string) => value.replace(/[\x00-\x08]/g, ' ');

  try {
    const claimQuery = MongoDatasetTraining.findOneAndUpdate(
      {
        mode: TrainingModeEnum.index,
        lockTime: { $lte: new Date(Date.now() - 1 * 60 * 1000) }
      },
      {
        lockTime: new Date()
      }
    ).select({
      _id: 1,
      userId: 1,
      teamId: 1,
      tmbId: 1,
      datasetId: 1,
      datasetCollectionId: 1,
      q: 1,
      a: 1,
      model: 1,
      billId: 1
    });
    const claimed = await claimQuery;
    const data = claimed?.toJSON();

    // A record was claimed: return it with the sanitized q/a pair.
    if (data) {
      return {
        data,
        dataItem: {
          q: blankControlChars(data.q),
          a: data.a ? blankControlChars(data.a) : ''
        }
      };
    }

    // Nothing matched the filter, so there is no task to pick up.
    return {
      done: true
    };
  } catch (error) {
    console.log(`Get Training Data error`, error);
    return {
      error: true
    };
  }
})();
// No index task could be claimed: release this worker slot and stop.
if (done) {
  reduceQueue();
  if (global.vectorQueueLen <= 0) {
    console.log(`【索引】任务完成`);
  }
  return;
}
// The lookup failed or produced no record: release the slot and try the next task.
if (error || !data) {
  reduceQueue();
  return generateVector();
}
// auth balance
try {
  await authTeamBalance(data.teamId);
} catch (error) {
  // Balance check failed: notify the team member and park the task.
  try {
    sendOneInform({
      type: 'system',
      title: '索引生成任务中止',
      content:
        '由于账号余额不足,索引生成任务中止,重新充值后将会继续。暂停的任务将在 7 天后被删除。',
      tmbId: data.tmbId
    });
    console.log('余额不足,暂停向量生成任务');
    // findById() takes a projection as its second argument and writes nothing.
    // findByIdAndUpdate() is required so the far-future lockTime is persisted and
    // the `lockTime <= now - 1min` claim filter stops matching this record.
    await MongoDatasetTraining.findByIdAndUpdate(data._id, {
      lockTime: new Date('2999/5/5')
    });
  } catch (lockError) {
    // Best effort: a failure here must not stop the worker, but should be visible.
    console.log(`Lock Training Data error`, lockError);
  }
  reduceQueue();
  return generateVector();
}
// create vector and insert
try {
const data = await MongoDatasetTraining.findOneAndUpdate(
{
mode: TrainingModeEnum.index,
lockTime: { $lte: new Date(Date.now() - 1 * 60 * 1000) }
},
{
lockTime: new Date()
}
)
.select({
_id: 1,
userId: 1,
datasetId: 1,
datasetCollectionId: 1,
q: 1,
a: 1,
model: 1,
billId: 1
})
.lean();
// task preemption
if (!data) {
reduceQueue();
global.vectorQueueLen <= 0 && console.log(`【索引】任务完成`);
return;
}
trainingId = data._id;
userId = String(data.userId);
dataItems = {
q: data.q.replace(/[\x00-\x08]/g, ' '),
a: data.a?.replace(/[\x00-\x08]/g, ' ') || ''
};
// insert data 2 pg
await insertData2Dataset({
userId,
// insert data to pg
const { tokenLen } = await insertData2Dataset({
  teamId: data.teamId,
  // Pass the record's own tmbId (it is part of the selected fields), not the team id again.
  tmbId: data.tmbId,
  datasetId: data.datasetId,
  collectionId: data.datasetCollectionId,
  q: dataItem.q,
  a: dataItem.a,
  model: data.model
});
// push bill
pushGenerateVectorBill({
  teamId: data.teamId,
  // Bill against the record's own tmbId, not the team id again.
  tmbId: data.tmbId,
  tokenLen: tokenLen,
  model: data.model,
  billId: data.billId
});
// delete data from training
await MongoDatasetTraining.findByIdAndDelete(data._id);
// console.log(`生成向量成功: ${data._id}`);
reduceQueue();
generateVector();
} catch (err: any) {
@@ -97,48 +143,20 @@ export async function generateVector(): Promise<any> {
// message error or openai account error
if (
err?.message === 'invalid message format' ||
err.response?.data?.error?.type === 'invalid_request_error'
err.response?.data?.error?.type === 'invalid_request_error' ||
err?.code === 500
) {
addLog.info('invalid message format', {
dataItems
dataItem
});
try {
await MongoDatasetTraining.findByIdAndUpdate(trainingId, {
await MongoDatasetTraining.findByIdAndUpdate(data._id, {
lockTime: new Date('2998/5/5')
});
} catch (error) {}
return generateVector();
}
// err vector data
if (err?.code === 500) {
await MongoDatasetTraining.findByIdAndDelete(trainingId);
return generateVector();
}
// 账号余额不足,暂停任务
if (userId && err === ERROR_ENUM.insufficientQuota) {
try {
sendInform({
type: 'system',
title: '索引生成任务中止',
content:
'由于账号余额不足,索引生成任务中止,重新充值后将会继续。暂停的任务将在 7 天后被删除。',
userId
});
console.log('余额不足,暂停向量生成任务');
await MongoDatasetTraining.updateMany(
{
userId
},
{
lockTime: new Date('2999/5/5')
}
);
} catch (error) {}
return generateVector();
}
setTimeout(() => {
generateVector();
}, 1000);

View File

@@ -1,16 +0,0 @@
/**
 * Runs the queued inform tasks one at a time.
 *
 * Returns immediately when `global.sendInformQueue` is empty or a run is already
 * in flight (`global.sendInformQueueLen > 0`). Otherwise it awaits the task at
 * the tail of the queue, removes it, and then re-invokes itself for the next
 * task without awaiting that call.
 *
 * A task that throws is left in the queue, so the following run retries it.
 */
export const startSendInform = async (): Promise<void> => {
  if (global.sendInformQueue.length === 0 || global.sendInformQueueLen > 0) return;
  global.sendInformQueueLen++;
  try {
    const task = global.sendInformQueue[global.sendInformQueue.length - 1];
    await task();
    // Remove the task that actually ran. A blind pop() would discard whatever was
    // appended while the task was being awaited and leave the finished task queued.
    const queue = global.sendInformQueue;
    const finishedIndex = queue.lastIndexOf(task);
    if (finishedIndex !== -1) {
      queue.splice(finishedIndex, 1);
    }
  } catch (error) {
    // The failed task stays queued and is retried by the next run.
  } finally {
    global.sendInformQueueLen--;
  }
  void startSendInform();
};