FastGPT/projects/sandbox/test/unit/process-pool.test.ts
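Archer 6b61359516 feat(sandbox): refactor the code sandbox, adding built-in functions and network requests (#6479)
* fix(sandbox): refactor the code sandbox, adding built-in functions and network requests (#6462)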

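* feat(sandbox): rewrite the code sandbox - Bun + Hono + unified subprocess model

- Runtime: Node.js → Bun
- HTTP framework: NestJS + Fastify → Hono
- JS execution: isolated-vm → Bun subprocess (unified with Python)
- Architecture: unified subprocess model; JS and Python share one execution engine

- SubprocessRunner base class, extended separately for JS and Python
- ProcessPool warm-up (SANDBOX_JS_POOL_SIZE / SANDBOX_PYTHON_POOL_SIZE)
- SystemHelper namespace (the JS side keeps backward-compatible global functions)
- Temporary filesystem isolation + path traversal protection + disk quota
- Per-request resource limits (timeoutMs / memoryMB / diskMB)

- JS: prototype chain freezing + Bun API disabling + safe require injected via the Function constructor (module whitelist)
- Python: host-side regex precheck + __import__ interception + resource limits

- Removed: @nestjs/* (6 packages), fastify, isolated-vm, node-gyp, reflect-metadata, rxjs
- Added: hono
- Kept: tiktoken
- New user-available packages: lodash, dayjs, axios, moment, uuid, crypto-js, qs

- All 67 tests pass (unit + security + integration tests)
- Standalone vitest config that does not affect the global one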

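* fix(sandbox): security hardening - extend Bun API blocking, sanitize process.env, wrap the Python import hook in a closure

- JS: extend the list of blocked dangerous Bun APIs (serve/connect/listen/udpSocket/dns/plugin/build/Transpiler)
- JS: sanitize process.env, keeping only the variables the sandbox needs, to avoid leaking sensitive environment variables
- Python: wrap _safe_import in a closure and del _original_import/_make_safe_import/_BLOCKED_MODULES
  so user code cannot restore the original __import__
- Dockerfile: copy bun.lock and use --frozen-lockfile for reproducible builds

* fix(sandbox): remove sandbox from the pnpm workspace and manage its dependencies independently

* fix(sandbox): remove sandbox tests from the global vitest run; skip integration tests when SANDBOX_URL is not set

* ci(sandbox): add a standalone test workflow, triggered only when sandbox code changes

* refactor(sandbox): start via export default, consistent with sandbox_server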

* fix: sandbox security hardening & comprehensive test suite

Security fixes:
- JS: freeze Function constructor to block constructor.constructor escape
- JS: handle undefined return from main() (serialize as null)
- Python: fix http_request using from-import after __import__ interception
- Python: __import__ whitelist mode blocks exec/eval import bypasses

New tests (223 passing):
- security/escape-attacks: JS/Python escape attack vectors
- security/network-security: IP blacklist, protocol restrictions, httpRequest
- compat/legacy-js: 18 backward compatibility tests
- compat/legacy-python: 21 backward compatibility tests
- boundary: timeout, memory, disk, edge cases
- examples: common user code patterns

* feat(sandbox): env vars for all limits + rewrite README

- Network limits configurable via env: SANDBOX_MAX_REQUESTS, SANDBOX_REQUEST_TIMEOUT, SANDBOX_MAX_RESPONSE_SIZE
- Resource upper bounds configurable: SANDBOX_MAX_TIMEOUT, SANDBOX_MAX_MEMORY_MB, SANDBOX_MAX_DISK_MB
- README: architecture, API docs, env var reference, how to add JS/Python packages, security overview, built-in functions

* refactor(sandbox): extract env.ts with dotenv for typed env loading

- New env.ts: dotenv.config() + typed helpers (str/int/bool)
- config.ts re-exports env for backward compatibility
- index.ts imports env first to ensure .env loaded before anything else

* refactor(sandbox): use zod for env validation and type coercion

- Replace manual parseInt/str helpers with zod schema + coerce
- Invalid env vars now fail fast with formatted error on startup
- dotenv + zod, clean and declarative

* chore(sandbox): remove unused process pool code

- Delete pool.ts and pool.test.ts (pool was never wired into runners)
- Remove PoolConfig/PooledProcess types
- Remove pool env vars from env.ts
- Clean up README

* feat(sandbox): add concurrency limiter with semaphore

- New Semaphore utility for max concurrent subprocess control
- SANDBOX_MAX_CONCURRENCY env var (default 50)
- Excess requests queue instead of spawning unbounded processes
- Health endpoint exposes concurrency stats (current/queued/max)
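
The queueing behavior described above can be illustrated with a small FIFO counting semaphore. This is only a sketch under assumptions: the class shape, the `run` helper, and the `stats` field names are hypothetical and are not taken from the project's src/utils/semaphore.ts.

```typescript
// Hypothetical sketch of a FIFO counting semaphore (not the project's implementation).
class Semaphore {
  private current = 0;
  private readonly max: number;
  private readonly waiters: Array<() => void> = [];

  constructor(max: number) {
    if (!Number.isInteger(max) || max < 1) {
      throw new RangeError('max must be a positive integer');
    }
    this.max = max;
  }

  /** Resolves once a slot is available; waiters are served in arrival order. */
  acquire(): Promise<void> {
    if (this.current < this.max) {
      this.current++;
      return Promise.resolve();
    }
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  /** Frees a slot, or hands it straight to the oldest waiter. */
  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // the slot is transferred, so `current` stays unchanged
    } else if (this.current > 0) {
      this.current--;
    }
  }

  /** Snapshot in the spirit of the health endpoint (field names assumed). */
  get stats(): { current: number; queued: number; max: number } {
    return { current: this.current, queued: this.waiters.length, max: this.max };
  }

  /** Runs `task` while holding a slot and always releases it afterwards. */
  async run<T>(task: () => Promise<T>): Promise<T> {
    await this.acquire();
    try {
      return await task();
    } finally {
      this.release();
    }
  }
}
```

Handing the slot directly to the next waiter in `release` keeps the ordering FIFO and avoids a window in which a newly arriving request could jump the queue.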

* test(sandbox): add semaphore tests and expand coverage to 292 cases

- New semaphore.test.ts (11 tests): acquire/release, queuing, FIFO, stats, serial execution
- JS runner: blank code, template literals, primitive returns, more modules, unicode, partial limits
- Python runner: blank code, triple quotes, primitive returns, unicode, null vars, division errors
- JS security: process.exit, globalThis, Symbol.unscopables, Proxy, dynamic import, path traversal
- Python security: pickle/multiprocessing/threading/ctypes/signal, exec bypass, __subclasses__
- Escape attacks: type() class creation, __builtins__ tampering, getattr access
- Boundary: long vars, special JSON chars, float precision, big ints, circular refs, Promise.reject

* test(sandbox): test-master review - add 31 tests, coverage report

- base-runner.test.ts (10): BaseRunner precheck, temp dir, semaphore integration
- semaphore-race.test.ts (5): race conditions, rapid acquire/release, stress test
- coverage-gaps.test.ts (16): security coverage gaps found during review
- REVIEW-REPORT.md: full test audit report

Total: 323 passed, 0 failed

* fix(sandbox): address PR #6439 review issues

Security fixes:
- Intercept Python builtins.open(), restrict file access to sandbox tmpdir
- Remove unused pool.ts, warmup.mjs, warmup.py (security risk)
- Fix DNS rebinding TOCTOU: use resolved IP for HTTP connections
- Fix symlink path traversal: use realpath instead of normpath
- Add try/finally cleanup for __import__ hook

Robustness:
- Add __SANDBOX_RESULT__ prefix to stdout parsing, prevent user output interference
- Fix disk quota tracking: deduct old file size on overwrite
- Add __import__() pattern scanning in Python precheck
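
As a rough illustration of the prefixed stdout parsing mentioned above: the host can ignore everything the user printed and only parse the line that carries the marker. The function name and the choice to take the last matching line are assumptions made for this sketch, not the project's actual parser.

```typescript
// Marker named in the commit message; everything else here is a hypothetical sketch.
const RESULT_PREFIX = '__SANDBOX_RESULT__';

/**
 * Scans stdout from the end and parses the last line that starts with the marker.
 * Ordinary user output (console.log / print) is skipped.
 */
function extractResult(stdout: string): unknown {
  const lines = stdout.split('\n');
  for (let i = lines.length - 1; i >= 0; i--) {
    const line = lines[i] ?? '';
    if (line.startsWith(RESULT_PREFIX)) {
      return JSON.parse(line.slice(RESULT_PREFIX.length));
    }
  }
  throw new Error('no result line found in stdout');
}
```

Taking the last match assumes the runner writes its result after user code has finished; a user who prints the marker themselves can still only influence their own result.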

Tests:
- Fix eval+__import__ test assertion (accept both catch and fail paths)

All 323 tests passing.

* fix(sandbox): remove warmup scripts COPY from Dockerfile

* docs(sandbox): add technical design document

* feat(sandbox): configurable module allowlist/blocklist via env vars

- SANDBOX_JS_ALLOWED_MODULES: JS require whitelist (comma-separated)
- SANDBOX_PYTHON_BLOCKED_MODULES: Python import blacklist (comma-separated)
- Defaults unchanged, fully backward compatible

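* fix(sandbox): fix multiple security vulnerabilities

1. Python HTTPS DNS rebinding: HTTPS requests now also connect using the resolved IP
2. Python __import__ hook restore vulnerability: remove the code in the finally block that restored the original __import__
3. Python internal variable leak: delete references to internal modules such as _os and _socket before running user code
4. Dangerous JS process APIs: disable process.binding/dlopen/kill/chdir and others, and freeze process.env
5. Python open() fd bypass: block bypassing the path check via integer file descriptors
6. API input validation: validate the request body with a zod schema and limit code size to 1MB
7. No-auth warning: print a production warning when SANDBOX_TOKEN is not set

Add security-fixes.test.ts with regression tests for all of the fixes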

* test: consolidate security tests + add integration test suite

- Merge 6 security test files into 1 consolidated security.test.ts (109 tests)
  - JS/Python module interception (precheck + runtime)
  - JS escape attacks (prototype, constructor, Reflect, globalThis)
  - Python escape attacks (__import__ hook, exec/eval, internal vars, __subclasses__)
  - SSRF protection (private IPs, cloud metadata, file protocol)
  - File system isolation (path traversal, fd, disk quota)
  - Variable injection attacks
  - API input validation
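
For the SSRF category in the list above (private IPs, cloud metadata), a check along the following lines is one way to picture what such tests exercise. It is a sketch under assumptions: the function name is hypothetical, only IPv4 is handled, and the set of blocked ranges is a common baseline rather than the project's actual blacklist.

```typescript
/**
 * Hypothetical IPv4 blocklist check. Fails closed: anything that is not a
 * well-formed dotted-quad address is treated as blocked.
 */
function isBlockedIPv4(ip: string): boolean {
  const m = /^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})$/.exec(ip);
  if (!m) return true;
  const a = Number(m[1]);
  const b = Number(m[2]);
  const c = Number(m[3]);
  const d = Number(m[4]);
  if ([a, b, c, d].some((n) => n > 255)) return true;
  return (
    a === 0 || // "this network"
    a === 10 || // 10.0.0.0/8 private
    a === 127 || // loopback
    (a === 169 && b === 254) || // link-local, includes 169.254.169.254 metadata
    (a === 172 && b >= 16 && b <= 31) || // 172.16.0.0/12 private
    (a === 192 && b === 168) // 192.168.0.0/16 private
  );
}
```

To be meaningful against DNS rebinding, a check like this has to run on the resolved address that the connection actually uses, not on the hostname.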

- Add black-box integration test suite functional.test.ts (56 tests)
  - Basic operations (math, string, array, JSON, regex, Date, Promise, Map/Set)
  - Variable passing (string, number, complex objects, empty, multiple)
  - Whitelisted modules (crypto-js, moment, lodash)
  - SystemHelper/system_helper (fs, delay, strToBase64, httpRequest)
  - Error handling (syntax, runtime, undefined var, timeout)
  - Network requests (GET, POST)
  - Complex scenarios (CSV pipeline, recursion, class definition)

- Remove 34 duplicate test cases across merged files
- Total: 363 passed, 8 skipped (integration API tests need server)

* fix(sandbox): z.record() zod v4 compatibility - add key type param

* feat(sandbox): add .env.template with all config options and comments

* refactor(sandbox): remove disk write support and temp filesystem

* test(sandbox): remove all fs-related tests and add test case inventory

- Remove fs read/write tests from unit, integration, boundary, examples
- Remove path traversal, absolute path, open fd, builtins.open tests from security
- Add comprehensive test/case.md with all 344 test cases categorized
- All tests pass: 344 passed, 8 skipped, 0 failed

* feat(sandbox): add GET /sandbox/modules API to list available packages and builtins

* test(sandbox): add unit tests for GET /sandbox/modules API

* refactor(test): rewrite api.test.ts to use app.request() - no external server needed

* feat(sandbox): validate SANDBOX_TOKEN charset in env schema (ASCII printable only)

* chore(sandbox): remove DESIGN.md and package-lock.json from PR

* feat(sandbox): replace spawn-per-request with process pool architecture

- Add ProcessPool (JS) and PythonProcessPool with long-lived worker processes
- Workers communicate via stdin/stdout line-based JSON protocol
- Pool size configurable via SANDBOX_POOL_SIZE env var (default 20)
- Auto-respawn workers on crash
- Semaphore-based queueing when requests exceed pool size
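
The line-based JSON protocol mentioned above can be sketched as a pair of helpers: one that frames a message as a single line, and one that reassembles lines from arbitrary stdout chunks. The message shape and all names here are assumptions for illustration; the real worker protocol fields are not shown in this commit message.

```typescript
// Hypothetical message shape: only a `type` discriminator is assumed.
type WorkerMessage = { type: string; [key: string]: unknown };

/** One message per line. JSON.stringify escapes newlines, so the framing is safe. */
function encodeMessage(msg: WorkerMessage): string {
  return JSON.stringify(msg) + '\n';
}

/** Buffers raw chunks and yields every complete line as a parsed message. */
class LineJsonDecoder {
  private buffer = '';

  push(chunk: string): WorkerMessage[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line (or '' if the chunk ended with '\n').
    this.buffer = lines.pop() ?? '';
    const messages: WorkerMessage[] = [];
    for (const line of lines) {
      if (line.trim() === '') continue;
      messages.push(JSON.parse(line) as WorkerMessage);
    }
    return messages;
  }
}
```

Because a pipe can split a message at any byte, the decoder keeps the trailing partial line until the next chunk completes it.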

Performance gains (simple functions):
- JS: 22 QPS → 1,328 QPS (60x improvement)
- Python: 14.7 QPS → 3,395 QPS (231x improvement)

- Fix import.meta.dir compatibility for vitest (Node) environments
- Export poolReady promise for test initialization
- Add benchmark scripts to test/benchmark/
- All 354 tests passing (12 test files)

* chore(sandbox): clean up unused files, update README with pool architecture

- Remove test/REVIEW-REPORT.md, test/case.md, test/benchmark.ts (obsolete)
- Rewrite README: pool architecture diagram, performance benchmarks,
  SANDBOX_POOL_SIZE config, project structure, health endpoint format

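* fix(sandbox): fix worker respawn race condition after a process pool timeout

Root cause: after a timed-out worker is killed, the exit event fires asynchronously. If release() runs first,
the worker is still in the list, so the dead worker is put back into the idle pool and later requests are sent to a dead process.

Fix:
- In the timeout callback, call removeWorker before kill so release cannot return a dead worker
- removeWorker returns a bool so the exit handler avoids a duplicate respawn
- The timeout callback proactively calls spawnWorker to refill the pool
- release checks that the worker is still in the pool
- When spawnWorker completes, it checks waitQueue and assigns the worker directly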
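
The ordering in this fix can be modeled without real processes. The sketch below uses a hypothetical `FakeWorker` and bookkeeping class (none of these names come from the project) to show why removing the worker before killing it, and having `removeWorker` report whether it actually removed anything, prevents both the dead-worker reuse and the double respawn.

```typescript
// Hypothetical stand-in for a child process handle.
interface FakeWorker {
  id: number;
  killed: boolean;
}

class PoolBookkeeping {
  private nextId = 1;
  readonly workers = new Set<FakeWorker>();
  readonly idle: FakeWorker[] = [];
  respawns = 0;

  spawnWorker(): FakeWorker {
    const w: FakeWorker = { id: this.nextId++, killed: false };
    this.workers.add(w);
    this.idle.push(w);
    return w;
  }

  /** Returns true only for the caller that actually removed the worker. */
  removeWorker(w: FakeWorker): boolean {
    const removed = this.workers.delete(w);
    const i = this.idle.indexOf(w);
    if (i !== -1) this.idle.splice(i, 1);
    return removed;
  }

  acquire(): FakeWorker | undefined {
    return this.idle.shift();
  }

  release(w: FakeWorker): void {
    // A worker that was removed on timeout must never go back to idle.
    if (this.workers.has(w)) this.idle.push(w);
  }

  onTimeout(w: FakeWorker): void {
    if (this.removeWorker(w)) {
      w.killed = true; // kill only after the worker is out of the pool
      this.respawns++;
      this.spawnWorker();
    }
  }

  onExit(w: FakeWorker): void {
    // The exit event arrives later; respawn only if nobody replaced the worker yet.
    if (this.removeWorker(w)) {
      this.respawns++;
      this.spawnWorker();
    }
  }
}
```

Running the problematic sequence (timeout, then release, then the late exit event) against this model leaves exactly one live worker in the idle list and counts a single respawn.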

* fix: security hardening & test migration to process pool

- JS worker: harden process object (kill/chdir/env freeze/binding/dlopen)
- Python worker: stack-frame based __import__ hook to block exec/eval bypass
- Python worker: BuiltinsProxy to prevent __import__ override via builtins module
- Python worker: restricted __builtins__ dict in exec_globals (no internal refs)
- Python worker: restore __import__ before each execution
- Migrate all 9 test files from JsRunner/PythonRunner to ProcessPool/PythonProcessPool
- Configure vitest for serial execution (pool size=1, fileParallelism: false)
- Fix security test assertion for builtins tampering (success=true with escaped=false)
- All 102 security tests passing

* docs(sandbox): update README with accurate benchmark data, remove non-existent features

- Update performance table with latest benchmark results (JS 1414 QPS, Python 4247 QPS)
- Remove SANDBOX_DISK_MB/SANDBOX_MAX_DISK_MB env vars (not implemented)
- Remove SystemHelper.fs.* / system_helper.fs.* docs (not implemented in workers)
- Fix security section to match actual implementation
- Update test count to 351

* refactor(sandbox): remove legacy runner/sandbox/template code

- Delete src/runner/ (base.ts, js-runner.ts, python-runner.ts)
- Delete src/sandbox/ (js-template.ts, python-template.ts, network-config.ts)
- Delete test/unit/js-runner.test.ts, test/unit/python-runner.test.ts
- Keep src/utils/semaphore.ts (generic utility, has its own tests)
- Update README project structure and test count (297 cases)

All functionality is now in src/pool/ (process-pool architecture).
297 tests passing, 0 failures.

* test(sandbox): add process pool lifecycle/respawn/concurrency tests

- ProcessPool: init/shutdown/stats, worker crash respawn, timeout respawn,
  pool-full queuing, concurrent crash isolation
- PythonProcessPool: init/shutdown/stats, timeout respawn, queuing
- 14 new test cases, total 311 passing

* fix(sandbox): ping/pong health check, replace httpbin.org with baidu.com

- Worker health check: send actual ping message and verify pong response
  instead of only checking stdin.writable (detects stuck workers)
- JS worker.ts: handle {type:'ping'} → reply {type:'pong'}
- Python worker.py: handle {type:'ping'} → reply {type:'pong'}
- ProcessPool/PythonProcessPool: rewrite pingWorker to send ping,
  wait for pong with timeout, replace worker on failure
- Replace all httpbin.org URLs with www.baidu.com in tests
  (httpbin.org unreachable from China/Sealos Devbox)
- Add 4 new health check tests (ping/pong for JS and Python pools)
- All 318 tests passing, 0 failures
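
The ping/pong check described above boils down to classifying whatever comes back on the worker's stdout. The following sketch shows one way to do that; the outcome names and both function names are assumptions, and only the `{type:'ping'}` / `{type:'pong'}` message shapes come from the commit message.

```typescript
// Hypothetical outcome labels for a single health check.
type PingOutcome = 'healthy' | 'invalid-response' | 'parse-error';

/** Classifies one stdout line received in reply to {type:'ping'}. */
function classifyPingReply(line: string): PingOutcome {
  let parsed: unknown;
  try {
    parsed = JSON.parse(line);
  } catch {
    return 'parse-error';
  }
  if (
    typeof parsed === 'object' &&
    parsed !== null &&
    (parsed as { type?: unknown }).type === 'pong'
  ) {
    return 'healthy';
  }
  return 'invalid-response';
}

/** Anything other than a valid pong (including a timeout or a failed write) replaces the worker. */
function shouldReplaceWorker(outcome: PingOutcome | 'timeout' | 'write-error'): boolean {
  return outcome !== 'healthy';
}
```

Treating every non-pong outcome the same way keeps the pool logic simple: the worker is either known to be responsive or it is replaced.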

* docs: add test report (test/README.md) and update README testing section

- test/README.md: detailed report with 315 passed / 3 skipped / 0 failed
- README.md: updated test section with coverage dimensions table and link to report

* docs: add functional test cases checklist (110 cases)

* fix(sandbox): fix Dockerfile Python env and import detection

1. Dockerfile: Remove broken multi-stage Python 3.11 copy.
   - The previous approach copied python3 binary from python:3.11-alpine
     but missed libpython3.11.so.1.0, causing Python pool init failure.
   - Now uses system Python from apk and installs pip packages directly.

2. worker.py: Fix false positive import blocking for third-party packages.
   - numpy/pandas were blocked because their internal 'import os' was
     detected as user-initiated (full stack scan found user code frames).
   - Changed to check only the direct caller frame: if the import comes
     from site-packages (third-party lib internals), allow it.
   - Direct user imports of blocked modules are still properly rejected.

* fix(sandbox): block dynamic import() and restrict file system access

Security fixes found during deep review:

1. JS: Block import() dynamic imports that bypass require whitelist.
   - import('fs') could read arbitrary files on the container.
   - Added static regex check to reject code containing import().

2. Python: Restrict open() to prevent user code from reading files.
   - open('/etc/passwd') was accessible from user code.
   - Added _restricted_open() that checks caller frame: only allows
     stdlib/site-packages internal calls, blocks user code (<string>).

3. Python: Remove duplicate return statement in _safe_import.
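
The static check for `import()` mentioned in item 1 could look roughly like the sketch below. The regular expression and function name are assumptions, not the pattern the project uses. It rejects `import(` used as a call while letting a bare string such as 'import' or a method call like `obj.import()` through.

```typescript
// Hypothetical pattern: `import` not preceded by an identifier character, `$` or `.`,
// followed by optional whitespace and an opening parenthesis.
const DYNAMIC_IMPORT = /(^|[^\w$.])import\s*\(/;

function containsDynamicImport(code: string): boolean {
  return DYNAMIC_IMPORT.test(code);
}
```

A purely textual check like this is conservative: it also rejects code that merely mentions `import(` inside a string literal or a comment, which is usually an acceptable trade-off for a precheck.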

All 315 tests pass (3 skipped).

* test(sandbox): add regression tests for import() and open() security fixes

- JS: import('fs'), import('child_process'), import('os') blocked
- JS: string containing 'import' not false-positive
- Python: open('/etc/passwd'), open('/proc/self/environ'), open('/tmp/evil.txt', 'w') blocked
- Python: numpy internal open() not affected (conditional on numpy availability)

Total: 322 passed | 3 skipped (was 315 passed)

* docs(sandbox): rewrite sandbox documentation with JS + Python coverage

- Add Python language support documentation
- Add httpRequest/http_request function docs
- Add available modules list (JS whitelist + Python safe modules)
- Add security restrictions section
- Add practical examples (data processing, date calc, webhook signing)
- Add JS/Python function name mapping table

* docs(sandbox): use SystemHelper/system_helper for built-in functions

Direct calls (countToken, delay, etc.) are deprecated (kept for compat).
All examples now use SystemHelper.xxx() / system_helper.xxx().

* docs(sandbox): Python only show named-params style as recommended

* feat(sandbox): unify Python SystemHelper API with camelCase aliases

- Add camelCase aliases to Python SystemHelper: countToken, strToBase64,
  createHmac, httpRequest (matching JS API exactly)
- Update docs to use SystemHelper uniformly for both JS and Python
- snake_case methods (count_token, etc.) still work for backward compat

* feat(sandbox): add matplotlib and increase HTTP timeout to 60s

- Add matplotlib to Python dependencies
- Increase HTTP request timeout from 10s to 60s (both JS and Python)
- Update docs accordingly

* docs(sandbox): split docs for old/new sandbox versions

- sandbox.mdx → '代码运行(旧版)' for FastGPT ≤ 4.14.7 (URL unchanged)
- sandbox-v5.mdx → '代码运行' for FastGPT ≥ 4.14.8
- Both pages cross-link to each other
- meta.json updated: sandbox-v5 listed before sandbox

* docs: rename old sandbox doc to 代码运行(弃)

* refactor(sandbox): remove SANDBOX_TIMEOUT, use SANDBOX_MAX_TIMEOUT as unified timeout

* fix(sandbox): add build dependencies for matplotlib in Dockerfile

* refactor(sandbox): migrate Python from blocklist to allowlist for module control

- Change SANDBOX_PYTHON_BLOCKED_MODULES to SANDBOX_PYTHON_ALLOWED_MODULES
- Update Python worker to use allowlist instead of blocklist
- Add comprehensive safe module list: math, json, datetime, numpy, pandas, etc.
- Improve error message: 'Module X is not in the allowlist'
- Consistent with JS allowlist approach for better security

* fix(sandbox): add _strptime to allowlist and update test assertions

- Add _strptime module (required by datetime.strptime)
- Update test assertions for Python module import errors
- All 325 tests now pass (322 passed, 3 skipped)

* fix(docs): center SVG icon in size-5 container on medium screens

* docs(sandbox): simplify built-in functions and improve module documentation

- Remove delay, countToken, strToBase64, createHmac functions (keep only httpRequest)
- Convert Python module list to table format (10 tables by category)
- Reorganize usage examples with collapsible sections (JS and Python)
- Fix icon alignment in desktop/mobile sidebar navigation
- All 325 tests passing

---------

Co-authored-by: Lobster 3 <lobster3@sandbox.dev>
Co-authored-by: OpenClaw Bot <bot@openclaw.ai>
Co-authored-by: Archer <c121914yu@gmail.com>
Co-authored-by: archer <archer@archerdeMac-mini.local>

* perf: code sandbox

* update action

* Update projects/app/src/components/core/chat/ChatContainer/ChatBox/index.tsx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* update timeout

* update memory limit function

* sandbox

* perf: process pool

* env template

* feat: code tip

* fix: code sandbox error tip

* update memory limit fn

* update memory limit fn

* fix: test

* fix: test

* fix: sandbox

---------

Co-authored-by: Archer <archer@fastgpt.io>
Co-authored-by: Lobster 3 <lobster3@sandbox.dev>
Co-authored-by: OpenClaw Bot <bot@openclaw.ai>
Co-authored-by: Archer <c121914yu@gmail.com>
Co-authored-by: archer <archer@archerdeMac-mini.local>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-28 12:36:59 +08:00

962 lines
27 KiB
TypeScript
/**
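* ProcessPool / PythonProcessPool unit tests
*
* Covers the core process pool logic:
* - Lifecycle (init / shutdown / stats)
* - Automatic recovery after a worker crash (respawn)
* - Queuing when the pool is full
* - Concurrency correctness
* - Behavior after shutdown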
*/
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import { ProcessPool } from '../../src/pool/process-pool';
import { PythonProcessPool } from '../../src/pool/python-process-pool';
// ============================================================
// JS ProcessPool
// ============================================================
describe('ProcessPool lifecycle', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('stats are correct after init', async () => {
pool = new ProcessPool(2);
await pool.init();
const s = pool.stats;
expect(s.total).toBe(2);
expect(s.idle).toBe(2);
expect(s.busy).toBe(0);
expect(s.queued).toBe(0);
expect(s.poolSize).toBe(2);
});
it('stats reset to zero after shutdown', async () => {
pool = new ProcessPool(2);
await pool.init();
await pool.shutdown();
const s = pool.stats;
expect(s.total).toBe(0);
expect(s.idle).toBe(0);
expect(s.busy).toBe(0);
});
it('worker returns to idle after execute', async () => {
pool = new ProcessPool(1);
await pool.init();
await pool.execute({
code: `async function main() { return { ok: true }; }`,
variables: {}
});
const s = pool.stats;
expect(s.idle).toBe(1);
expect(s.busy).toBe(0);
});
});
describe('ProcessPool worker recovery', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('respawns automatically after a worker crash; later requests succeed', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
// Crash the worker (process.exit)
const result = await pool.execute({
code: `async function main() { process.exit(1); }`,
variables: {}
});
expect(result.success).toBe(false);
// Wait for the respawn to finish
await new Promise((r) => setTimeout(r, 1500));
// The new worker should be usable
const result2 = await pool.execute({
code: `async function main() { return { recovered: true }; }`,
variables: {}
});
expect(result2.success).toBe(true);
expect(result2.data?.codeReturn.recovered).toBe(true);
});
it('worker is killed and respawned after a timeout', async () => {
pool = new ProcessPool(1);
await pool.init();
const result = await pool.execute({
code: `async function main() { while(true) {} }`,
variables: {}
});
expect(result.success).toBe(false);
expect(result.message).toContain('timed out');
// Wait for the respawn
await new Promise((r) => setTimeout(r, 1500));
const result2 = await pool.execute({
code: `async function main() { return { ok: true }; }`,
variables: {}
});
expect(result2.success).toBe(true);
});
});
describe('ProcessPool concurrency and queuing', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('pool size=2, 3 concurrent requests, 1 queued', async () => {
pool = new ProcessPool(2);
await pool.init();
// 3 concurrent requests, each sleeping 200ms
const promises = Array.from({ length: 3 }, (_, i) =>
pool.execute({
code: `async function main(v) { await new Promise(r => setTimeout(r, 200)); return { idx: v.idx }; }`,
variables: { idx: i }
})
);
const results = await Promise.all(promises);
for (let i = 0; i < 3; i++) {
expect(results[i].success).toBe(true);
expect(results[i].data?.codeReturn.idx).toBe(i);
}
});
it('pool size=1, 10 concurrent requests all complete correctly (queued serially)', async () => {
pool = new ProcessPool(1);
await pool.init();
const promises = Array.from({ length: 10 }, (_, i) =>
pool.execute({
code: `async function main(v) { return { n: v.n * 2 }; }`,
variables: { n: i }
})
);
const results = await Promise.all(promises);
for (let i = 0; i < 10; i++) {
expect(results[i].success).toBe(true);
expect(results[i].data?.codeReturn.n).toBe(i * 2);
}
});
it('pool size=2, one crash among concurrent requests does not affect the others', async () => {
pool = new ProcessPool(2);
await pool.init();
const p1 = pool.execute({
code: `async function main() { process.exit(1); }`,
variables: {}
});
const p2 = pool.execute({
code: `async function main() { return { ok: true }; }`,
variables: {}
});
const [r1, r2] = await Promise.all([p1, p2]);
expect(r1.success).toBe(false);
expect(r2.success).toBe(true);
expect(r2.data?.codeReturn.ok).toBe(true);
});
});
// ============================================================
// JS ProcessPool - worker ping/pong health check
// ============================================================
describe('ProcessPool worker health check (ping/pong)', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('worker can still run tasks after answering a ping', async () => {
pool = new ProcessPool(1);
await pool.init();
// Run one task first to confirm the worker is healthy
const r1 = await pool.execute({
code: `async function main() { return { step: 1 }; }`,
variables: {}
});
expect(r1.success).toBe(true);
expect(r1.data?.codeReturn.step).toBe(1);
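// Trigger a health check (by calling the private pingWorker directly)
(pool as any).pingWorker((pool as any).idleWorkers[0]);
// Wait for the ping/pong round trip to finish
await new Promise((r) => setTimeout(r, 500));
// Run another task to confirm the worker was not killed by mistake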
const r2 = await pool.execute({
code: `async function main() { return { step: 2 }; }`,
variables: {}
});
expect(r2.success).toBe(true);
expect(r2.data?.codeReturn.step).toBe(2);
expect(pool.stats.total).toBe(1);
});
it('repeated pings do not affect worker state', async () => {
pool = new ProcessPool(2);
await pool.init();
// Ping every idle worker 3 times in a row
for (let i = 0; i < 3; i++) {
for (const w of [...(pool as any).idleWorkers]) {
(pool as any).pingWorker(w);
}
await new Promise((r) => setTimeout(r, 300));
}
// All workers should still be present
expect(pool.stats.total).toBe(2);
expect(pool.stats.idle).toBe(2);
// Run a task to confirm the pool still works
const result = await pool.execute({
code: `async function main() { return { alive: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
});
});
// ============================================================
// JS ProcessPool - shutdown reject waiters
// ============================================================
describe('ProcessPool shutdown reject waiters', () => {
it('requests in waitQueue are rejected after shutdown', async () => {
const pool = new ProcessPool(1);
await pool.init();
// Start a long-running task that occupies the only worker
const p1 = pool.execute({
code: `async function main() { await new Promise(r => setTimeout(r, 3000)); return { done: true }; }`,
variables: {}
});
// Wait briefly to make sure p1 has acquired the worker
await new Promise((r) => setTimeout(r, 200));
// Start a second request, which goes into waitQueue
const p2 = pool.execute({
code: `async function main() { return { queued: true }; }`,
variables: {}
});
// Confirm there is a queued request
expect(pool.stats.queued).toBe(1);
// shutdown should reject the requests in waitQueue
await pool.shutdown();
// p2 should be rejected
await expect(p2).rejects.toThrow('shutting down');
// p1 may succeed or fail because its worker was killed; either outcome is fine
await p1.catch(() => {});
});
});
// ============================================================
// JS ProcessPool - return value serialization and argument validation (formerly base-runner.test.ts)
// ============================================================
describe('ProcessPool return value serialization and argument validation', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('JS main returning undefined is serialized as null', async () => {
pool = new ProcessPool(1);
await pool.init();
const result = await pool.execute({
code: `async function main() { return undefined; }`,
variables: {}
});
expect(result.success).toBe(true);
expect(result.data?.codeReturn).toBeNull();
});
it('JS main without a return statement is serialized as null', async () => {
pool = new ProcessPool(1);
await pool.init();
const result = await pool.execute({
code: `async function main() { const x = 1; }`,
variables: {}
});
expect(result.success).toBe(true);
expect(result.data?.codeReturn).toBeNull();
});
it('returns an error when code is not a string', async () => {
pool = new ProcessPool(1);
await pool.init();
const result = await pool.execute({
code: 123 as any,
variables: {}
});
expect(result.success).toBe(false);
expect(result.message).toContain('empty');
});
it('returns an error when code is null', async () => {
pool = new ProcessPool(1);
await pool.init();
const result = await pool.execute({
code: null as any,
variables: {}
});
expect(result.success).toBe(false);
expect(result.message).toContain('empty');
});
});
// ============================================================
// JS + Python mixed concurrency (formerly base-runner.test.ts)
// ============================================================
describe('JS + Python mixed concurrency', () => {
let jsPool: ProcessPool;
let pyPool: PythonProcessPool;
afterEach(async () => {
// Shut the pools down independently so a failure in one does not skip the other
try {
await jsPool?.shutdown();
} catch {}
try {
await pyPool?.shutdown();
} catch {}
});
it('runs JS and Python concurrently', async () => {
jsPool = new ProcessPool(1);
await jsPool.init();
pyPool = new PythonProcessPool(1);
await pyPool.init();
const jsPromise = jsPool.execute({
code: `async function main() { return { lang: 'js' }; }`,
variables: {}
});
const pyPromise = pyPool.execute({
code: `def main():\n return {'lang': 'python'}`,
variables: {}
});
const [jsResult, pyResult] = await Promise.all([jsPromise, pyPromise]);
expect(jsResult.success).toBe(true);
expect(jsResult.data?.codeReturn.lang).toBe('js');
expect(pyResult.success).toBe(true);
expect(pyResult.data?.codeReturn.lang).toBe('python');
});
});
// ============================================================
// JS ProcessPool - health check failure paths
// ============================================================
describe('ProcessPool health check failure paths', () => {
let pool: ProcessPool;
afterEach(async () => {
try {
await pool?.shutdown();
} catch {}
});
it('ping timeout: worker is replaced when it does not reply with pong', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
const worker = (pool as any).idleWorkers[0];
// Intercept stdin.write so the ping never reaches the worker (without closing stdin), which triggers a real timeout
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
let interceptPing = true;
worker.proc.stdin!.write = (...args: any[]) => {
if (interceptPing) {
interceptPing = false;
return true; // Pretend the write succeeded without actually sending anything
}
return origWrite(...args);
};
// Trigger the ping
(pool as any).pingWorker(worker);
// Wait for HEALTH_CHECK_TIMEOUT (5s) + respawn
await new Promise((r) => setTimeout(r, 8000));
// The worker should have been replaced; the pool still has 1 worker
expect(pool.stats.total).toBe(1);
// The new worker should be usable
const result = await pool.execute({
code: `async function main() { return { ok: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
}, 15000);
it('stdin not writable: worker is replaced when its stdin is closed', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
const worker = (pool as any).idleWorkers[0];
// Destroy stdin so that writable = false
worker.proc.stdin!.destroy();
// Trigger the ping
(pool as any).pingWorker(worker);
// Wait for the respawn
await new Promise((r) => setTimeout(r, 3000));
expect(pool.stats.total).toBe(1);
const result = await pool.execute({
code: `async function main() { return { replaced: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
}, 10000);
it('health check invalid response: worker is replaced when it returns the wrong type', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
const worker = (pool as any).idleWorkers[0];
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
let intercepted = false;
worker.proc.stdin!.write = (...args: any[]) => {
if (!intercepted) {
intercepted = true;
setTimeout(() => worker.rl.emit('line', JSON.stringify({ type: 'wrong' })), 50);
return true;
}
return origWrite(...args);
};
(pool as any).pingWorker(worker);
await new Promise((r) => setTimeout(r, 3000));
expect(pool.stats.total).toBe(1);
const result = await pool.execute({
code: `async function main() { return { invalidResp: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
}, 10000);
it('returnToIdle with waiter: a request waiting during the ping is assigned directly', async () => {
pool = new ProcessPool(1);
await pool.init();
const worker = (pool as any).idleWorkers[0];
(pool as any).pingWorker(worker);
// During the ping the worker is not in idle, so the new request enters waitQueue
// After the ping succeeds, returnToIdle checks waitQueue and assigns the worker directly
const p1 = pool.execute({
code: `async function main() { return { fromWaiter: true }; }`,
variables: {}
});
const result = await p1;
expect(result.success).toBe(true);
expect(result.data?.codeReturn.fromWaiter).toBe(true);
});
it('health check parse error: worker is replaced when it returns non-JSON', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
const worker = (pool as any).idleWorkers[0];
const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
let intercepted = false;
worker.proc.stdin!.write = (...args: any[]) => {
if (!intercepted) {
intercepted = true;
setTimeout(() => worker.rl.emit('line', 'not-json-at-all'), 50);
return true;
}
return origWrite(...args);
};
(pool as any).pingWorker(worker);
await new Promise((r) => setTimeout(r, 3000));
expect(pool.stats.total).toBe(1);
const result = await pool.execute({
code: `async function main() { return { parseError: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
}, 10000);
it('health check write error: worker is replaced when stdin.write throws', async () => {
pool = new ProcessPool(1);
await pool.init();
expect(pool.stats.total).toBe(1);
const worker = (pool as any).idleWorkers[0];
// Make stdin.write throw while writable stays true
worker.proc.stdin!.write = () => {
throw new Error('mock write error');
};
(pool as any).pingWorker(worker);
await new Promise((r) => setTimeout(r, 3000));
expect(pool.stats.total).toBe(1);
const result = await pool.execute({
code: `async function main() { return { writeError: true }; }`,
variables: {}
});
expect(result.success).toBe(true);
}, 10000);
it('returnToIdle with waiter: worker is assigned to the waiting request after the ping succeeds', async () => {
pool = new ProcessPool(1);
await pool.init();
// Start a long task that occupies the worker
const p1 = pool.execute({
code: `async function main() { await new Promise(r => setTimeout(r, 1000)); return { first: true }; }`,
variables: {}
});
// Wait for p1 to acquire the worker
await new Promise((r) => setTimeout(r, 100));
// Start a second request, which goes into waitQueue
const p2 = pool.execute({
code: `async function main() { return { second: true }; }`,
variables: {}
});
// 确认有排队
expect(pool.stats.queued).toBe(1);
// 等 p1 完成,p2 应该自动被分配
const [r1, r2] = await Promise.all([p1, p2]);
expect(r1.success).toBe(true);
expect(r1.data?.codeReturn.first).toBe(true);
expect(r2.success).toBe(true);
expect(r2.data?.codeReturn.second).toBe(true);
});
});
// ============================================================
// Python PythonProcessPool - Worker Ping/Pong health check
// ============================================================
describe('PythonProcessPool worker health check (ping/pong)', () => {
  let pool: PythonProcessPool;
  afterEach(async () => {
    try {
      await pool?.shutdown();
    } catch {}
  });
  it('worker can still execute tasks after responding to ping normally', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    const r1 = await pool.execute({
      code: `def main():\n    return {'step': 1}`,
      variables: {}
    });
    expect(r1.success).toBe(true);
    expect(r1.data?.codeReturn.step).toBe(1);
    // Trigger a ping
    (pool as any).pingWorker((pool as any).idleWorkers[0]);
    await new Promise((r) => setTimeout(r, 500));
    const r2 = await pool.execute({
      code: `def main():\n    return {'step': 2}`,
      variables: {}
    });
    expect(r2.success).toBe(true);
    expect(r2.data?.codeReturn.step).toBe(2);
    expect(pool.stats.total).toBe(1);
  });
  it('repeated pings do not affect worker state', async () => {
    pool = new PythonProcessPool(2);
    await pool.init();
    for (let i = 0; i < 3; i++) {
      for (const w of [...(pool as any).idleWorkers]) {
        (pool as any).pingWorker(w);
      }
      await new Promise((r) => setTimeout(r, 300));
    }
    expect(pool.stats.total).toBe(2);
    expect(pool.stats.idle).toBe(2);
    const result = await pool.execute({
      code: `def main():\n    return {'alive': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  });
});
// ============================================================
// Python PythonProcessPool - health check failure paths
// ============================================================
describe('PythonProcessPool health check failure paths', () => {
  let pool: PythonProcessPool;
  afterEach(async () => {
    try {
      await pool?.shutdown();
    } catch {}
  });
  it('ping timeout: worker is replaced when it does not respond with pong', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    expect(pool.stats.total).toBe(1);
    const worker = (pool as any).idleWorkers[0];
    // Intercept stdin.write so the ping never reaches the worker, triggering a real timeout
    const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
    let interceptPing = true;
    worker.proc.stdin!.write = (...args: any[]) => {
      if (interceptPing) {
        interceptPing = false;
        return true;
      }
      return origWrite(...args);
    };
    (pool as any).pingWorker(worker);
    await new Promise((r) => setTimeout(r, 8000));
    expect(pool.stats.total).toBe(1);
    const result = await pool.execute({
      code: `def main():\n    return {'ok': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  }, 15000);
  it('stdin not writable: worker is replaced when its stdin is closed', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    expect(pool.stats.total).toBe(1);
    const worker = (pool as any).idleWorkers[0];
    worker.proc.stdin!.destroy();
    (pool as any).pingWorker(worker);
    await new Promise((r) => setTimeout(r, 3000));
    expect(pool.stats.total).toBe(1);
    const result = await pool.execute({
      code: `def main():\n    return {'replaced': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  }, 10000);
  it('health check invalid response: worker is replaced when it returns the wrong type', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    expect(pool.stats.total).toBe(1);
    const worker = (pool as any).idleWorkers[0];
    const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
    let intercepted = false;
    worker.proc.stdin!.write = (...args: any[]) => {
      if (!intercepted) {
        intercepted = true;
        setTimeout(() => worker.rl.emit('line', JSON.stringify({ type: 'wrong' })), 50);
        return true;
      }
      return origWrite(...args);
    };
    (pool as any).pingWorker(worker);
    await new Promise((r) => setTimeout(r, 3000));
    expect(pool.stats.total).toBe(1);
    const result = await pool.execute({
      code: `def main():\n    return {'invalidResp': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  }, 10000);
  it('returnToIdle with waiter: a request waiting during ping is assigned directly', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    const worker = (pool as any).idleWorkers[0];
    (pool as any).pingWorker(worker);
    const p1 = pool.execute({
      code: `def main():\n    return {'fromWaiter': True}`,
      variables: {}
    });
    const result = await p1;
    expect(result.success).toBe(true);
    expect(result.data?.codeReturn.fromWaiter).toBe(true);
  });
  it('health check parse error: worker is replaced when it returns non-JSON', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    expect(pool.stats.total).toBe(1);
    const worker = (pool as any).idleWorkers[0];
    const origWrite = worker.proc.stdin!.write.bind(worker.proc.stdin!);
    let intercepted = false;
    worker.proc.stdin!.write = (...args: any[]) => {
      if (!intercepted) {
        intercepted = true;
        setTimeout(() => worker.rl.emit('line', 'not-json'), 50);
        return true;
      }
      return origWrite(...args);
    };
    (pool as any).pingWorker(worker);
    await new Promise((r) => setTimeout(r, 3000));
    expect(pool.stats.total).toBe(1);
    const result = await pool.execute({
      code: `def main():\n    return {'parseError': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  }, 10000);
  it('health check write error: worker is replaced when stdin.write throws', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    expect(pool.stats.total).toBe(1);
    const worker = (pool as any).idleWorkers[0];
    worker.proc.stdin!.write = () => {
      throw new Error('mock write error');
    };
    (pool as any).pingWorker(worker);
    await new Promise((r) => setTimeout(r, 3000));
    expect(pool.stats.total).toBe(1);
    const result = await pool.execute({
      code: `def main():\n    return {'writeError': True}`,
      variables: {}
    });
    expect(result.success).toBe(true);
  }, 10000);
});
// ============================================================
// Python PythonProcessPool - shutdown reject waiters
// ============================================================
describe('PythonProcessPool shutdown reject waiters', () => {
  it('requests in waitQueue are rejected after shutdown', async () => {
    const pool = new PythonProcessPool(1);
    await pool.init();
    // Start a long-running task to occupy the only worker
    const p1 = pool.execute({
      code: `import time\ndef main():\n    time.sleep(3)\n    return {'done': True}`,
      variables: {}
    });
    // Wait briefly so p1 has acquired the worker
    await new Promise((r) => setTimeout(r, 200));
    // Issue a second request; it goes into waitQueue
    const p2 = pool.execute({
      code: `def main():\n    return {'queued': True}`,
      variables: {}
    });
    // Confirm a request is queued
    expect(pool.stats.queued).toBe(1);
    // shutdown should reject the requests in waitQueue
    await pool.shutdown();
    // p2 should be rejected
    await expect(p2).rejects.toThrow('shutting down');
    // p1 may succeed or fail because its worker was killed; either outcome is fine
    await p1.catch(() => {});
  });
});
// ============================================================
// Python PythonProcessPool
// ============================================================
describe('PythonProcessPool lifecycle', () => {
  let pool: PythonProcessPool;
  afterEach(async () => {
    try {
      await pool?.shutdown();
    } catch {}
  });
  it('stats are correct after init', async () => {
    pool = new PythonProcessPool(2);
    await pool.init();
    const s = pool.stats;
    expect(s.total).toBe(2);
    expect(s.idle).toBe(2);
    expect(s.busy).toBe(0);
    expect(s.queued).toBe(0);
    expect(s.poolSize).toBe(2);
  });
  it('stats reset to zero after shutdown', async () => {
    pool = new PythonProcessPool(2);
    await pool.init();
    await pool.shutdown();
    const s = pool.stats;
    expect(s.total).toBe(0);
    expect(s.idle).toBe(0);
    expect(s.busy).toBe(0);
  });
  it('worker returns to idle after execute', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    await pool.execute({
      code: `def main():\n    return {'ok': True}`,
      variables: {}
    });
    const s = pool.stats;
    expect(s.idle).toBe(1);
    expect(s.busy).toBe(0);
  });
});
describe('PythonProcessPool worker recovery', () => {
  let pool: PythonProcessPool;
  afterEach(async () => {
    try {
      await pool?.shutdown();
    } catch {}
  });
  it('worker is killed and respawned after a timeout', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    const result = await pool.execute({
      code: `def main():\n    while True:\n        pass`,
      variables: {}
    });
    expect(result.success).toBe(false);
    expect(result.message).toContain('timed out');
    // Wait for respawn
    await new Promise((r) => setTimeout(r, 2000));
    const result2 = await pool.execute({
      code: `def main():\n    return {'ok': True}`,
      variables: {}
    });
    expect(result2.success).toBe(true);
  });
});
describe('PythonProcessPool concurrency and queueing', () => {
  let pool: PythonProcessPool;
  afterEach(async () => {
    try {
      await pool?.shutdown();
    } catch {}
  });
  it('pool size=2, 3 concurrent requests, 1 queued', async () => {
    pool = new PythonProcessPool(2);
    await pool.init();
    const promises = Array.from({ length: 3 }, (_, i) =>
      pool.execute({
        code: `import time\ndef main(variables):\n    time.sleep(0.2)\n    return {'idx': variables['idx']}`,
        variables: { idx: i }
      })
    );
    const results = await Promise.all(promises);
    for (let i = 0; i < 3; i++) {
      expect(results[i].success).toBe(true);
      expect(results[i].data?.codeReturn.idx).toBe(i);
    }
  });
  it('pool size=1, 10 concurrent requests all complete correctly (serial queueing)', async () => {
    pool = new PythonProcessPool(1);
    await pool.init();
    const promises = Array.from({ length: 10 }, (_, i) =>
      pool.execute({
        code: `def main(variables):\n    return {'n': variables['n'] * 2}`,
        variables: { n: i }
      })
    );
    const results = await Promise.all(promises);
    for (let i = 0; i < 10; i++) {
      expect(results[i].success).toBe(true);
      expect(results[i].data?.codeReturn.n).toBe(i * 2);
    }
  });
});