perf: load plugin groups code;perf: system plugin schema;fix: special variables replace;perf: retry cron app job (#3347)

* perf: load plugin groups code

* perf: system plugin schema

* feat: retry cron app job

* fix: special variables replace
This commit is contained in:
Archer
2024-12-09 17:18:07 +08:00
committed by GitHub
parent a4f6128a89
commit 1a294c1fd3
16 changed files with 106 additions and 92 deletions

View File

@@ -12,12 +12,14 @@ import { TimerIdEnum } from '@fastgpt/service/common/system/timerLock/constants'
import { addHours } from 'date-fns';
import { getScheduleTriggerApp } from '@/service/core/app/utils';
// Try to run train every minute
const setTrainingQueueCron = () => {
setCron('*/1 * * * *', () => {
startTrainingQueue();
});
};
// Clear tmp upload files every ten minutes
const setClearTmpUploadFilesCron = () => {
// Clear tmp upload files every ten minutes
setCron('*/10 * * * *', () => {
@@ -61,6 +63,7 @@ const clearInvalidDataCron = () => {
});
};
// Run app timer trigger every hour
const scheduleTriggerAppCron = () => {
setCron('0 */1 * * *', async () => {
if (

View File

@@ -3,7 +3,7 @@ import { pushChatUsage } from '@/service/support/wallet/usage/push';
import { defaultApp } from '@/web/core/app/constants';
import { getNextTimeByCronStringAndTimezone } from '@fastgpt/global/common/string/time';
import { getNanoid } from '@fastgpt/global/common/string/tools';
import { delay } from '@fastgpt/global/common/system/utils';
import { delay, retryFn } from '@fastgpt/global/common/system/utils';
import { ChatItemValueTypeEnum } from '@fastgpt/global/core/chat/constants';
import {
getWorkflowEntryNodeIds,
@@ -18,63 +18,72 @@ import { dispatchWorkFlow } from '@fastgpt/service/core/workflow/dispatch';
export const getScheduleTriggerApp = async () => {
// 1. Find all the app
const apps = await MongoApp.find({
scheduledTriggerConfig: { $ne: null },
scheduledTriggerNextTime: { $lte: new Date() }
const apps = await retryFn(() => {
return MongoApp.find({
scheduledTriggerConfig: { $ne: null },
scheduledTriggerNextTime: { $lte: new Date() }
});
});
// 2. Run apps
await Promise.allSettled(
apps.map(async (app) => {
if (!app.scheduledTriggerConfig) return;
// random delay 0 ~ 60s
await delay(Math.floor(Math.random() * 60 * 1000));
const { user } = await getUserChatInfoAndAuthTeamPoints(app.tmbId);
try {
const { flowUsages } = await dispatchWorkFlow({
chatId: getNanoid(),
user,
mode: 'chat',
runningAppInfo: {
id: String(app._id),
teamId: String(app.teamId),
tmbId: String(app.tmbId)
},
uid: String(app.tmbId),
runtimeNodes: storeNodes2RuntimeNodes(app.modules, getWorkflowEntryNodeIds(app.modules)),
runtimeEdges: initWorkflowEdgeStatus(app.edges),
variables: {},
query: [
{
type: ChatItemValueTypeEnum.text,
text: {
content: app.scheduledTriggerConfig?.defaultPrompt
if (!app.scheduledTriggerConfig) return;
// random delay 0 ~ 60s
await delay(Math.floor(Math.random() * 60 * 1000));
const { user } = await getUserChatInfoAndAuthTeamPoints(app.tmbId);
await retryFn(async () => {
if (!app.scheduledTriggerConfig) return;
const { flowUsages } = await dispatchWorkFlow({
chatId: getNanoid(),
user,
mode: 'chat',
runningAppInfo: {
id: String(app._id),
teamId: String(app.teamId),
tmbId: String(app.tmbId)
},
uid: String(app.tmbId),
runtimeNodes: storeNodes2RuntimeNodes(
app.modules,
getWorkflowEntryNodeIds(app.modules)
),
runtimeEdges: initWorkflowEdgeStatus(app.edges),
variables: {},
query: [
{
type: ChatItemValueTypeEnum.text,
text: {
content: app.scheduledTriggerConfig?.defaultPrompt
}
}
}
],
chatConfig: defaultApp.chatConfig,
histories: [],
stream: false,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES
});
pushChatUsage({
appName: app.name,
appId: app._id,
teamId: String(app.teamId),
tmbId: String(app.tmbId),
source: UsageSourceEnum.cronJob,
flowUsages
],
chatConfig: defaultApp.chatConfig,
histories: [],
stream: false,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES
});
pushChatUsage({
appName: app.name,
appId: app._id,
teamId: String(app.teamId),
tmbId: String(app.tmbId),
source: UsageSourceEnum.cronJob,
flowUsages
});
});
// update next time
app.scheduledTriggerNextTime = getNextTimeByCronStringAndTimezone(
app.scheduledTriggerConfig
);
await app.save();
} catch (error) {
addLog.error('Schedule trigger error', error);
addLog.warn('Schedule trigger error', { error });
}
// update next time
app.scheduledTriggerNextTime = getNextTimeByCronStringAndTimezone(app.scheduledTriggerConfig);
await app.save();
return;
})
);
};