import fs from "fs/promises";
import path from "path";
import { fileURLToPath } from "url";

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const repoRoot = path.resolve(__dirname, "..");
const kbRoot = path.join(repoRoot, "docs", "kb");
const requestTagCatalogPath = path.join(kbRoot, "tags.json");

const DEFAULT_DEEPSEEK_BASE_URL = "https://api.deepseek.com";
const DEFAULT_DEEPSEEK_MODEL = "deepseek-v4-flash";
const DEFAULT_PROVIDER = process.env.REQUEST_ANALYZER_PROVIDER
  || (String(process.env.DEEPSEEK_API_KEY || "").trim() ? "deepseek" : "openai-compatible");

/**
 * Read a numeric environment variable.
 * Falls back when the variable is unset, blank, or not a finite number
 * (a bare `Number(...)` would otherwise leak NaN into the config).
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is missing or invalid.
 * @returns {number}
 */
function readNumericEnv(name, fallback) {
  const raw = String(process.env[name] ?? "").trim();
  if (!raw) {
    return fallback;
  }
  const parsed = Number(raw);
  return Number.isFinite(parsed) ? parsed : fallback;
}

/** Baseline configuration resolved from environment variables; CLI flags override it. */
const DEFAULTS = {
  apiBase: process.env.REQUEST_ANALYZER_API_BASE || `http://127.0.0.1:${process.env.PORT || "5180"}`,
  provider: DEFAULT_PROVIDER,
  modelBaseUrl: process.env.REQUEST_ANALYZER_MODEL_BASE_URL || "",
  model: process.env.REQUEST_ANALYZER_MODEL || "",
  apiKey: process.env.REQUEST_ANALYZER_API_KEY || process.env.DEEPSEEK_API_KEY || "",
  adminPassword: process.env.REQUEST_ANALYZER_ADMIN_PASSWORD || process.env.LAUNCHER_ADMIN_PASSWORD || "",
  // Numeric env values get the same clamps the matching CLI flags apply in parseArgs.
  limit: Math.max(1, Math.floor(readNumericEnv("REQUEST_ANALYZER_LIMIT", 5))),
  promoteThreshold: Math.max(0, Math.min(1, readNumericEnv("REQUEST_ANALYZER_PROMOTE_THRESHOLD", 0.85))),
  maxTokens: Math.max(512, Math.floor(readNumericEnv("REQUEST_ANALYZER_MAX_TOKENS", 4000))),
  thinking: String(process.env.REQUEST_ANALYZER_THINKING || "disabled").trim().toLowerCase() === "enabled" ? "enabled" : "disabled",
  poll: false,
  intervalMs: Math.max(1000, Math.floor(readNumericEnv("REQUEST_ANALYZER_INTERVAL_MS", 30000))),
  dryRun: false,
  requestId: "",
};

/** Print CLI usage to stdout. */
function printHelp() {
  console.log(`Worldshaper request analysis worker

Usage:
  npm run analyze:requests -- [options]

Options:
  --provider           Model provider: deepseek or openai-compatible.
  --request-id         Analyze one pending request only.
  --limit              Maximum pending requests to process in one pass.
  --poll               Keep polling for new pending requests.
  --interval-ms        Poll interval when --poll is enabled.
  --dry-run            Do not write anything back to the API.
  --api-base           Worldshaper API base URL.
  --model-base-url     Model API base URL. Defaults to DeepSeek when provider=deepseek.
  --model              Model id to use. Defaults to ${DEFAULT_DEEPSEEK_MODEL} for DeepSeek.
  --api-key            API key for the model endpoint.
  --max-tokens         Max output tokens for the model response.
  --thinking           DeepSeek thinking mode: enabled or disabled.
  --promote-threshold  Minimum per-item confidence for auto-promotion.
  --help               Show this message.

Environment variables:
  REQUEST_ANALYZER_API_BASE
  REQUEST_ANALYZER_PROVIDER
  REQUEST_ANALYZER_MODEL_BASE_URL
  REQUEST_ANALYZER_MODEL
  REQUEST_ANALYZER_API_KEY
  REQUEST_ANALYZER_ADMIN_PASSWORD
  REQUEST_ANALYZER_MAX_TOKENS
  REQUEST_ANALYZER_THINKING
  REQUEST_ANALYZER_LIMIT
  REQUEST_ANALYZER_INTERVAL_MS
  REQUEST_ANALYZER_PROMOTE_THRESHOLD
  DEEPSEEK_API_KEY
  LAUNCHER_ADMIN_PASSWORD

Notes:
  - DeepSeek uses ${DEFAULT_DEEPSEEK_BASE_URL}/chat/completions.
  - DeepSeek JSON mode is enabled automatically for this worker.
  - Run the Worldshaper API server first so the worker can read and patch requests.
`);
}

