perf: app cron job (#3360)

This commit is contained in:
Archer
2024-12-10 14:05:34 +08:00
committed by GitHub
parent c98224dda3
commit b327e487a5
9 changed files with 141 additions and 45 deletions

View File

@@ -0,0 +1,27 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { authCert } from '@fastgpt/service/support/permission/auth/common';
import { NextAPI } from '@/service/middleware/entry';
import { MongoApp } from '@fastgpt/service/core/app/schema';
/* 初始化发布的版本 */
async function handler(req: NextApiRequest, res: NextApiResponse) {
await authCert({ req, authRoot: true });
// scheduledTriggerConfig为 null 的,都转成 unExist
return MongoApp.updateMany(
{
$or: [
{ scheduledTriggerConfig: { $eq: null } },
{ 'scheduledTriggerConfig.cronString': { $eq: '' } }
]
},
{
$unset: {
scheduledTriggerConfig: '',
scheduledTriggerNextTime: ''
}
}
);
}
export default NextAPI(handler);

View File

@@ -10,6 +10,7 @@ import { PostPublishAppProps } from '@/global/core/app/api';
import { WritePermissionVal } from '@fastgpt/global/support/permission/constant';
import { ApiRequestProps } from '@fastgpt/service/type/next';
import { AppTypeEnum } from '@fastgpt/global/core/app/constants';
import { getScheduleTriggerApp } from '@/service/core/app/utils';
async function handler(
req: ApiRequestProps<PostPublishAppProps>,
@@ -52,12 +53,17 @@ async function handler(
updateTime: new Date(),
version: 'v2',
// 只有发布才会更新定时器
...(isPublish && {
scheduledTriggerConfig: chatConfig?.scheduledTriggerConfig,
scheduledTriggerNextTime: chatConfig?.scheduledTriggerConfig?.cronString
? getNextTimeByCronStringAndTimezone(chatConfig.scheduledTriggerConfig)
: null
}),
...(isPublish &&
(chatConfig?.scheduledTriggerConfig?.cronString
? {
$set: {
scheduledTriggerConfig: chatConfig.scheduledTriggerConfig,
scheduledTriggerNextTime: getNextTimeByCronStringAndTimezone(
chatConfig.scheduledTriggerConfig
)
}
}
: { $unset: { scheduledTriggerConfig: '', scheduledTriggerNextTime: '' } })),
'pluginData.nodeVersion': _id
},
{
@@ -66,6 +72,8 @@ async function handler(
);
});
await getScheduleTriggerApp();
return {};
}

View File

@@ -1,10 +1,13 @@
import { getUserChatInfoAndAuthTeamPoints } from '@/service/support/permission/auth/team';
import { pushChatUsage } from '@/service/support/wallet/usage/push';
import { defaultApp } from '@/web/core/app/constants';
import { getNextTimeByCronStringAndTimezone } from '@fastgpt/global/common/string/time';
import { getNanoid } from '@fastgpt/global/common/string/tools';
import { delay, retryFn } from '@fastgpt/global/common/system/utils';
import { ChatItemValueTypeEnum } from '@fastgpt/global/core/chat/constants';
import {
ChatItemValueTypeEnum,
ChatRoleEnum,
ChatSourceEnum
} from '@fastgpt/global/core/chat/constants';
import {
getWorkflowEntryNodeIds,
initWorkflowEdgeStatus,
@@ -15,12 +18,16 @@ import { addLog } from '@fastgpt/service/common/system/log';
import { MongoApp } from '@fastgpt/service/core/app/schema';
import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants';
import { dispatchWorkFlow } from '@fastgpt/service/core/workflow/dispatch';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { UserChatItemValueItemType } from '@fastgpt/global/core/chat/type';
import { saveChat } from '@fastgpt/service/core/chat/saveChat';
import { getAppLatestVersion } from '@fastgpt/service/core/app/version/controller';
export const getScheduleTriggerApp = async () => {
// 1. Find all the app
const apps = await retryFn(() => {
return MongoApp.find({
scheduledTriggerConfig: { $ne: null },
scheduledTriggerConfig: { $exists: true },
scheduledTriggerNextTime: { $lte: new Date() }
});
});
@@ -34,11 +41,22 @@ export const getScheduleTriggerApp = async () => {
await delay(Math.floor(Math.random() * 60 * 1000));
const { user } = await getUserChatInfoAndAuthTeamPoints(app.tmbId);
await retryFn(async () => {
if (!app.scheduledTriggerConfig) return;
// Get app latest version
const { nodes, edges, chatConfig } = await getAppLatestVersion(app._id, app);
const { flowUsages } = await dispatchWorkFlow({
chatId: getNanoid(),
const chatId = getNanoid();
const userQuery: UserChatItemValueItemType[] = [
{
type: ChatItemValueTypeEnum.text,
text: {
content: app.scheduledTriggerConfig?.defaultPrompt
}
}
];
const { flowUsages, assistantResponses, flowResponses } = await retryFn(() => {
return dispatchWorkFlow({
chatId,
user,
mode: 'chat',
runningAppInfo: {
@@ -47,33 +65,48 @@ export const getScheduleTriggerApp = async () => {
tmbId: String(app.tmbId)
},
uid: String(app.tmbId),
runtimeNodes: storeNodes2RuntimeNodes(
app.modules,
getWorkflowEntryNodeIds(app.modules)
),
runtimeEdges: initWorkflowEdgeStatus(app.edges),
runtimeNodes: storeNodes2RuntimeNodes(nodes, getWorkflowEntryNodeIds(nodes)),
runtimeEdges: initWorkflowEdgeStatus(edges),
variables: {},
query: [
{
type: ChatItemValueTypeEnum.text,
text: {
content: app.scheduledTriggerConfig?.defaultPrompt
}
}
],
chatConfig: defaultApp.chatConfig,
query: userQuery,
chatConfig,
histories: [],
stream: false,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES
});
pushChatUsage({
appName: app.name,
appId: app._id,
teamId: String(app.teamId),
tmbId: String(app.tmbId),
source: UsageSourceEnum.cronJob,
flowUsages
});
});
// Save chat
await saveChat({
chatId,
appId: app._id,
teamId: String(app.teamId),
tmbId: String(app.tmbId),
nodes,
appChatConfig: chatConfig,
variables: {},
isUpdateUseTime: false, // owner update use time
newTitle: 'Cron Job',
source: ChatSourceEnum.cronJob,
content: [
{
obj: ChatRoleEnum.Human,
value: userQuery
},
{
obj: ChatRoleEnum.AI,
value: assistantResponses,
[DispatchNodeResponseKeyEnum.nodeResponse]: flowResponses
}
]
});
pushChatUsage({
appName: app.name,
appId: app._id,
teamId: String(app.teamId),
tmbId: String(app.tmbId),
source: UsageSourceEnum.cronJob,
flowUsages
});
// update next time