Files
FastGPT/packages/service/common/bullmq/index.ts
Archer 3a5d725efd feature: 4.10.1 (#5201)
* add dynamic inputRender (#5127)

* dynamic input component

* fix

* fix

* fix

* perf: dynamic render input

* update doc

* perf: error catch

* num input ui

* fix form render (#5177)

* perf: i18n check

* add log

* doc

* Sync dataset  (#5181)

* perf: api dataset create (#5047)

* Sync dataset (#5120)

* add

* wait

* restructure dataset sync, update types and APIs, add sync hints, and remove legacy logic

* feat: add function to retrieve real file ID from third-party doc library and rename team permission check function for clarity

* fix come console

* refactor: rename team dataset limit check functions for clarity, update API dataset sync limit usage, and rename root directory to "ROOT_FOLDER"

* frat: update sync dataset login

* fix delete.ts

* feat: update pnpm-lock.yaml to include bullmq, fix comments in api.d.ts and type.d.ts, rename API file ID field, optimize dataset sync logic, and add website sync feature with related APIs

* feat: update CollectionCard to support site dataset sync, add API root ID constant and init sync API

* feat: add RootCollectionId constant to replace hardcoded root ID

---------

Co-authored-by: dreamer6680 <146868355@qq.com>

* perf: code

* feat: update success message for dataset sync, revise related i18n texts, and optimize file selection logic (#5166)

Co-authored-by: dreamer6680 <146868355@qq.com>

* perf: select file

* Sync dataset (#5180)

* feat: update success message for dataset sync, revise related i18n texts, and optimize file selection logic

* fix: make listfile function return rawid string

---------

Co-authored-by: dreamer6680 <146868355@qq.com>

* init sh

* fix: ts

---------

Co-authored-by: dreamer6680 <1468683855@qq.com>
Co-authored-by: dreamer6680 <146868355@qq.com>

* update doc

* i18n

---------

Co-authored-by: heheer <heheer@sealos.io>
Co-authored-by: dreamer6680 <1468683855@qq.com>
Co-authored-by: dreamer6680 <146868355@qq.com>
2025-07-11 17:02:48 +08:00

89 lines
2.3 KiB
TypeScript

import {
type ConnectionOptions,
type Processor,
Queue,
type QueueOptions,
Worker,
type WorkerOptions
} from 'bullmq';
import { addLog } from '../system/log';
import { newQueueRedisConnection, newWorkerRedisConnection } from '../redis';
const defaultWorkerOpts: Omit<ConnectionOptions, 'connection'> = {
removeOnComplete: {
count: 0 // Delete jobs immediately on completion
},
removeOnFail: {
count: 0 // Delete jobs immediately on failure
}
};
export enum QueueNames {
datasetSync = 'datasetSync',
// abondoned
websiteSync = 'websiteSync'
}
export const queues = (() => {
if (!global.queues) {
global.queues = new Map<QueueNames, Queue>();
}
return global.queues;
})();
export const workers = (() => {
if (!global.workers) {
global.workers = new Map<QueueNames, Worker>();
}
return global.workers;
})();
export function getQueue<DataType, ReturnType = void>(
name: QueueNames,
opts?: Omit<QueueOptions, 'connection'>
): Queue<DataType, ReturnType> {
// check if global.queues has the queue
const queue = queues.get(name);
if (queue) {
return queue as Queue<DataType, ReturnType>;
}
const newQueue = new Queue<DataType, ReturnType>(name.toString(), {
connection: newQueueRedisConnection(),
...opts
});
// default error handler, to avoid unhandled exceptions
newQueue.on('error', (error) => {
addLog.error(`MQ Queue [${name}]: ${error.message}`, error);
});
queues.set(name, newQueue);
return newQueue;
}
export function getWorker<DataType, ReturnType = void>(
name: QueueNames,
processor: Processor<DataType, ReturnType>,
opts?: Omit<WorkerOptions, 'connection'>
): Worker<DataType, ReturnType> {
const worker = workers.get(name);
if (worker) {
return worker as Worker<DataType, ReturnType>;
}
const newWorker = new Worker<DataType, ReturnType>(name.toString(), processor, {
connection: newWorkerRedisConnection(),
...defaultWorkerOpts,
...opts
});
// default error handler, to avoid unhandled exceptions
newWorker.on('error', (error) => {
addLog.error(`MQ Worker [${name}]: ${error.message}`, error);
});
newWorker.on('failed', (jobId, error) => {
addLog.error(`MQ Worker [${name}]: ${error.message}`, error);
});
workers.set(name, newWorker);
return newWorker;
}
export * from 'bullmq';