/**
 * Parse CLI arguments on top of DEFAULTS.
 * @param {string[]} argv - Arguments after the script name.
 * @returns {object} Config object (not yet finalized).
 * @throws {Error} On an unknown option.
 */
function parseArgs(argv) {
  const config = { ...DEFAULTS };
  for (let index = 0; index < argv.length; index += 1) {
    const arg = String(argv[index] || "").trim();
    if (!arg) {
      continue;
    }
    if (arg === "--help" || arg === "-h") {
      config.help = true;
      continue;
    }
    if (arg === "--poll") {
      config.poll = true;
      continue;
    }
    if (arg === "--dry-run") {
      config.dryRun = true;
      continue;
    }

    // Every remaining known option consumes the following argument as its value.
    const value = String(argv[index + 1] || "").trim();
    switch (arg) {
      case "--provider":
        config.provider = value.toLowerCase() || config.provider;
        break;
      case "--request-id":
        config.requestId = value;
        break;
      case "--limit":
        config.limit = Math.max(1, Math.floor(Number(value) || config.limit));
        break;
      case "--interval-ms":
        config.intervalMs = Math.max(1000, Math.floor(Number(value) || config.intervalMs));
        break;
      case "--api-base":
        config.apiBase = value || config.apiBase;
        break;
      case "--model-base-url":
        config.modelBaseUrl = value || config.modelBaseUrl;
        break;
      case "--model":
        config.model = value || config.model;
        break;
      case "--api-key":
        config.apiKey = value || config.apiKey;
        break;
      case "--max-tokens":
        config.maxTokens = Math.max(512, Math.floor(Number(value) || config.maxTokens));
        break;
      case "--thinking":
        config.thinking = value.toLowerCase() === "enabled" ? "enabled" : "disabled";
        break;
      case "--promote-threshold": {
        // A blank value must not become Number("") === 0, which would auto-promote everything.
        const parsed = value ? Number(value) : Number.NaN;
        if (Number.isFinite(parsed)) {
          config.promoteThreshold = Math.max(0, Math.min(1, parsed));
        }
        break;
      }
      default:
        throw new Error(`Unknown option: ${arg}`);
    }
    index += 1;
  }
  return config;
}

/**
 * Normalize the provider and fill provider-specific defaults for base URL and model.
 * @param {object} config - Output of parseArgs.
 * @returns {object} A new config object.
 */
function finalizeConfig(config) {
  const provider = String(config.provider || "").trim().toLowerCase();
  const nextConfig = {
    ...config,
    provider: provider === "deepseek" ? "deepseek" : "openai-compatible",
  };
  if (!String(nextConfig.modelBaseUrl || "").trim()) {
    nextConfig.modelBaseUrl = nextConfig.provider === "deepseek" ? DEFAULT_DEEPSEEK_BASE_URL : "http://127.0.0.1:1234/v1";
  }
  if (!String(nextConfig.model || "").trim() && nextConfig.provider === "deepseek") {
    nextConfig.model = DEFAULT_DEEPSEEK_MODEL;
  }
  return nextConfig;
}

/** Resolve after `ms` milliseconds. */
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

/** Ensure a base URL ends in "/" so relative resolution keeps its last path segment. */
function withTrailingSlash(value) {
  const text = String(value || "");
  return text.endsWith("/") ? text : `${text}/`;
}

