mirror of
https://github.com/labring/FastGPT.git
synced 2026-04-27 02:08:10 +08:00
cc3a91d009
* Opensandbox (#6651) * volumn manager * feat: opensandbox volumn * perf: action (#6654) * perf: action * doc * doc * deploy tml * update template
962 lines
27 KiB
TypeScript
962 lines
27 KiB
TypeScript
/**
|
||
* ProcessPool / PythonProcessPool 单元测试
|
||
*
|
||
* 覆盖进程池核心逻辑:
|
||
* - 生命周期(init / shutdown / stats)
|
||
* - Worker 崩溃自动恢复(respawn)
|
||
* - 池满排队行为
|
||
* - 并发正确性
|
||
* - shutdown 后行为
|
||
*/
|
||
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
|
||
import { ProcessPool } from '../../src/pool/process-pool';
|
||
import { PythonProcessPool } from '../../src/pool/python-process-pool';
|
||
|
||
// ============================================================
|
||
// JS ProcessPool
|
||
// ============================================================
|
||
describe('ProcessPool 生命周期', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('init 后 stats 正确', async () => {
|
||
pool = new ProcessPool(2);
|
||
await pool.init();
|
||
const s = pool.stats;
|
||
expect(s.total).toBe(2);
|
||
expect(s.idle).toBe(2);
|
||
expect(s.busy).toBe(0);
|
||
expect(s.queued).toBe(0);
|
||
expect(s.poolSize).toBe(2);
|
||
});
|
||
|
||
it('shutdown 后 stats 归零', async () => {
|
||
pool = new ProcessPool(2);
|
||
await pool.init();
|
||
await pool.shutdown();
|
||
const s = pool.stats;
|
||
expect(s.total).toBe(0);
|
||
expect(s.idle).toBe(0);
|
||
expect(s.busy).toBe(0);
|
||
});
|
||
|
||
it('execute 后 worker 归还到 idle', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
await pool.execute({
|
||
code: `async function main() { return { ok: true }; }`,
|
||
variables: {}
|
||
});
|
||
const s = pool.stats;
|
||
expect(s.idle).toBe(1);
|
||
expect(s.busy).toBe(0);
|
||
});
|
||
});
|
||
|
||
describe('ProcessPool Worker 恢复', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('worker 崩溃后自动 respawn,后续请求正常', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
// 让 worker 崩溃(process.exit)
|
||
const result = await pool.execute({
|
||
code: `async function main() { process.exit(1); }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(false);
|
||
|
||
// 等 respawn 完成
|
||
await new Promise((r) => setTimeout(r, 1500));
|
||
|
||
// 新 worker 应该可用
|
||
const result2 = await pool.execute({
|
||
code: `async function main() { return { recovered: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result2.success).toBe(true);
|
||
expect(result2.data?.codeReturn.recovered).toBe(true);
|
||
});
|
||
|
||
it('超时后 worker 被 kill 并 respawn', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
const result = await pool.execute({
|
||
code: `async function main() { while(true) {} }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(false);
|
||
expect(result.message).toContain('timed out');
|
||
|
||
// 等 respawn
|
||
await new Promise((r) => setTimeout(r, 1500));
|
||
|
||
const result2 = await pool.execute({
|
||
code: `async function main() { return { ok: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result2.success).toBe(true);
|
||
});
|
||
});
|
||
|
||
describe('ProcessPool 并发与排队', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('pool size=2,3 个并发请求,1 个排队', async () => {
|
||
pool = new ProcessPool(2);
|
||
await pool.init();
|
||
|
||
// 3 个并发,每个 sleep 200ms
|
||
const promises = Array.from({ length: 3 }, (_, i) =>
|
||
pool.execute({
|
||
code: `async function main(v) { await new Promise(r => setTimeout(r, 200)); return { idx: v.idx }; }`,
|
||
variables: { idx: i }
|
||
})
|
||
);
|
||
|
||
const results = await Promise.all(promises);
|
||
for (let i = 0; i < 3; i++) {
|
||
expect(results[i].success).toBe(true);
|
||
expect(results[i].data?.codeReturn.idx).toBe(i);
|
||
}
|
||
});
|
||
|
||
it('pool size=1,10 个并发请求全部正确完成(串行排队)', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
const promises = Array.from({ length: 10 }, (_, i) =>
|
||
pool.execute({
|
||
code: `async function main(v) { return { n: v.n * 2 }; }`,
|
||
variables: { n: i }
|
||
})
|
||
);
|
||
|
||
const results = await Promise.all(promises);
|
||
for (let i = 0; i < 10; i++) {
|
||
expect(results[i].success).toBe(true);
|
||
expect(results[i].data?.codeReturn.n).toBe(i * 2);
|
||
}
|
||
});
|
||
|
||
it('pool size=2,并发中 1 个崩溃不影响其他请求', async () => {
|
||
pool = new ProcessPool(2);
|
||
await pool.init();
|
||
|
||
const p1 = pool.execute({
|
||
code: `async function main() { process.exit(1); }`,
|
||
variables: {}
|
||
});
|
||
const p2 = pool.execute({
|
||
code: `async function main() { return { ok: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
const [r1, r2] = await Promise.all([p1, p2]);
|
||
expect(r1.success).toBe(false);
|
||
expect(r2.success).toBe(true);
|
||
expect(r2.data?.codeReturn.ok).toBe(true);
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// JS ProcessPool - Worker Ping/Pong 健康检查
|
||
// ============================================================
|
||
describe('ProcessPool Worker 健康检查 (ping/pong)', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('worker 正常响应 ping 后仍可执行任务', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
// 先执行一个任务确认正常
|
||
const r1 = await pool.execute({
|
||
code: `async function main() { return { step: 1 }; }`,
|
||
variables: {}
|
||
});
|
||
expect(r1.success).toBe(true);
|
||
expect(r1.data?.codeReturn.step).toBe(1);
|
||
|
||
// 触发健康检查(通过 triggerHealthCheck)
|
||
(pool as any).pingWorker((pool as any).idleWorkers[0]);
|
||
|
||
// 等 ping/pong 完成
|
||
await new Promise((r) => setTimeout(r, 500));
|
||
|
||
// 再执行一个任务确认 worker 没被误杀
|
||
const r2 = await pool.execute({
|
||
code: `async function main() { return { step: 2 }; }`,
|
||
variables: {}
|
||
});
|
||
expect(r2.success).toBe(true);
|
||
expect(r2.data?.codeReturn.step).toBe(2);
|
||
expect(pool.stats.total).toBe(1);
|
||
});
|
||
|
||
it('连续多次 ping 不影响 worker 状态', async () => {
|
||
pool = new ProcessPool(2);
|
||
await pool.init();
|
||
|
||
// 对所有 idle worker 连续 ping 3 次
|
||
for (let i = 0; i < 3; i++) {
|
||
for (const w of [...(pool as any).idleWorkers]) {
|
||
(pool as any).pingWorker(w);
|
||
}
|
||
await new Promise((r) => setTimeout(r, 300));
|
||
}
|
||
|
||
// 所有 worker 应该还在
|
||
expect(pool.stats.total).toBe(2);
|
||
expect(pool.stats.idle).toBe(2);
|
||
|
||
// 执行任务确认功能正常
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { alive: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// JS ProcessPool - shutdown reject waiters
|
||
// ============================================================
|
||
describe('ProcessPool shutdown reject waiters', () => {
|
||
it('shutdown 后 waitQueue 中的请求被 reject', async () => {
|
||
const pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
// 发起一个长时间运行的任务占住唯一 worker
|
||
const p1 = pool.execute({
|
||
code: `async function main() { await new Promise(r => setTimeout(r, 3000)); return { done: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
// 等一下确保 p1 已经拿到 worker
|
||
await new Promise((r) => setTimeout(r, 200));
|
||
|
||
// 发起第二个请求,它会进入 waitQueue
|
||
const p2 = pool.execute({
|
||
code: `async function main() { return { queued: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
// 确认有排队请求
|
||
expect(pool.stats.queued).toBe(1);
|
||
|
||
// shutdown 应该 reject waitQueue 中的请求
|
||
await pool.shutdown();
|
||
|
||
// p2 应该被 reject
|
||
await expect(p2).rejects.toThrow('shutting down');
|
||
|
||
// p1 可能成功也可能因 worker 被 kill 而失败,不关心
|
||
await p1.catch(() => {});
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// JS ProcessPool - 返回值序列化与参数校验(原 base-runner.test.ts)
|
||
// ============================================================
|
||
describe('ProcessPool 返回值序列化与参数校验', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('JS main 返回 undefined 序列化为 null', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
const result = await pool.execute({
|
||
code: `async function main() { return undefined; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
expect(result.data?.codeReturn).toBeNull();
|
||
});
|
||
|
||
it('JS main 无 return 语句序列化为 null', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
const result = await pool.execute({
|
||
code: `async function main() { const x = 1; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
expect(result.data?.codeReturn).toBeNull();
|
||
});
|
||
|
||
it('code 为非字符串类型返回错误', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
const result = await pool.execute({
|
||
code: 123 as any,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(false);
|
||
expect(result.message).toContain('empty');
|
||
});
|
||
|
||
it('code 为 null 返回错误', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
const result = await pool.execute({
|
||
code: null as any,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(false);
|
||
expect(result.message).toContain('empty');
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// JS + Python 混合并发(原 base-runner.test.ts)
|
||
// ============================================================
|
||
describe('JS + Python 混合并发', () => {
|
||
let jsPool: ProcessPool;
|
||
let pyPool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await jsPool?.shutdown();
|
||
await pyPool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('JS 和 Python 混合并发执行', async () => {
|
||
jsPool = new ProcessPool(1);
|
||
await jsPool.init();
|
||
pyPool = new PythonProcessPool(1);
|
||
await pyPool.init();
|
||
|
||
const jsPromise = jsPool.execute({
|
||
code: `async function main() { return { lang: 'js' }; }`,
|
||
variables: {}
|
||
});
|
||
const pyPromise = pyPool.execute({
|
||
code: `def main():\n return {'lang': 'python'}`,
|
||
variables: {}
|
||
});
|
||
const [jsResult, pyResult] = await Promise.all([jsPromise, pyPromise]);
|
||
expect(jsResult.success).toBe(true);
|
||
expect(jsResult.data?.codeReturn.lang).toBe('js');
|
||
expect(pyResult.success).toBe(true);
|
||
expect(pyResult.data?.codeReturn.lang).toBe('python');
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// JS ProcessPool - 健康检查失败路径
|
||
// ============================================================
|
||
describe('ProcessPool 健康检查失败路径', () => {
|
||
let pool: ProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('ping timeout: worker 不响应 pong 时被替换', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
// 拦截 stdin.write 使 ping 消息不到达 worker(但不关闭 stdin),从而触发真正的 timeout
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let interceptPing = true;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (interceptPing) {
|
||
interceptPing = false;
|
||
return true; // 假装写成功但实际不发送
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
// 触发 ping
|
||
(pool as any).pingWorker(worker);
|
||
|
||
// 等待 HEALTH_CHECK_TIMEOUT (5s) + respawn
|
||
await new Promise((r) => setTimeout(r, 8000));
|
||
|
||
// worker 应该被替换,池仍然有 1 个 worker
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
// 新 worker 应该可用
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { ok: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 15000);
|
||
|
||
it('stdin not writable: worker stdin 关闭时被替换', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
// 销毁 stdin 使其 writable = false
|
||
worker.proc.stdin!.destroy();
|
||
|
||
// 触发 ping
|
||
(pool as any).pingWorker(worker);
|
||
|
||
// 等 respawn
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { replaced: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('health check invalid response: worker 返回错误类型时被替换', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let intercepted = false;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (!intercepted) {
|
||
intercepted = true;
|
||
setTimeout(() => worker.rl.emit('line', JSON.stringify({ type: 'wrong' })), 50);
|
||
return true;
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { invalidResp: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('returnToIdle with waiter: ping 期间有等待请求时直接分配', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
(pool as any).pingWorker(worker);
|
||
|
||
// ping 期间 worker 不在 idle 中,新请求进入 waitQueue
|
||
// ping 成功后 returnToIdle 检查 waitQueue 并直接分配
|
||
const p1 = pool.execute({
|
||
code: `async function main() { return { fromWaiter: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
const result = await p1;
|
||
expect(result.success).toBe(true);
|
||
expect(result.data?.codeReturn.fromWaiter).toBe(true);
|
||
});
|
||
|
||
it('health check parse error: worker 返回非 JSON 时被替换', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let intercepted = false;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (!intercepted) {
|
||
intercepted = true;
|
||
setTimeout(() => worker.rl.emit('line', 'not-json-at-all'), 50);
|
||
return true;
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { parseError: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('health check write error: stdin.write 抛异常时被替换', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
// 让 stdin.write 抛异常,但 writable 仍为 true
|
||
worker.proc.stdin!.write = () => {
|
||
throw new Error('mock write error');
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `async function main() { return { writeError: true }; }`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('returnToIdle with waiter: ping 成功后分配给等待中的请求', async () => {
|
||
pool = new ProcessPool(1);
|
||
await pool.init();
|
||
|
||
// 发起一个长任务占住 worker
|
||
const p1 = pool.execute({
|
||
code: `async function main() { await new Promise(r => setTimeout(r, 1000)); return { first: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
// 等 p1 拿到 worker
|
||
await new Promise((r) => setTimeout(r, 100));
|
||
|
||
// 发起第二个请求,它会进入 waitQueue
|
||
const p2 = pool.execute({
|
||
code: `async function main() { return { second: true }; }`,
|
||
variables: {}
|
||
});
|
||
|
||
// 确认有排队
|
||
expect(pool.stats.queued).toBe(1);
|
||
|
||
// 等 p1 完成,p2 应该自动被分配
|
||
const [r1, r2] = await Promise.all([p1, p2]);
|
||
expect(r1.success).toBe(true);
|
||
expect(r1.data?.codeReturn.first).toBe(true);
|
||
expect(r2.success).toBe(true);
|
||
expect(r2.data?.codeReturn.second).toBe(true);
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// Python PythonProcessPool - Worker Ping/Pong 健康检查
|
||
// ============================================================
|
||
describe('PythonProcessPool Worker 健康检查 (ping/pong)', () => {
|
||
let pool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('worker 正常响应 ping 后仍可执行任务', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
|
||
const r1 = await pool.execute({
|
||
code: `def main():\n return {'step': 1}`,
|
||
variables: {}
|
||
});
|
||
expect(r1.success).toBe(true);
|
||
expect(r1.data?.codeReturn.step).toBe(1);
|
||
|
||
// 触发 ping
|
||
(pool as any).pingWorker((pool as any).idleWorkers[0]);
|
||
await new Promise((r) => setTimeout(r, 500));
|
||
|
||
const r2 = await pool.execute({
|
||
code: `def main():\n return {'step': 2}`,
|
||
variables: {}
|
||
});
|
||
expect(r2.success).toBe(true);
|
||
expect(r2.data?.codeReturn.step).toBe(2);
|
||
expect(pool.stats.total).toBe(1);
|
||
});
|
||
|
||
it('连续多次 ping 不影响 worker 状态', async () => {
|
||
pool = new PythonProcessPool(2);
|
||
await pool.init();
|
||
|
||
for (let i = 0; i < 3; i++) {
|
||
for (const w of [...(pool as any).idleWorkers]) {
|
||
(pool as any).pingWorker(w);
|
||
}
|
||
await new Promise((r) => setTimeout(r, 300));
|
||
}
|
||
|
||
expect(pool.stats.total).toBe(2);
|
||
expect(pool.stats.idle).toBe(2);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'alive': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// Python PythonProcessPool - 健康检查失败路径
|
||
// ============================================================
|
||
describe('PythonProcessPool 健康检查失败路径', () => {
|
||
let pool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('ping timeout: worker 不响应 pong 时被替换', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
// 拦截 stdin.write 使 ping 不到达 worker,触发真正的 timeout
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let interceptPing = true;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (interceptPing) {
|
||
interceptPing = false;
|
||
return true;
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 8000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'ok': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 15000);
|
||
|
||
it('stdin not writable: worker stdin 关闭时被替换', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
worker.proc.stdin!.destroy();
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'replaced': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('health check invalid response: worker 返回错误类型时被替换', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let intercepted = false;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (!intercepted) {
|
||
intercepted = true;
|
||
setTimeout(() => worker.rl.emit('line', JSON.stringify({ type: 'wrong' })), 50);
|
||
return true;
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'invalidResp': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('returnToIdle with waiter: ping 期间有等待请求时直接分配', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
(pool as any).pingWorker(worker);
|
||
|
||
const p1 = pool.execute({
|
||
code: `def main():\n return {'fromWaiter': True}`,
|
||
variables: {}
|
||
});
|
||
|
||
const result = await p1;
|
||
expect(result.success).toBe(true);
|
||
expect(result.data?.codeReturn.fromWaiter).toBe(true);
|
||
});
|
||
|
||
it('health check parse error: worker 返回非 JSON 时被替换', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
|
||
let intercepted = false;
|
||
worker.proc.stdin!.write = (...args: any[]) => {
|
||
if (!intercepted) {
|
||
intercepted = true;
|
||
setTimeout(() => worker.rl.emit('line', 'not-json'), 50);
|
||
return true;
|
||
}
|
||
return origWrite(...args);
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'parseError': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
|
||
it('health check write error: stdin.write 抛异常时被替换', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const worker = (pool as any).idleWorkers[0];
|
||
worker.proc.stdin!.write = () => {
|
||
throw new Error('mock write error');
|
||
};
|
||
|
||
(pool as any).pingWorker(worker);
|
||
|
||
await new Promise((r) => setTimeout(r, 3000));
|
||
|
||
expect(pool.stats.total).toBe(1);
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n return {'writeError': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(true);
|
||
}, 10000);
|
||
});
|
||
|
||
// ============================================================
|
||
// Python PythonProcessPool - shutdown reject waiters
|
||
// ============================================================
|
||
describe('PythonProcessPool shutdown reject waiters', () => {
|
||
it('shutdown 后 waitQueue 中的请求被 reject', async () => {
|
||
const pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
|
||
// 发起一个长时间运行的任务占住唯一 worker
|
||
const p1 = pool.execute({
|
||
code: `import time\ndef main():\n time.sleep(3)\n return {'done': True}`,
|
||
variables: {}
|
||
});
|
||
|
||
// 等一下确保 p1 已经拿到 worker
|
||
await new Promise((r) => setTimeout(r, 200));
|
||
|
||
// 发起第二个请求,它会进入 waitQueue
|
||
const p2 = pool.execute({
|
||
code: `def main():\n return {'queued': True}`,
|
||
variables: {}
|
||
});
|
||
|
||
// 确认有排队请求
|
||
expect(pool.stats.queued).toBe(1);
|
||
|
||
// shutdown 应该 reject waitQueue 中的请求
|
||
await pool.shutdown();
|
||
|
||
// p2 应该被 reject
|
||
await expect(p2).rejects.toThrow('shutting down');
|
||
|
||
// p1 可能成功也可能因 worker 被 kill 而失败,不关心
|
||
await p1.catch(() => {});
|
||
});
|
||
});
|
||
|
||
// ============================================================
|
||
// Python PythonProcessPool
|
||
// ============================================================
|
||
describe('PythonProcessPool 生命周期', () => {
|
||
let pool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('init 后 stats 正确', async () => {
|
||
pool = new PythonProcessPool(2);
|
||
await pool.init();
|
||
const s = pool.stats;
|
||
expect(s.total).toBe(2);
|
||
expect(s.idle).toBe(2);
|
||
expect(s.busy).toBe(0);
|
||
expect(s.queued).toBe(0);
|
||
expect(s.poolSize).toBe(2);
|
||
});
|
||
|
||
it('shutdown 后 stats 归零', async () => {
|
||
pool = new PythonProcessPool(2);
|
||
await pool.init();
|
||
await pool.shutdown();
|
||
const s = pool.stats;
|
||
expect(s.total).toBe(0);
|
||
expect(s.idle).toBe(0);
|
||
expect(s.busy).toBe(0);
|
||
});
|
||
|
||
it('execute 后 worker 归还到 idle', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
await pool.execute({
|
||
code: `def main():\n return {'ok': True}`,
|
||
variables: {}
|
||
});
|
||
const s = pool.stats;
|
||
expect(s.idle).toBe(1);
|
||
expect(s.busy).toBe(0);
|
||
});
|
||
});
|
||
|
||
describe('PythonProcessPool Worker 恢复', () => {
|
||
let pool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('超时后 worker 被 kill 并 respawn', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
|
||
const result = await pool.execute({
|
||
code: `def main():\n while True:\n pass`,
|
||
variables: {}
|
||
});
|
||
expect(result.success).toBe(false);
|
||
expect(result.message).toContain('timed out');
|
||
|
||
// 等 respawn
|
||
await new Promise((r) => setTimeout(r, 2000));
|
||
|
||
const result2 = await pool.execute({
|
||
code: `def main():\n return {'ok': True}`,
|
||
variables: {}
|
||
});
|
||
expect(result2.success).toBe(true);
|
||
});
|
||
});
|
||
|
||
describe('PythonProcessPool 并发与排队', () => {
|
||
let pool: PythonProcessPool;
|
||
|
||
afterEach(async () => {
|
||
try {
|
||
await pool?.shutdown();
|
||
} catch {}
|
||
});
|
||
|
||
it('pool size=2,3 个并发请求,1 个排队', async () => {
|
||
pool = new PythonProcessPool(2);
|
||
await pool.init();
|
||
|
||
const promises = Array.from({ length: 3 }, (_, i) =>
|
||
pool.execute({
|
||
code: `import time\ndef main(variables):\n time.sleep(0.2)\n return {'idx': variables['idx']}`,
|
||
variables: { idx: i }
|
||
})
|
||
);
|
||
|
||
const results = await Promise.all(promises);
|
||
for (let i = 0; i < 3; i++) {
|
||
expect(results[i].success).toBe(true);
|
||
expect(results[i].data?.codeReturn.idx).toBe(i);
|
||
}
|
||
});
|
||
|
||
it('pool size=1,10 个并发请求全部正确完成(串行排队)', async () => {
|
||
pool = new PythonProcessPool(1);
|
||
await pool.init();
|
||
|
||
const promises = Array.from({ length: 10 }, (_, i) =>
|
||
pool.execute({
|
||
code: `def main(variables):\n return {'n': variables['n'] * 2}`,
|
||
variables: { n: i }
|
||
})
|
||
);
|
||
|
||
const results = await Promise.all(promises);
|
||
for (let i = 0; i < 10; i++) {
|
||
expect(results[i].success).toBe(true);
|
||
expect(results[i].data?.codeReturn.n).toBe(i * 2);
|
||
}
|
||
});
|
||
});
|