Worldshaper/scripts/request-analysis-worker.mjs

694 lines
24 KiB
JavaScript

import fs from "fs/promises";
import path from "path";
import { fileURLToPath } from "url";
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const repoRoot = path.resolve(__dirname, "..");
const kbRoot = path.join(repoRoot, "docs", "kb");
const DEFAULT_DEEPSEEK_BASE_URL = "https://api.deepseek.com";
const DEFAULT_DEEPSEEK_MODEL = "deepseek-v4-flash";
// DeepSeek is the default provider whenever a DeepSeek key is present and no provider is forced.
const DEFAULT_PROVIDER = process.env.REQUEST_ANALYZER_PROVIDER
  || (String(process.env.DEEPSEEK_API_KEY || "").trim() ? "deepseek" : "openai-compatible");

/**
 * Reads a numeric environment variable.
 *
 * Unset, blank, or non-finite values (e.g. "abc", "Infinity") yield `fallback`
 * instead of NaN/0, so a typo in the environment cannot produce `max_tokens: null`,
 * an empty work queue, or a zero-delay polling loop.
 *
 * @param {string} name - Environment variable name.
 * @param {number} fallback - Value used when the variable is missing or invalid.
 * @returns {number} The parsed finite number or `fallback`.
 */
function readNumericEnv(name, fallback) {
  const rawValue = String(process.env[name] ?? "").trim();
  if (!rawValue) {
    return fallback;
  }
  const parsed = Number(rawValue);
  return Number.isFinite(parsed) ? parsed : fallback;
}

/**
 * Baseline worker configuration built from environment variables.
 * CLI flags (see parseArgs) override these; numeric values get the same floor/clamp
 * rules as their CLI counterparts so both input paths agree.
 */
const DEFAULTS = {
  apiBase: process.env.REQUEST_ANALYZER_API_BASE || `http://127.0.0.1:${process.env.PORT || "5180"}`,
  provider: DEFAULT_PROVIDER,
  modelBaseUrl: process.env.REQUEST_ANALYZER_MODEL_BASE_URL || "",
  model: process.env.REQUEST_ANALYZER_MODEL || "",
  apiKey: process.env.REQUEST_ANALYZER_API_KEY || process.env.DEEPSEEK_API_KEY || "",
  adminPassword: process.env.REQUEST_ANALYZER_ADMIN_PASSWORD || process.env.LAUNCHER_ADMIN_PASSWORD || "",
  // Same bounds as --limit: whole number, at least 1.
  limit: Math.max(1, Math.floor(readNumericEnv("REQUEST_ANALYZER_LIMIT", 5))),
  // Same bounds as --promote-threshold: clamped to [0, 1].
  promoteThreshold: Math.max(0, Math.min(1, readNumericEnv("REQUEST_ANALYZER_PROMOTE_THRESHOLD", 0.85))),
  // Same bounds as --max-tokens: whole number, at least 512.
  maxTokens: Math.max(512, Math.floor(readNumericEnv("REQUEST_ANALYZER_MAX_TOKENS", 4000))),
  thinking: String(process.env.REQUEST_ANALYZER_THINKING || "disabled").trim().toLowerCase() === "enabled"
    ? "enabled"
    : "disabled",
  poll: false,
  // Same bounds as --interval-ms: whole milliseconds, at least 1000, so polling never spins.
  intervalMs: Math.max(1000, Math.floor(readNumericEnv("REQUEST_ANALYZER_INTERVAL_MS", 30000))),
  dryRun: false,
  requestId: "",
};
/**
 * Writes the CLI usage text (options, environment variables, notes) to stdout.
 * The DeepSeek model id and base URL shown come from the module-level defaults.
 */
function printHelp() {
  const helpLines = [
    "Worldshaper request analysis worker",
    "Usage:",
    "npm run analyze:requests -- [options]",
    "Options:",
    "--provider <name> Model provider: deepseek or openai-compatible.",
    "--request-id <id> Analyze one pending request only.",
    "--limit <n> Maximum pending requests to process in one pass.",
    "--poll Keep polling for new pending requests.",
    "--interval-ms <ms> Poll interval when --poll is enabled.",
    "--dry-run Do not write anything back to the API.",
    "--api-base <url> Worldshaper API base URL.",
    "--model-base-url <url> Model API base URL. Defaults to DeepSeek when provider=deepseek.",
    `--model <id> Model id to use. Defaults to ${DEFAULT_DEEPSEEK_MODEL} for DeepSeek.`,
    "--api-key <key> API key for the model endpoint.",
    "--max-tokens <n> Max output tokens for the model response.",
    "--thinking <mode> DeepSeek thinking mode: enabled or disabled.",
    "--promote-threshold <n> Minimum per-item confidence for auto-promotion.",
    "--help Show this message.",
    "Environment variables:",
    "REQUEST_ANALYZER_API_BASE",
    "REQUEST_ANALYZER_PROVIDER",
    "REQUEST_ANALYZER_MODEL_BASE_URL",
    "REQUEST_ANALYZER_MODEL",
    "REQUEST_ANALYZER_API_KEY",
    "REQUEST_ANALYZER_ADMIN_PASSWORD",
    "REQUEST_ANALYZER_MAX_TOKENS",
    "REQUEST_ANALYZER_THINKING",
    "REQUEST_ANALYZER_LIMIT",
    "REQUEST_ANALYZER_INTERVAL_MS",
    "REQUEST_ANALYZER_PROMOTE_THRESHOLD",
    "DEEPSEEK_API_KEY",
    "LAUNCHER_ADMIN_PASSWORD",
    "Notes:",
    `- DeepSeek uses ${DEFAULT_DEEPSEEK_BASE_URL}/chat/completions.`,
    "- DeepSeek JSON mode is enabled automatically for this worker.",
    "- Run the Worldshaper API server first so the worker can read and patch requests.",
    // Trailing empty entry reproduces the newline that ends the usage text.
    "",
  ];
  console.log(helpLines.join("\n"));
}
/**
 * Parses CLI arguments into a worker config, starting from a shallow copy of DEFAULTS.
 *
 * Options that take a value consume the following argument. A missing value is rejected
 * rather than swallowed. Missing means end of argv, an empty string, or another `--flag`.
 * Previously `--model --dry-run` silently turned dry-run off and performed real writes.
 *
 * Numeric options keep their previous semantics: an unparseable or zero value leaves the
 * current setting in place, then the floor/clamp below is applied.
 *
 * @param {string[]} argv - Arguments after the script path (e.g. process.argv.slice(2)).
 * @returns {object} The resulting config object.
 * @throws {Error} On an unknown option or an option that is missing its value.
 */