/** Join `pathname` onto `base`, keeping any path prefix the base already has. */
function buildUrl(base, pathname) {
  return new URL(String(pathname || "").replace(/^\//, ""), withTrailingSlash(base)).toString();
}

/**
 * fetch() wrapper that throws on non-2xx and parses a JSON body.
 * @returns {Promise<any>} Parsed body, or null when the response body is empty.
 * @throws {Error} With status and the first 240 chars of the body on non-2xx.
 */
async function fetchJson(url, init = {}) {
  const response = await fetch(url, init);
  if (!response.ok) {
    const responseText = await response.text().catch(() => "");
    throw new Error(`${response.status} ${response.statusText}${responseText ? `: ${responseText.slice(0, 240)}` : ""}`);
  }
  // Bodiless successes (e.g. 204) would make response.json() throw; report them as null instead.
  const bodyText = await response.text();
  return bodyText.trim() ? JSON.parse(bodyText) : null;
}

/** Copy `baseHeaders` and add the admin password header when one is configured. */
function buildAdminHeaders(config, baseHeaders = {}) {
  const nextHeaders = { ...baseHeaders };
  const adminPassword = String(config?.adminPassword || "").trim();
  if (adminPassword) {
    nextHeaders["x-worldshaper-admin-password"] = adminPassword;
  }
  return nextHeaders;
}

/** Lowercase word tokens (letters, digits, _, /, -) of at least two characters. */
function tokenize(value) {
  return String(value || "")
    .toLowerCase()
    .split(/[^a-z0-9_/-]+/i)
    .map((entry) => entry.trim())
    .filter((entry) => entry.length >= 2);
}

/** Whitespace-collapse strings and drop empties and case-insensitive duplicates, keeping first occurrence. */
function uniqueStrings(values) {
  const seen = new Set();
  const next = [];
  for (const value of Array.isArray(values) ? values : []) {
    const normalized = String(value || "").replace(/\s+/g, " ").trim();
    const key = normalized.toLowerCase();
    if (!normalized || seen.has(key)) {
      continue;
    }
    seen.add(key);
    next.push(normalized);
  }
  return next;
}

/** Key form used for tag lookups: collapsed whitespace, trimmed, lowercase. */
function normalizeRequestTagLookupValue(value) {
  return String(value || "").replace(/\s+/g, " ").trim().toLowerCase();
}

/**
 * Load the standardized request tag catalog from docs/kb/tags.json.
 * @returns {Promise<Array<{id: string, label: string, description: string, aliases: string[]}>>}
 * @throws {Error} When the catalog has no tag with both an id and a label.
 */
async function loadRequestTagCatalog() {
  const payload = JSON.parse(await fs.readFile(requestTagCatalogPath, "utf8"));
  const tags = Array.isArray(payload?.tags)
    ? payload.tags
      .map((entry) => ({
        id: String(entry?.id || "").trim(),
        label: String(entry?.label || "").trim(),
        // Kept so buildPrompt can show the model what each tag means.
        description: String(entry?.description || "").trim(),
        aliases: Array.isArray(entry?.aliases)
          ? entry.aliases.map((alias) => String(alias || "").trim()).filter(Boolean)
          : [],
      }))
      .filter((entry) => entry.id && entry.label)
    : [];
  if (tags.length === 0) {
    throw new Error("Request tag catalog did not contain any valid tags.");
  }
  return tags;
}

/**
 * Map every normalized tag id, label, and alias to its canonical label.
 * Later tag definitions win on key collisions.
 */
function buildRequestTagLookup(tagDefinitions) {
  return new Map(
    (Array.isArray(tagDefinitions) ? tagDefinitions : [])
      .flatMap((entry) => [
        [normalizeRequestTagLookupValue(entry.id), entry.label],
        [normalizeRequestTagLookupValue(entry.label), entry.label],
        ...(Array.isArray(entry.aliases)
          ? entry.aliases.map((alias) => [normalizeRequestTagLookupValue(alias), entry.label])
          : []),
      ])
      .filter(([key]) => key),
  );
}

/**
 * Map raw tag strings to canonical labels, dropping unknown ones.
 * When none of `values` resolve, `fallback` is resolved the same way instead.
 */
function normalizeRequestTags(values, tagLookup, fallback = []) {
  const resolve = (list) => uniqueStrings(Array.isArray(list) ? list : [])
    .map((entry) => tagLookup.get(normalizeRequestTagLookupValue(entry)) || "")
    .filter(Boolean);
  const normalized = resolve(values);
  if (normalized.length > 0) {
    return uniqueStrings(normalized);
  }
  return resolve(fallback);
}

/** Coerce to a number clamped into [0, 1]; null when not finite. */
function clampConfidence(value) {
  const parsed = Number(value);
  if (!Number.isFinite(parsed)) {
    return null;
  }
  return Math.max(0, Math.min(1, parsed));
}

const ITEM_STATUS_RECOMMENDATIONS = new Set(["active", "duplicate", "blocked", "needs_review"]);
const PROBLEM_TYPES = new Set(["feature", "bug", "workflow", "performance", "ux", "content"]);

/** Known status recommendation, or "needs_review" for anything else. */
function normalizeItemStatusRecommendation(value) {
  const normalized = String(value || "").trim().toLowerCase();
  return ITEM_STATUS_RECOMMENDATIONS.has(normalized) ? normalized : "needs_review";
}

/** Known problem type, or "unknown" for anything else. */
function normalizeProblemType(value) {
  const normalized = String(value || "").trim().toLowerCase();
  return PROBLEM_TYPES.has(normalized) ? normalized : "unknown";
}

/**
 * Derive a short title from the first sentence of `text`, truncated to 72 characters.
 * Truncation counts code points so emoji/astral characters are never split in half.
 */
function buildFallbackTitle(text, fallback = "Pending request") {
  const normalized = String(text || "").replace(/\s+/g, " ").trim();
  if (!normalized) {
    return fallback;
  }
  const firstSentence = normalized.split(/[\r\n.!?]+/).map((entry) => entry.trim()).find(Boolean) || normalized;
  const characters = [...firstSentence];
  return characters.length > 72 ? `${characters.slice(0, 69).join("").trim()}...` : firstSentence;
}

/**
 * Normalize one model-produced request item.
 * @returns {object|null} Normalized item, or null when it lacks an interpretation or implementation approach.
 */
function normalizeAnalysisItem(rawItem, index = 0, request = null, relevantSystems = [], kb = null) {
  const source = rawItem && typeof rawItem === "object" && !Array.isArray(rawItem) ? rawItem : null;
  if (!source) {
    return null;
  }
  const fallbackText = String(source.rawExcerpt || request?.sourceText || "").trim();
  const title = String(source.title || "").trim() || buildFallbackTitle(fallbackText, `Analyzed request ${index + 1}`);
  const parsedInterpretation = String(source.parsedInterpretation || source.summary || "").trim();
  const implementationApproach = String(source.implementationApproach || source.implementationNotes || "").trim();
  if (!title || !parsedInterpretation || !implementationApproach) {
    return null;
  }
  const primaryCategory = String(source.primaryCategory || source.category || "").trim()
    || String(relevantSystems[0]?.name || "Unsorted");
  const affectedSystems = uniqueStrings(source.affectedSystems);
  const defaultTags = [primaryCategory, ...affectedSystems];
  const tags = normalizeRequestTags(
    Array.isArray(source.tags) ? source.tags : defaultTags,
    kb?.requestTagLookup || new Map(),
    defaultTags,
  );
  const reviewRationale = String(source.reviewRationale || source.reviewReason || source.notes || "").trim();
  const reviewOptions = uniqueStrings(source.reviewOptions);
  return {
    title,
    primaryCategory,
    tags: tags.length > 0 ? tags : ["General"],
    statusRecommendation: normalizeItemStatusRecommendation(source.statusRecommendation || source.status),
    parsedInterpretation,
    implementationApproach,
    affectedSystems,
    affectedFiles: uniqueStrings(source.affectedFiles),
    problemType: normalizeProblemType(source.problemType),
    rawExcerpt: String(source.rawExcerpt || "").trim(),
    confidence: clampConfidence(source.confidence),
    reviewRationale,
    reviewOptions,
    notes: String(source.notes || "").trim(),
  };
}

/**
 * Normalize the whole model response (an object with `items`, or a bare item array).
 * @throws {Error} When the response is not an object/array or has no valid items.
 */
function normalizeAnalysisResult(rawResult, request, relevantSystems, kb = null) {
  let source = null;
  if (rawResult && typeof rawResult === "object" && !Array.isArray(rawResult)) {
    source = rawResult;
  } else if (Array.isArray(rawResult)) {
    source = { items: rawResult };
  }
  if (!source) {
    throw new Error("Model response was not a JSON object.");
  }
  const items = (Array.isArray(source.items) ? source.items : [])
    .map((item, index) => normalizeAnalysisItem(item, index, request, relevantSystems, kb))
    .filter(Boolean);
  if (items.length === 0) {
    throw new Error("Model response did not contain any valid request items.");
  }
  const finiteConfidences = items.map((item) => item.confidence).filter((value) => Number.isFinite(value));
  const averageConfidence = finiteConfidences.length > 0
    ? finiteConfidences.reduce((total, value) => total + value, 0) / finiteConfidences.length
    : null;
  const minimumConfidence = finiteConfidences.length > 0 ? Math.min(...finiteConfidences) : null;
  return {
    submissionId: String(source.submissionId || request.id || "").trim() || request.id,
    sourceText: String(source.sourceText || request.sourceText || "").trim() || request.sourceText,
    confidence: clampConfidence(source.confidence) ?? averageConfidence,
    minimumConfidence,
    items,
  };
}

/**
 * Load the KB: systems index, request schema, tag catalog, and each system's doc text.
 * Independent files are read concurrently.
 */
async function loadKnowledgeBase() {
  const readJson = async (filePath) => JSON.parse(await fs.readFile(filePath, "utf8"));
  const [systemsIndex, requestSchema, requestTagDefinitions] = await Promise.all([
    readJson(path.join(kbRoot, "systems.json")),
    readJson(path.join(kbRoot, "request-analysis-schema.json")),
    loadRequestTagCatalog(),
  ]);
  const systems = Array.isArray(systemsIndex.systems) ? systemsIndex.systems : [];
  // Missing or unreadable system docs degrade to empty text rather than aborting startup.
  const docTexts = await Promise.all(systems.map((system) => {
    const docPath = path.join(repoRoot, String(system.docPath || "").replace(/\//g, path.sep));
    return fs.readFile(docPath, "utf8").catch(() => "");
  }));
  const docsById = new Map(systems.map((system, index) => [String(system.id || ""), docTexts[index]]));
  return {
    systemsIndex,
    requestSchema,
    requestTagDefinitions,
    requestTagLookup: buildRequestTagLookup(requestTagDefinitions),
    docsById,
  };
}

/** Lowercased searchable text for a system: ids, names, aliases, tags, surfaces, files, endpoints, and doc. */
function buildSystemSearchText(system, docText) {
  const parts = [
    system?.id,
    system?.name,
    ...(Array.isArray(system?.aliases) ? system.aliases : []),
    ...(Array.isArray(system?.tags) ? system.tags : []),
    ...(Array.isArray(system?.uiSurfaces) ? system.uiSurfaces : []),
    ...(Array.isArray(system?.keyFiles) ? system.keyFiles : []),
    ...(Array.isArray(system?.apiEndpoints) ? system.apiEndpoints : []),
    docText,
  ];
  return parts.join(" ").toLowerCase();
}

/**
 * Rank KB systems by keyword overlap with the request.
 * Scoring: +2 per matching token, +5 per alias found in the text, +1 for high triage priority.
 * Returns the top `limit` positive scorers, or the top `limit` overall when none score.
 */
function pickRelevantSystems(kb, requestText, limit = 4) {
  const systems = Array.isArray(kb?.systemsIndex?.systems) ? kb.systemsIndex.systems : [];
  const queryTokens = tokenize(requestText);
  const requestLower = String(requestText || "").toLowerCase();
  const ranked = systems
    .map((system) => {
      const docText = kb.docsById.get(String(system.id || "")) || "";
      const corpus = buildSystemSearchText(system, docText);
      let score = 0;
      for (const token of queryTokens) {
        if (corpus.includes(token)) {
          score += 2;
        }
      }
      for (const alias of Array.isArray(system.aliases) ? system.aliases : []) {
        const normalizedAlias = String(alias || "").trim().toLowerCase();
        if (normalizedAlias && requestLower.includes(normalizedAlias)) {
          score += 5;
        }
      }
      if (String(system.priorityForRequestTriage || "").trim().toLowerCase() === "high") {
        score += 1;
      }
      return { system, docText, score };
    })
    .sort((left, right) => right.score - left.score
      || String(left.system?.name || "").localeCompare(String(right.system?.name || "")));
  const positive = ranked.filter((entry) => entry.score > 0).slice(0, limit);
  return positive.length > 0 ? positive : ranked.slice(0, limit);
}

/** Build the chat messages (system + user) sent to the model for one request. */
function buildPrompt(request, relevantSystems, schema, kb) {
  const schemaSummary = JSON.stringify(schema, null, 2);
  const tagCatalogSummary = Array.isArray(kb?.requestTagDefinitions)
    ? kb.requestTagDefinitions
      .map((entry) => {
        const description = String(entry.description || "").trim();
        return description ? `- ${entry.label}: ${description}` : `- ${entry.label}`;
      })
      .join("\n")
    : "";
  const systemDocs = relevantSystems.map(({ system, docText }) => [
    `System: ${system.name}`,
    `System ID: ${system.id}`,
    `Tags: ${(Array.isArray(system.tags) ? system.tags : []).join(", ")}`,
    `Key files: ${(Array.isArray(system.keyFiles) ? system.keyFiles : []).join(", ")}`,
    `API endpoints: ${(Array.isArray(system.apiEndpoints) ? system.apiEndpoints : []).join(", ") || "(none)"}`,
    docText,
  ].join("\n")).join("\n\n---\n\n");
  return [
    {
      role: "system",
      content: [
        "You are processing Worldshaper editor requests.",
        "Split a submission into one or more atomic requests.",
        "Ground your decisions in the provided KB systems only.",
        "Use only the standardized tags listed in the provided tag catalog.",
        "Do not expose or simulate hidden chain-of-thought. Provide short structured review rationale instead.",
        "Return only valid JSON.",
        "Do not wrap the JSON in markdown fences.",
        "If you are unsure, lower confidence and use statusRecommendation = \"needs_review\".",
      ].join("\n"),
    },
    {
      role: "user",
      content: [
        `Submission id: ${request.id}`,
        "Raw submission:",
        request.sourceText,
        "",
        "Return JSON matching this schema:",
        schemaSummary,
        "",
        "Standardized tags you may use:",
        tagCatalogSummary,
        "",
        "Relevant KB systems:",
        systemDocs,
      ].join("\n"),
    },
  ];
}

/** True when `candidate` parses as JSON to an object or array. */
function isJsonContainer(candidate) {
  try {
    const value = JSON.parse(candidate);
    return value !== null && typeof value === "object";
  } catch {
    return false;
  }
}

/**
 * Pull the JSON payload out of raw model text.
 * Tries, in order: fenced block, the whole text, then the {…}/[…] spans (earliest opener first),
 * and returns the first that parses to an object or array. Top-level arrays are supported
 * because normalizeAnalysisResult accepts them.
 * @throws {Error} On empty text or when no JSON-looking span exists.
 */
function extractJsonString(text) {
  const rawText = String(text || "").trim();
  if (!rawText) {
    throw new Error("Model returned empty content.");
  }
  const fencedMatch = rawText.match(/```(?:json)?\s*([\s\S]*?)```/i);
  const fenced = fencedMatch && fencedMatch[1] ? fencedMatch[1].trim() : "";
  const sliceBetween = (open, close) => {
    const start = rawText.indexOf(open);
    const end = rawText.lastIndexOf(close);
    return start >= 0 && end > start ? { start, text: rawText.slice(start, end + 1) } : null;
  };
  const objectSlice = sliceBetween("{", "}");
  const bracketSlices = [objectSlice, sliceBetween("[", "]")]
    .filter(Boolean)
    .sort((left, right) => left.start - right.start);
  const candidates = [fenced, rawText, ...bracketSlices.map((entry) => entry.text)].filter(Boolean);
  const parseable = candidates.find(isJsonContainer);
  if (parseable) {
    return parseable;
  }
  // Nothing parses: return the best guess so the caller's JSON.parse reports the real syntax error.
  if (fenced) {
    return fenced;
  }
  if (objectSlice) {
    return objectSlice.text;
  }
  throw new Error("Could not find JSON object in model response.");
}

/** Flatten a chat message `content` (string or array of parts) into plain text. */
function readMessageContent(messageContent) {
  if (typeof messageContent === "string") {
    return messageContent;
  }
  if (Array.isArray(messageContent)) {
    return messageContent
      .map((part) => {
        if (typeof part === "string") {
          return part;
        }
        if (part && typeof part === "object" && typeof part.text === "string") {
          return part.text;
        }
        return "";
      })
      .filter(Boolean)
      .join("\n");
  }
  return "";
}

/**
 * POST the messages to `<modelBaseUrl>/chat/completions` and parse the JSON reply.
 * DeepSeek gets JSON mode and the configured thinking mode.
 * @throws {Error} On HTTP failure or unparseable output (with a max-tokens hint when truncated).
 */
async function callModelApi(config, messages) {
  const headers = {
    "Content-Type": "application/json",
  };
  if (config.apiKey) {
    headers.Authorization = `Bearer ${config.apiKey}`;
  }
  const requestBody = {
    model: config.model,
    temperature: 0.2,
    max_tokens: config.maxTokens,
    messages,
  };
  if (config.provider === "deepseek") {
    requestBody.response_format = { type: "json_object" };
    requestBody.thinking = { type: config.thinking };
  }
  const payload = await fetchJson(buildUrl(config.modelBaseUrl, "chat/completions"), {
    method: "POST",
    headers,
    body: JSON.stringify(requestBody),
  });
  const choice = payload?.choices?.[0];
  const text = readMessageContent(choice?.message?.content);
  try {
    return JSON.parse(extractJsonString(text));
  } catch (error) {
    // A length cut-off almost always leaves truncated JSON; say so instead of a bare parse error.
    if (choice?.finish_reason === "length") {
      throw new Error(
        `Model output hit the max_tokens limit (${config.maxTokens}); raise --max-tokens. ${String(error?.message || error)}`,
        { cause: error },
      );
    }
    throw error;
  }
}

/** GET the launcher request list from the Worldshaper API. */
async function getLauncherRequests(config) {
  return fetchJson(buildUrl(config.apiBase, "/api/launcher-requests"));
}

/** PATCH one launcher request with admin headers. */
async function patchLauncherRequest(config, requestId, body) {
  return fetchJson(buildUrl(config.apiBase, `/api/launcher-requests/${encodeURIComponent(requestId)}`), {
    method: "PATCH",
    headers: buildAdminHeaders(config, { "Content-Type": "application/json" }),
    body: JSON.stringify(body),
  });
}

/** POST an analysis action (error/review/promote) for one launcher request. */
async function processLauncherRequestAnalysis(config, requestId, body) {
  return fetchJson(buildUrl(config.apiBase, `/api/launcher-requests/${encodeURIComponent(requestId)}/process-analysis`), {
    method: "POST",
    headers: buildAdminHeaders(config, { "Content-Type": "application/json" }),
    body: JSON.stringify(body),
  });
}

/** Auto-promote only when every item is "active" with finite confidence at or above the threshold. */
function shouldPromoteAnalysis(result, config) {
  return result.items.length > 0 && result.items.every((item) => (
    item.statusRecommendation === "active"
    && Number.isFinite(item.confidence)
    && item.confidence >= config.promoteThreshold
  ));
}

/** Mark a request's analysis as "processing", preserving existing analysis fields and createdAt. */
async function markRequestProcessing(config, request) {
  const now = new Date().toISOString();
  const existingAnalysis = request.analysis && typeof request.analysis === "object" && !Array.isArray(request.analysis)
    ? request.analysis
    : {};
  return patchLauncherRequest(config, request.id, {
    analysis: {
      ...existingAnalysis,
      state: "processing",
      model: config.model,
      submissionId: request.id,
      sourceTextSnapshot: request.sourceText,
      createdAt: request.analysis?.createdAt || now,
      updatedAt: now,
      error: "",
    },
  });
}

/** Record an analysis failure for a request. */
async function markRequestError(config, request, errorMessage) {
  return processLauncherRequestAnalysis(config, request.id, {
    action: "error",
    model: config.model,
    error: String(errorMessage || "Unknown analysis failure."),
    analysis: {
      submissionId: request.id,
      sourceTextSnapshot: request.sourceText,
    },
  });
}

/** Shared body for the "review" and "promote" actions, which differ only in the action name. */
async function submitAnalysisResult(config, request, result, action) {
  return processLauncherRequestAnalysis(config, request.id, {
    action,
    model: config.model,
    analysis: {
      submissionId: request.id,
      sourceTextSnapshot: request.sourceText,
      confidence: result.confidence,
      items: result.items,
    },
  });
}

/** Send the analysis for human review. */
async function markRequestReview(config, request, result) {
  return submitAnalysisResult(config, request, result, "review");
}

/** Auto-promote the analyzed items. */
async function promoteRequest(config, request, result) {
  return submitAnalysisResult(config, request, result, "promote");
}

/**
 * Analyze one request end to end: mark processing, call the model, normalize, then review or promote.
 * In dry-run mode the model is still called but nothing is written back; the result is printed.
 */
async function analyzeRequest(config, kb, request) {
  const relevantSystems = pickRelevantSystems(kb, request.sourceText, 4);
  const systemNames = relevantSystems.map(({ system }) => system.name);
  console.log(`Analyzing ${request.id}: ${request.title}`);
  console.log(` Systems: ${systemNames.join(", ")}`);
  if (!config.dryRun) {
    await markRequestProcessing(config, request);
  }
  const prompt = buildPrompt(request, relevantSystems, kb.requestSchema, kb);
  const modelResult = await callModelApi(config, prompt);
  const normalizedResult = normalizeAnalysisResult(modelResult, request, relevantSystems.map((entry) => entry.system), kb);
  const action = shouldPromoteAnalysis(normalizedResult, config) ? "promote" : "review";
  console.log(` Result: ${normalizedResult.items.length} item(s), action=${action}, confidence=${normalizedResult.confidence ?? "n/a"}`);
  if (config.dryRun) {
    console.log(JSON.stringify({ requestId: request.id, action, result: normalizedResult }, null, 2));
    return { action, result: normalizedResult };
  }
  if (action === "promote") {
    await promoteRequest(config, request, normalizedResult);
  } else {
    await markRequestReview(config, request, normalizedResult);
  }
  return { action, result: normalizedResult };
}

/**
 * Choose which pending requests to analyze.
 * With --request-id: that pending request regardless of analysis state.
 * Otherwise: unanalyzed pending requests, up to `limit`.
 */
function selectPendingRequests(payload, config) {
  const requests = Array.isArray(payload?.requests) ? payload.requests : [];
  const pending = requests.filter((request) => String(request?.status || "").trim().toLowerCase() === "pending");
  if (config.requestId) {
    return pending.filter((request) => String(request?.id || "").trim() === config.requestId);
  }
  return pending
    .filter((request) => {
      const analysisState = String(request?.analysis?.state || "").trim().toLowerCase();
      return !analysisState || analysisState === "unprocessed";
    })
    .slice(0, Math.max(1, config.limit));
}

/**
 * Run one analysis pass. Per-request failures are logged and recorded (unless dry-run).
 * @returns {Promise<number>} Count of successfully analyzed requests.
 */
async function runSinglePass(config, kb) {
  const payload = await getLauncherRequests(config);
  const pendingRequests = selectPendingRequests(payload, config);
  if (pendingRequests.length === 0) {
    console.log("No eligible pending requests found.");
    return 0;
  }
  let processedCount = 0;
  for (const request of pendingRequests) {
    try {
      await analyzeRequest(config, kb, request);
      processedCount += 1;
    } catch (error) {
      console.error(` Failed to analyze ${request.id}: ${String(error)}`);
      if (!config.dryRun) {
        try {
          await markRequestError(config, request, String(error));
        } catch (secondaryError) {
          console.error(` Failed to record analysis error for ${request.id}: ${String(secondaryError)}`);
        }
      }
    }
  }
  return processedCount;
}

/** CLI entry point: validate config, load the KB, then run once or poll forever. */
async function main() {
  const config = finalizeConfig(parseArgs(process.argv.slice(2)));
  if (config.help) {
    printHelp();
    return;
  }
  if (!config.dryRun && !config.model) {
    throw new Error("Missing local model id. Set REQUEST_ANALYZER_MODEL or pass --model.");
  }
  if (!config.dryRun && !config.apiKey) {
    throw new Error(
      config.provider === "deepseek"
        ? "Missing DeepSeek API key. Set DEEPSEEK_API_KEY or REQUEST_ANALYZER_API_KEY."
        : "Missing model API key. Set REQUEST_ANALYZER_API_KEY or pass --api-key.",
    );
  }
  const kb = await loadKnowledgeBase();
  if (config.poll) {
    console.log(`Watching pending requests every ${config.intervalMs}ms`);
    for (;;) {
      // One failed pass (e.g. the API server briefly unreachable) must not kill the long-running watcher.
      try {
        await runSinglePass(config, kb);
      } catch (error) {
        console.error(`Analysis pass failed: ${String(error)}`);
      }
      await sleep(config.intervalMs);
    }
  }
  await runSinglePass(config, kb);
}

main().catch((error) => {
  console.error(String(error));
  process.exitCode = 1;
});