function parseArgs(argv) {
  const config = { ...DEFAULTS };
  // Returns the trimmed argument that follows `option` (at argv[optionIndex + 1]).
  const readOptionValue = (optionIndex, option) => {
    const value = String(argv[optionIndex + 1] ?? "").trim();
    if (!value || value.startsWith("--")) {
      throw new Error(`Missing value for option: ${option}`);
    }
    return value;
  };
  for (let index = 0; index < argv.length; index += 1) {
    const arg = String(argv[index] || "").trim();
    if (!arg) {
      continue;
    }
    switch (arg) {
      case "--help":
      case "-h":
        config.help = true;
        break;
      case "--poll":
        config.poll = true;
        break;
      case "--dry-run":
        config.dryRun = true;
        break;
      case "--provider":
        config.provider = readOptionValue(index, arg).toLowerCase();
        index += 1;
        break;
      case "--request-id":
        config.requestId = readOptionValue(index, arg);
        index += 1;
        break;
      case "--limit":
        config.limit = Math.max(1, Math.floor(Number(readOptionValue(index, arg)) || config.limit));
        index += 1;
        break;
      case "--interval-ms":
        config.intervalMs = Math.max(1000, Math.floor(Number(readOptionValue(index, arg)) || config.intervalMs));
        index += 1;
        break;
      case "--api-base":
        config.apiBase = readOptionValue(index, arg);
        index += 1;
        break;
      case "--model-base-url":
        config.modelBaseUrl = readOptionValue(index, arg);
        index += 1;
        break;
      case "--model":
        config.model = readOptionValue(index, arg);
        index += 1;
        break;
      case "--api-key":
        config.apiKey = readOptionValue(index, arg);
        index += 1;
        break;
      case "--max-tokens":
        config.maxTokens = Math.max(512, Math.floor(Number(readOptionValue(index, arg)) || config.maxTokens));
        index += 1;
        break;
      case "--thinking":
        // Anything other than "enabled" (case-insensitive) disables thinking.
        config.thinking = readOptionValue(index, arg).toLowerCase() === "enabled" ? "enabled" : "disabled";
        index += 1;
        break;
      case "--promote-threshold": {
        const parsed = Number(readOptionValue(index, arg));
        if (Number.isFinite(parsed)) {
          config.promoteThreshold = Math.max(0, Math.min(1, parsed));
        }
        index += 1;
        break;
      }
      default:
        throw new Error(`Unknown option: ${arg}`);
    }
  }
  return config;
}
/**
 * Normalizes the provider name and fills in provider-specific defaults.
 *
 * Any provider other than "deepseek" (case/whitespace-insensitive) becomes
 * "openai-compatible". A blank model base URL defaults to DeepSeek's URL or a
 * local http://127.0.0.1:1234/v1 endpoint. A blank model id is defaulted only for
 * DeepSeek.
 *
 * @param {object} config - Parsed config; not mutated.
 * @returns {object} A new config object.
 */
function finalizeConfig(config) {
  const isDeepSeek = String(config.provider || "").trim().toLowerCase() === "deepseek";
  const finalized = {
    ...config,
    provider: isDeepSeek ? "deepseek" : "openai-compatible",
  };
  const hasBaseUrl = String(finalized.modelBaseUrl || "").trim() !== "";
  if (!hasBaseUrl) {
    finalized.modelBaseUrl = isDeepSeek ? DEFAULT_DEEPSEEK_BASE_URL : "http://127.0.0.1:1234/v1";
  }
  const hasModel = String(finalized.model || "").trim() !== "";
  if (isDeepSeek && !hasModel) {
    finalized.model = DEFAULT_DEEPSEEK_MODEL;
  }
  return finalized;
}
/**
 * Returns a promise that resolves (with undefined) after roughly `ms` milliseconds.
 * @param {number} ms - Delay passed to setTimeout.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Stringifies `value` (nullish/falsy values become "") and ensures it ends with "/".
 * @param {unknown} value
 * @returns {string}
 */
function withTrailingSlash(value) {
  const text = String(value || "");
  if (text.endsWith("/")) {
    return text;
  }
  return `${text}/`;
}
/**
 * Resolves `pathname` beneath `base`, keeping any path prefix already on `base`.
 *
 * `base` is treated as a directory (a trailing "/" is added if missing) and one
 * leading "/" is stripped from `pathname`. This makes the URL resolution append
 * rather than replace the base path, so "https://h/v1" + "/chat" gives
 * "https://h/v1/chat".
 *
 * @param {string} base - Base URL.
 * @param {string} pathname - Relative path to append.
 * @returns {string} The absolute URL string.
 */
function buildUrl(base, pathname) {
  const baseText = String(base || "");
  const directoryBase = baseText.endsWith("/") ? baseText : `${baseText}/`;
  const relativePath = String(pathname || "").replace(/^\//, "");
  return new URL(relativePath, directoryBase).toString();
}
/**
 * Performs a fetch and returns the parsed JSON body.
 *
 * - Network failures are rethrown with the target URL and the underlying cause.
 *   `fetch` itself only reports "fetch failed"; the cause is e.g. ECONNREFUSED.
 *   The original error is kept as `cause`.
 * - Non-2xx responses throw "<status> <statusText>: <first 240 chars of body>".
 * - An empty 2xx body (e.g. 204) yields `null` instead of a JSON syntax error.
 * - A non-JSON 2xx body throws an error naming the URL and a body snippet.
 *
 * @param {string|URL} url - Request URL.
 * @param {RequestInit} [init] - Options forwarded to fetch.
 * @returns {Promise<any>} Parsed JSON, or null for an empty body.
 * @throws {Error} On network failure, non-2xx status, or invalid JSON.
 */
async function fetchJson(url, init = {}) {
  let response;
  try {
    response = await fetch(url, init);
  } catch (error) {
    const causeDetail = error?.cause?.message ? ` (${error.cause.message})` : "";
    throw new Error(`Request to ${url} failed: ${error?.message || String(error)}${causeDetail}`, { cause: error });
  }
  if (!response.ok) {
    const responseText = await response.text().catch(() => "");
    throw new Error(`${response.status} ${response.statusText}${responseText ? `: ${responseText.slice(0, 240)}` : ""}`);
  }
  const bodyText = await response.text();
  if (!bodyText.trim()) {
    return null;
  }
  try {
    return JSON.parse(bodyText);
  } catch (error) {
    throw new Error(`Response from ${url} was not valid JSON: ${bodyText.slice(0, 240)}`, { cause: error });
  }
}
/**
 * Returns a copy of `baseHeaders`. When `config.adminPassword` is non-blank, the
 * copy also carries the trimmed password in `x-worldshaper-admin-password`.
 * @param {object|null|undefined} config - Worker config; only adminPassword is read.
 * @param {object} [baseHeaders] - Headers to copy; never mutated.
 * @returns {object} A new headers object.
 */
function buildAdminHeaders(config, baseHeaders = {}) {
  const password = String(config?.adminPassword || "").trim();
  if (!password) {
    return { ...baseHeaders };
  }
  return {
    ...baseHeaders,
    "x-worldshaper-admin-password": password,
  };
}
/**
 * Lowercases text and splits it into search tokens of at least 2 UTF-16 units.
 *
 * Tokens are runs of Unicode letters, combining marks, digits, "_", "/" and "-".
 * The earlier ASCII-only class treated accented or non-Latin letters as
 * separators. That broke words like "naïve" into "na"/"ve", which then matched
 * unrelated KB text as substrings. ASCII input tokenizes exactly as before.
 *
 * @param {unknown} value - Text to tokenize; nullish/falsy becomes "".
 * @returns {string[]} Tokens in order of appearance (duplicates kept).
 */
function tokenize(value) {
  return String(value || "")
    .toLowerCase()
    .split(/[^\p{L}\p{M}\p{N}_/-]+/u)
    .map((entry) => entry.trim())
    .filter((entry) => entry.length >= 2);
}
/**
 * Collapses whitespace in each value and removes blanks and case-insensitive
 * duplicates. The first spelling of each value is kept, in first-seen order.
 * A non-array input yields [].
 * @param {unknown} values
 * @returns {string[]}
 */
function uniqueStrings(values) {
  const firstByKey = new Map();
  const candidates = Array.isArray(values) ? values : [];
  for (const candidate of candidates) {
    const cleaned = String(candidate || "").replace(/\s+/g, " ").trim();
    if (cleaned) {
      const key = cleaned.toLowerCase();
      if (!firstByKey.has(key)) {
        firstByKey.set(key, cleaned);
      }
    }
  }
  return [...firstByKey.values()];
}
/**
 * Converts a model-supplied confidence to a number in [0, 1], or null when absent.
 *
 * Only numbers and non-blank numeric strings count as a confidence. `Number()`
 * would coerce null, "" and false to 0 and true to 1. That turned an explicit
 * `"confidence": null` into a real score of 0 and skipped the callers'
 * `?? average` fallback.
 *
 * @param {unknown} value - Raw confidence from the model.
 * @returns {number|null} Clamped confidence, or null if missing/non-numeric.
 */
function clampConfidence(value) {
  if (typeof value !== "number" && typeof value !== "string") {
    return null;
  }
  if (typeof value === "string" && !value.trim()) {
    return null;
  }
  const parsed = Number(value);
  if (!Number.isFinite(parsed)) {
    return null;
  }
  return Math.max(0, Math.min(1, parsed));
}
/**
 * Maps a status to one of "active" | "duplicate" | "blocked" | "needs_review".
 * Matching is case- and whitespace-insensitive; anything else becomes "needs_review".
 * @param {unknown} value
 * @returns {string}
 */
function normalizeItemStatusRecommendation(value) {
  const candidate = String(value || "").trim().toLowerCase();
  switch (candidate) {
    case "active":
    case "duplicate":
    case "blocked":
    case "needs_review":
      return candidate;
    default:
      return "needs_review";
  }
}
/**
 * Maps a problem type to one of feature | bug | workflow | performance | ux | content.
 * Matching is case- and whitespace-insensitive; anything else becomes "unknown".
 * @param {unknown} value
 * @returns {string}
 */
function normalizeProblemType(value) {
  const knownTypes = ["feature", "bug", "workflow", "performance", "ux", "content"];
  const candidate = String(value || "").trim().toLowerCase();
  return knownTypes.includes(candidate) ? candidate : "unknown";
}
/**
 * Derives a short title from free text: the first non-empty sentence, truncated.
 *
 * Whitespace is collapsed first. Sentences are split on ".", "!" or "?". When
 * the sentence exceeds 72 characters, the first 69 are kept plus "...". Lengths
 * count Unicode code points rather than UTF-16 units. Slicing by code unit could
 * cut an emoji or other astral character in half and leave a lone surrogate in
 * the stored title.
 *
 * @param {unknown} text - Source text.
 * @param {string} [fallback="Pending request"] - Returned when text is blank.
 * @returns {string}
 */
function buildFallbackTitle(text, fallback = "Pending request") {
  const normalized = String(text || "").replace(/\s+/g, " ").trim();
  if (!normalized) {
    return fallback;
  }
  const firstSentence = normalized.split(/[\r\n.!?]+/).map((entry) => entry.trim()).find(Boolean) || normalized;
  const codePoints = Array.from(firstSentence);
  if (codePoints.length <= 72) {
    return firstSentence;
  }
  return `${codePoints.slice(0, 69).join("").trim()}...`;
}
/**
 * Validates and normalizes one atomic request item from the model's JSON.
 *
 * The item is rejected (null) unless it is a plain object with a title (or a
 * derivable fallback title), an interpretation (`parsedInterpretation` or
 * `summary`) and an approach (`implementationApproach` or `implementationNotes`).
 *
 * Tags: the model's tags are used when they contain at least one non-blank
 * entry. Otherwise they are derived from the primary category plus affected
 * systems. Previously an empty `tags: []` collapsed to ["Unsorted"] and
 * discarded the category.
 *
 * @param {unknown} rawItem - Candidate item from the model.
 * @param {number} [index=0] - Position, used only for the fallback title.
 * @param {object|null} [request=null] - Originating request; sourceText feeds the fallback title.
 * @param {object[]} [relevantSystems=[]] - KB systems; the first one's name is the default category.
 * @returns {object|null} Normalized item, or null if invalid.
 */
function normalizeAnalysisItem(rawItem, index = 0, request = null, relevantSystems = []) {
  const source = rawItem && typeof rawItem === "object" && !Array.isArray(rawItem)
    ? rawItem
    : null;
  if (!source) {
    return null;
  }
  const fallbackText = String(source.rawExcerpt || request?.sourceText || "").trim();
  const title = String(source.title || "").trim() || buildFallbackTitle(fallbackText, `Analyzed request ${index + 1}`);
  const parsedInterpretation = String(source.parsedInterpretation || source.summary || "").trim();
  const implementationApproach = String(source.implementationApproach || source.implementationNotes || "").trim();
  if (!title || !parsedInterpretation || !implementationApproach) {
    return null;
  }
  const primaryCategory = String(source.primaryCategory || source.category || "").trim()
    || String(relevantSystems?.[0]?.name || "Unsorted");
  const affectedSystems = uniqueStrings(source.affectedSystems);
  // uniqueStrings returns [] for a missing/non-array value, so one emptiness check
  // covers "omitted", "not an array" and "only blank entries".
  const modelTags = uniqueStrings(source.tags);
  const tags = modelTags.length > 0
    ? modelTags
    : uniqueStrings([primaryCategory, ...affectedSystems]);
  return {
    title,
    primaryCategory,
    tags: tags.length > 0 ? tags : ["Unsorted"],
    statusRecommendation: normalizeItemStatusRecommendation(source.statusRecommendation || source.status),
    parsedInterpretation,
    implementationApproach,
    affectedSystems,
    affectedFiles: uniqueStrings(source.affectedFiles),
    problemType: normalizeProblemType(source.problemType),
    rawExcerpt: String(source.rawExcerpt || "").trim(),
    confidence: clampConfidence(source.confidence),
    notes: String(source.notes || "").trim(),
  };
}
/**
 * Turns the model's parsed JSON into the worker's analysis result.
 *
 * A bare array is treated as the item list. Invalid items are dropped via
 * normalizeAnalysisItem. The overall confidence is the model's top-level value
 * when usable, otherwise the mean of the finite item confidences.
 * `minimumConfidence` is the lowest finite item confidence, or null.
 *
 * @param {unknown} rawResult - Parsed model output.
 * @param {object} request - Originating request (id, sourceText).
 * @param {object[]} relevantSystems - KB systems forwarded to item normalization.
 * @returns {{submissionId: string, sourceText: string, confidence: number|null, minimumConfidence: number|null, items: object[]}}
 * @throws {Error} If the output is not an object/array or yields no valid items.
 */
function normalizeAnalysisResult(rawResult, request, relevantSystems) {
  let source = null;
  if (Array.isArray(rawResult)) {
    source = { items: rawResult };
  } else if (rawResult && typeof rawResult === "object") {
    source = rawResult;
  }
  if (!source) {
    throw new Error("Model response was not a JSON object.");
  }
  const rawItems = Array.isArray(source.items) ? source.items : [];
  const items = [];
  rawItems.forEach((candidate, position) => {
    const normalized = normalizeAnalysisItem(candidate, position, request, relevantSystems);
    if (normalized) {
      items.push(normalized);
    }
  });
  if (items.length === 0) {
    throw new Error("Model response did not contain any valid request items.");
  }
  const scored = items.map(({ confidence }) => confidence).filter((value) => Number.isFinite(value));
  let averageConfidence = null;
  let minimumConfidence = null;
  if (scored.length > 0) {
    let total = 0;
    for (const value of scored) {
      total += value;
    }
    averageConfidence = total / scored.length;
    minimumConfidence = Math.min(...scored);
  }
  const declaredConfidence = clampConfidence(source.confidence);
  return {
    submissionId: String(source.submissionId || request.id || "").trim() || request.id,
    sourceText: String(source.sourceText || request.sourceText || "").trim() || request.sourceText,
    confidence: declaredConfidence ?? averageConfidence,
    minimumConfidence,
    items,
  };
}
/**
 * Loads the request-triage knowledge base from docs/kb.
 *
 * Reads `systems.json` and `request-analysis-schema.json`. Both are required;
 * a read error propagates, and a JSON error is rethrown naming the file. It then
 * reads each system's `docPath` (relative to the repo root) in parallel. An
 * unreadable doc is best-effort: its text becomes "" and a warning is logged,
 * rather than failing silently.
 *
 * @returns {Promise<{systemsIndex: any, requestSchema: any, docsById: Map<string, string>}>}
 * @throws {Error} If either index file is missing or not valid JSON.
 */
async function loadKnowledgeBase() {
  // Reads and parses a required KB JSON file; parse errors identify the file.
  const readJsonFile = async (filePath) => {
    const text = await fs.readFile(filePath, "utf8");
    try {
      return JSON.parse(text);
    } catch (error) {
      throw new Error(`Invalid JSON in ${filePath}: ${error.message}`, { cause: error });
    }
  };
  const systemsIndex = await readJsonFile(path.join(kbRoot, "systems.json"));
  const requestSchema = await readJsonFile(path.join(kbRoot, "request-analysis-schema.json"));
  const systems = Array.isArray(systemsIndex?.systems) ? systemsIndex.systems : [];
  const docTexts = await Promise.all(systems.map(async (system) => {
    const docPath = path.join(repoRoot, String(system.docPath || "").replace(/\//g, path.sep));
    try {
      return await fs.readFile(docPath, "utf8");
    } catch (error) {
      console.warn(`Warning: KB doc for system "${String(system.id || "")}" could not be read (${docPath}): ${error.message}`);
      return "";
    }
  }));
  // Insert in index order so duplicate ids resolve exactly as a sequential loop would.
  const docsById = new Map();
  systems.forEach((system, position) => {
    docsById.set(String(system.id || ""), docTexts[position]);
  });
  return {
    systemsIndex,
    requestSchema,
    docsById,
  };
}
/**
 * Builds the lowercase text that request tokens are matched against for one
 * system. It joins, space-separated: id, name, aliases, tags, UI surfaces, key
 * files, API endpoints and the system's doc text. Missing or non-array list
 * fields contribute nothing.
 * @param {object|null|undefined} system - KB system entry.
 * @param {string} docText - The system's documentation text.
 * @returns {string}
 */
function buildSystemSearchText(system, docText) {
  const listField = (fieldName) => (Array.isArray(system?.[fieldName]) ? system[fieldName] : []);
  const fragments = [system?.id, system?.name]
    .concat(
      listField("aliases"),
      listField("tags"),
      listField("uiSurfaces"),
      listField("keyFiles"),
      listField("apiEndpoints"),
    )
    .concat([docText]);
  return fragments.join(" ").toLowerCase();
}
/**
 * Ranks KB systems against a request and returns the top `limit` entries.
 *
 * Scoring per system:
 * - +2 for every request token found in the system's search text;
 * - +5 for every alias that appears in the lowercased request;
 * - +1 when `priorityForRequestTriage` is "high".
 * Ties are broken by name via localeCompare. Only positive scores are returned
 * when any exist; otherwise the top `limit` of the full ranking is returned.
 *
 * @param {object} kb - Knowledge base from loadKnowledgeBase.
 * @param {string} requestText - Raw request text.
 * @param {number} [limit=4] - Maximum entries returned.
 * @returns {{system: object, docText: string, score: number}[]}
 */
function pickRelevantSystems(kb, requestText, limit = 4) {
  const catalog = Array.isArray(kb?.systemsIndex?.systems) ? kb.systemsIndex.systems : [];
  const tokens = tokenize(requestText);
  const loweredRequest = String(requestText || "").toLowerCase();
  const scoreSystem = (system) => {
    const docText = kb.docsById.get(String(system.id || "")) || "";
    const corpus = buildSystemSearchText(system, docText);
    const tokenScore = 2 * tokens.filter((token) => corpus.includes(token)).length;
    const aliases = Array.isArray(system.aliases) ? system.aliases : [];
    const aliasScore = 5 * aliases
      .map((alias) => String(alias || "").trim().toLowerCase())
      .filter((alias) => alias && loweredRequest.includes(alias))
      .length;
    const isHighPriority = String(system.priorityForRequestTriage || "").trim().toLowerCase() === "high";
    return {
      system,
      docText,
      score: tokenScore + aliasScore + (isHighPriority ? 1 : 0),
    };
  };
  const byScoreThenName = (left, right) => (
    right.score - left.score
    || String(left.system?.name || "").localeCompare(String(right.system?.name || ""))
  );
  const ranked = catalog.map(scoreSystem).sort(byScoreThenName);
  const matching = ranked.filter(({ score }) => score > 0).slice(0, limit);
  return matching.length > 0 ? matching : ranked.slice(0, limit);
}
/**
 * Builds the chat messages sent to the model.
 *
 * The system message carries fixed instructions (split into atomic requests,
 * ground in KB, JSON only). The user message carries the submission id and raw
 * text, the pretty-printed schema, and one section per relevant KB system, with
 * sections separated by "---".
 *
 * @param {object} request - Request with `id` and `sourceText`.
 * @param {{system: object, docText: string}[]} relevantSystems - Ranked KB systems.
 * @param {unknown} schema - JSON schema to embed (serialized with 2-space indent).
 * @returns {{role: string, content: string}[]}
 */
function buildPrompt(request, relevantSystems, schema) {
  const listOrEmpty = (value) => (Array.isArray(value) ? value : []);
  const schemaSummary = JSON.stringify(schema, null, 2);
  const describeSystem = ({ system, docText }) => [
    `System: ${system.name}`,
    `System ID: ${system.id}`,
    `Tags: ${listOrEmpty(system.tags).join(", ")}`,
    `Key files: ${listOrEmpty(system.keyFiles).join(", ")}`,
    `API endpoints: ${listOrEmpty(system.apiEndpoints).join(", ") || "(none)"}`,
    docText,
  ].join("\n");
  const systemDocs = relevantSystems.map(describeSystem).join("\n\n---\n\n");
  const instructions = [
    "You are processing Worldshaper editor requests.",
    "Split a submission into one or more atomic requests.",
    "Ground your decisions in the provided KB systems only.",
    "Return only valid JSON.",
    "Do not wrap the JSON in markdown fences.",
    "If you are unsure, lower confidence and use statusRecommendation = \"needs_review\".",
  ];
  const userSections = [
    `Submission id: ${request.id}`,
    "Raw submission:",
    request.sourceText,
    "",
    "Return JSON matching this schema:",
    schemaSummary,
    "",
    "Relevant KB systems:",
    systemDocs,
  ];
  const systemMessage = { role: "system", content: instructions.join("\n") };
  const userMessage = { role: "user", content: userSections.join("\n") };
  return [systemMessage, userMessage];
}
/**
 * Extracts the JSON payload text from a model reply so the caller can JSON.parse it.
 *
 * Candidates are tried in this order:
 *   1. the first ```json fenced block;
 *   2. the whole reply;
 *   3. the spans from the first "{"/"[" to the last "}"/"]", whichever opener
 *      appears earlier first.
 * The first candidate that parses to an object or array is returned. Bare
 * top-level arrays are therefore recovered, matching normalizeAnalysisResult,
 * which accepts arrays. The previous version only ever looked for "{...}".
 *
 * If nothing parses, the fenced block or the "{...}" span is returned unchanged,
 * as before, so the caller's JSON.parse reports the syntax error.
 *
 * @param {unknown} text - Raw assistant message content.
 * @returns {string} JSON text.
 * @throws {Error} When the reply is empty, or has neither a fence nor a "{...}" span
 *   and no parseable array.
 */
function extractJsonString(text) {
  const rawText = String(text || "").trim();
  if (!rawText) {
    throw new Error("Model returned empty content.");
  }
  // Span from the first `open` to the last `close`, with its start offset for ordering.
  const sliceBetween = (open, close) => {
    const start = rawText.indexOf(open);
    const end = rawText.lastIndexOf(close);
    return start >= 0 && end > start ? { start, text: rawText.slice(start, end + 1) } : null;
  };
  // True when the candidate is valid JSON whose top level is an object or array.
  const parsesToContainer = (candidate) => {
    try {
      const parsed = JSON.parse(candidate);
      return parsed !== null && typeof parsed === "object";
    } catch {
      return false;
    }
  };
  const fencedMatch = rawText.match(/```(?:json)?\s*([\s\S]*?)```/i);
  const fenced = fencedMatch && fencedMatch[1] ? fencedMatch[1].trim() : null;
  const objectSpan = sliceBetween("{", "}");
  const arraySpan = sliceBetween("[", "]");
  const orderedSpans = [objectSpan, arraySpan]
    .filter(Boolean)
    .sort((left, right) => left.start - right.start)
    .map((span) => span.text);
  const candidates = [fenced, rawText, ...orderedSpans].filter((candidate) => candidate !== null);
  const parseable = candidates.find(parsesToContainer);
  if (parseable !== undefined) {
    return parseable;
  }
  if (fenced !== null) {
    return fenced;
  }
  if (objectSpan) {
    return objectSpan.text;
  }
  throw new Error("Could not find JSON object in model response.");
}
/**
 * Flattens a chat message `content` field into plain text.
 *
 * A string is returned as-is. For an array of parts, string parts and parts with
 * a string `text` are kept (empty ones dropped) and joined with newlines.
 * Anything else yields "".
 *
 * @param {unknown} messageContent
 * @returns {string}
 */
function readMessageContent(messageContent) {
  if (typeof messageContent === "string") {
    return messageContent;
  }
  if (!Array.isArray(messageContent)) {
    return "";
  }
  const fragments = [];
  for (const part of messageContent) {
    let fragment = "";
    if (typeof part === "string") {
      fragment = part;
    } else if (part && typeof part === "object" && typeof part.text === "string") {
      fragment = part.text;
    }
    if (fragment) {
      fragments.push(fragment);
    }
  }
  return fragments.join("\n");
}
/**
 * Sends `messages` to the configured chat-completions endpoint and returns the
 * parsed JSON from the first choice.
 *
 * The request uses temperature 0.2 and `max_tokens` from config. A bearer token
 * is added only when `config.apiKey` is set. For provider "deepseek" it also
 * requests JSON mode (`response_format`) and passes the thinking mode.
 *
 * If parsing fails and the choice reports `finish_reason === "length"`, the
 * error says the output was cut off at max_tokens. The raw parse error is kept
 * as `cause`. Without this, the caller only saw a generic JSON syntax error.
 *
 * @param {object} config - Finalized worker config.
 * @param {{role: string, content: string}[]} messages - Chat messages.
 * @returns {Promise<any>} The parsed JSON value.
 * @throws {Error} On HTTP failure, empty/unparseable output, or truncation.
 */
async function callModelApi(config, messages) {
  const headers = {
    "Content-Type": "application/json",
  };
  if (config.apiKey) {
    headers.Authorization = `Bearer ${config.apiKey}`;
  }
  const requestBody = {
    model: config.model,
    temperature: 0.2,
    max_tokens: config.maxTokens,
    messages,
  };
  if (config.provider === "deepseek") {
    requestBody.response_format = { type: "json_object" };
    requestBody.thinking = { type: config.thinking };
  }
  const payload = await fetchJson(buildUrl(config.modelBaseUrl, "chat/completions"), {
    method: "POST",
    headers,
    body: JSON.stringify(requestBody),
  });
  const choice = payload?.choices?.[0];
  const text = readMessageContent(choice?.message?.content);
  try {
    return JSON.parse(extractJsonString(text));
  } catch (error) {
    if (choice?.finish_reason === "length") {
      throw new Error(
        `Model output was truncated at max_tokens=${config.maxTokens} before the JSON was complete; raise --max-tokens or REQUEST_ANALYZER_MAX_TOKENS.`,
        { cause: error },
      );
    }
    throw error;
  }
}
/**
 * Fetches the launcher request list from the Worldshaper API.
 * No admin headers are sent on this GET.
 * @param {object} config - Worker config; only apiBase is read.
 * @returns {Promise<any>} Parsed JSON payload.
 */
async function getLauncherRequests(config) {
  const listUrl = buildUrl(config.apiBase, "/api/launcher-requests");
  return fetchJson(listUrl);
}
/**
 * PATCHes one launcher request with a JSON body, sending admin headers.
 * @param {object} config - Worker config (apiBase, adminPassword).
 * @param {string} requestId - Request id; URL-encoded into the path.
 * @param {object} body - Patch payload, serialized as JSON.
 * @returns {Promise<any>} Parsed JSON response.
 */
async function patchLauncherRequest(config, requestId, body) {
  const targetUrl = buildUrl(config.apiBase, `/api/launcher-requests/${encodeURIComponent(requestId)}`);
  const headers = buildAdminHeaders(config, { "Content-Type": "application/json" });
  const serializedBody = JSON.stringify(body);
  return fetchJson(targetUrl, { method: "PATCH", headers, body: serializedBody });
}
/**
 * POSTs an analysis action (error/review/promote) for one launcher request to
 * its `process-analysis` endpoint, sending admin headers.
 * @param {object} config - Worker config (apiBase, adminPassword).
 * @param {string} requestId - Request id; URL-encoded into the path.
 * @param {object} body - Action payload, serialized as JSON.
 * @returns {Promise<any>} Parsed JSON response.
 */
async function processLauncherRequestAnalysis(config, requestId, body) {
  const endpoint = buildUrl(
    config.apiBase,
    `/api/launcher-requests/${encodeURIComponent(requestId)}/process-analysis`,
  );
  const requestInit = {
    method: "POST",
    headers: buildAdminHeaders(config, { "Content-Type": "application/json" }),
    body: JSON.stringify(body),
  };
  return fetchJson(endpoint, requestInit);
}
/**
 * Decides whether an analysis can be auto-promoted. This requires at least one
 * item, and every item must be recommended "active" with a finite confidence
 * >= config.promoteThreshold.
 * @param {{items: object[]}} result - Normalized analysis result.
 * @param {{promoteThreshold: number}} config
 * @returns {boolean}
 */
function shouldPromoteAnalysis(result, config) {
  const { items } = result;
  if (items.length === 0) {
    return false;
  }
  const isConfidentlyActive = (item) => item.statusRecommendation === "active"
    && Number.isFinite(item.confidence)
    && item.confidence >= config.promoteThreshold;
  return !items.some((item) => !isConfidentlyActive(item));
}
/**
 * Marks a request's analysis as "processing" through the PATCH endpoint.
 *
 * Existing analysis fields are carried over when `request.analysis` is a plain
 * object. State, model, submission id, source snapshot and updatedAt are then
 * overwritten, and error is cleared. An existing `createdAt` is kept; otherwise
 * the current time is used.
 *
 * @param {object} config - Worker config (model plus PATCH settings).
 * @param {object} request - Launcher request (id, sourceText, analysis).
 * @returns {Promise<any>} The PATCH response.
 */
async function markRequestProcessing(config, request) {
  const timestamp = new Date().toISOString();
  const previousAnalysis = request.analysis;
  const isPlainObject = previousAnalysis && typeof previousAnalysis === "object" && !Array.isArray(previousAnalysis);
  const analysis = {
    ...(isPlainObject ? previousAnalysis : {}),
    state: "processing",
    model: config.model,
    submissionId: request.id,
    sourceTextSnapshot: request.sourceText,
    createdAt: previousAnalysis?.createdAt || timestamp,
    updatedAt: timestamp,
    error: "",
  };
  return patchLauncherRequest(config, request.id, { analysis });
}
/**
 * Records a failed analysis through the process-analysis endpoint.
 * An empty message is replaced with "Unknown analysis failure.".
 * @param {object} config - Worker config.
 * @param {object} request - Launcher request (id, sourceText).
 * @param {unknown} errorMessage - Failure description.
 * @returns {Promise<any>} The endpoint response.
 */
async function markRequestError(config, request, errorMessage) {
  const message = String(errorMessage || "Unknown analysis failure.");
  const payload = {
    action: "error",
    model: config.model,
    error: message,
    analysis: {
      submissionId: request.id,
      sourceTextSnapshot: request.sourceText,
    },
  };
  return processLauncherRequestAnalysis(config, request.id, payload);
}
/**
 * Submits an analysis for human review through the process-analysis endpoint.
 * @param {object} config - Worker config.
 * @param {object} request - Launcher request (id, sourceText).
 * @param {{confidence: number|null, items: object[]}} result - Normalized analysis.
 * @returns {Promise<any>} The endpoint response.
 */
async function markRequestReview(config, request, result) {
  const { confidence, items } = result;
  const analysis = {
    submissionId: request.id,
    sourceTextSnapshot: request.sourceText,
    confidence,
    items,
  };
  return processLauncherRequestAnalysis(config, request.id, {
    action: "review",
    model: config.model,
    analysis,
  });
}
/**
 * Submits an analysis for auto-promotion through the process-analysis endpoint.
 * @param {object} config - Worker config.
 * @param {object} request - Launcher request (id, sourceText).
 * @param {{confidence: number|null, items: object[]}} result - Normalized analysis.
 * @returns {Promise<any>} The endpoint response.
 */
async function promoteRequest(config, request, result) {
  const analysis = {
    submissionId: request.id,
    sourceTextSnapshot: request.sourceText,
    confidence: result.confidence,
    items: result.items,
  };
  const payload = { action: "promote", model: config.model, analysis };
  return processLauncherRequestAnalysis(config, request.id, payload);
}
/**
 * Runs the full analysis pipeline for one request.
 *
 * Steps: pick relevant KB systems, mark the request "processing" (skipped in
 * dry-run), prompt the model, normalize its output, then choose "promote" or
 * "review". Dry-run prints the result as JSON; otherwise the outcome is written
 * back through the matching endpoint.
 *
 * @param {object} config - Finalized worker config.
 * @param {object} kb - Knowledge base from loadKnowledgeBase.
 * @param {object} request - Pending launcher request.
 * @returns {Promise<{action: "promote"|"review", result: object}>}
 */
async function analyzeRequest(config, kb, request) {
  const candidateSystems = pickRelevantSystems(kb, request.sourceText, 4);
  const candidateNames = candidateSystems.map((entry) => entry.system.name);
  console.log(`Analyzing ${request.id}: ${request.title}`);
  console.log(` Systems: ${candidateNames.join(", ")}`);
  if (!config.dryRun) {
    await markRequestProcessing(config, request);
  }
  const messages = buildPrompt(request, candidateSystems, kb.requestSchema);
  const rawModelOutput = await callModelApi(config, messages);
  const analysis = normalizeAnalysisResult(
    rawModelOutput,
    request,
    candidateSystems.map((entry) => entry.system),
  );
  const action = shouldPromoteAnalysis(analysis, config) ? "promote" : "review";
  console.log(` Result: ${analysis.items.length} item(s), action=${action}, confidence=${analysis.confidence ?? "n/a"}`);
  if (config.dryRun) {
    console.log(JSON.stringify({
      requestId: request.id,
      action,
      result: analysis,
    }, null, 2));
  } else if (action === "promote") {
    await promoteRequest(config, request, analysis);
  } else {
    await markRequestReview(config, request, analysis);
  }
  return { action, result: analysis };
}
/**
 * Chooses which launcher requests to analyze in this pass.
 *
 * Only requests whose status is "pending" (trimmed, case-insensitive) are
 * considered. With `config.requestId`, the pending request with that exact id is
 * returned, whatever its analysis state. Otherwise up to max(1, config.limit)
 * pending requests are returned whose analysis state is empty or "unprocessed".
 *
 * @param {any} payload - Response from getLauncherRequests.
 * @param {{requestId: string, limit: number}} config
 * @returns {object[]}
 */
function selectPendingRequests(payload, config) {
  const allRequests = Array.isArray(payload?.requests) ? payload.requests : [];
  const cleaned = (value) => String(value || "").trim();
  const pending = allRequests.filter((entry) => cleaned(entry?.status).toLowerCase() === "pending");
  if (config.requestId) {
    return pending.filter((entry) => cleaned(entry?.id) === config.requestId);
  }
  const unanalyzed = pending.filter((entry) => {
    const state = cleaned(entry?.analysis?.state).toLowerCase();
    return state === "" || state === "unprocessed";
  });
  return unanalyzed.slice(0, Math.max(1, config.limit));
}
/**
 * Fetches pending requests and analyzes the selected ones one at a time.
 *
 * A failure on one request is logged and, outside dry-run, recorded through
 * markRequestError; processing then continues with the next request. A failure
 * while recording the error is itself only logged. Errors from fetching the
 * request list are not caught here.
 *
 * @param {object} config - Finalized worker config.
 * @param {object} kb - Knowledge base.
 * @returns {Promise<number>} Number of requests analyzed successfully.
 */
async function runSinglePass(config, kb) {
  const listing = await getLauncherRequests(config);
  const queue = selectPendingRequests(listing, config);
  if (queue.length === 0) {
    console.log("No eligible pending requests found.");
    return 0;
  }
  const reportFailure = async (request, error) => {
    console.error(` Failed to analyze ${request.id}: ${String(error)}`);
    if (config.dryRun) {
      return;
    }
    try {
      await markRequestError(config, request, String(error));
    } catch (secondaryError) {
      console.error(` Failed to record analysis error for ${request.id}: ${String(secondaryError)}`);
    }
  };
  let succeeded = 0;
  for (const request of queue) {
    try {
      await analyzeRequest(config, kb, request);
      succeeded += 1;
    } catch (error) {
      await reportFailure(request, error);
    }
  }
  return succeeded;
}
/**
 * CLI entry point: parse and validate config, load the KB, then run one pass or
 * poll forever.
 *
 * Outside dry-run, a model id and an API key are required. In `--poll` mode, a
 * failed pass is logged and polling continues after the normal interval. An
 * example is the Worldshaper API being unreachable, which makes
 * getLauncherRequests throw. Previously the rejection escaped the loop and
 * terminated the long-running worker. In single-pass mode, errors still
 * propagate so the process exits non-zero.
 *
 * @returns {Promise<void>}
 * @throws {Error} On invalid CLI/config, KB load failure, or (single pass) a failed pass.
 */
async function main() {
  const config = finalizeConfig(parseArgs(process.argv.slice(2)));
  if (config.help) {
    printHelp();
    return;
  }
  if (!config.dryRun && !config.model) {
    throw new Error("Missing local model id. Set REQUEST_ANALYZER_MODEL or pass --model.");
  }
  if (!config.dryRun && !config.apiKey) {
    throw new Error(
      config.provider === "deepseek"
        ? "Missing DeepSeek API key. Set DEEPSEEK_API_KEY or REQUEST_ANALYZER_API_KEY."
        : "Missing model API key. Set REQUEST_ANALYZER_API_KEY or pass --api-key."
    );
  }
  const kb = await loadKnowledgeBase();
  if (!config.poll) {
    await runSinglePass(config, kb);
    return;
  }
  console.log(`Watching pending requests every ${config.intervalMs}ms`);
  for (;;) {
    try {
      await runSinglePass(config, kb);
    } catch (error) {
      // Transient failures (API restart, network blip) must not kill the poller.
      console.error(`Polling pass failed: ${String(error)}`);
    }
    await sleep(config.intervalMs);
  }
}
// Script entry: run the worker. On failure, print the error and set a non-zero
// exit code. Using process.exitCode instead of process.exit() lets Node finish
// pending work before exiting.
main().catch((fatalError) => {
  process.exitCode = 1;
  console.error(String(fatalError));
});