Jax 8df0a58dfa feat: migrate verified template implementation into main repo
Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
2026-03-11 22:10:25 +08:00

221 lines
5.7 KiB
TypeScript

import { randomUUID } from "node:crypto";
import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http";
import { resolve } from "node:path";
import { pathToFileURL } from "node:url";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { parseRuntimeConfig } from "./config/index.js";
import { createMcpCore, type McpCore } from "./core/index.js";
import { asError, toErrorPayload } from "./lib/errors.js";
import { createLogger } from "./lib/logger.js";
const logger = createLogger({ name: "mcp-http" });
type SessionState = {
core: McpCore;
transport: StreamableHTTPServerTransport;
};
export type HttpServerOptions = {
host?: string;
port?: number;
};
const MCP_ROUTE = "/mcp";
function readSessionIdHeader(req: IncomingMessage): string | undefined {
const header = req.headers["mcp-session-id"];
if (Array.isArray(header)) {
return header[0];
}
return header;
}
function respondJson(
res: ServerResponse,
statusCode: number,
payload: Record<string, unknown>,
): void {
const body = JSON.stringify(payload);
res.statusCode = statusCode;
res.setHeader("content-type", "application/json");
res.setHeader("content-length", Buffer.byteLength(body));
res.end(body);
}
function respondJsonRpcError(
res: ServerResponse,
statusCode: number,
message: string,
code: number,
): void {
respondJson(res, statusCode, {
jsonrpc: "2.0",
error: {
code,
message,
},
id: null,
});
}
async function readJsonBody(req: IncomingMessage): Promise<unknown> {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(typeof chunk === "string" ? Buffer.from(chunk) : chunk);
}
if (chunks.length === 0) {
return undefined;
}
const raw = Buffer.concat(chunks).toString("utf8").trim();
if (raw.length === 0) {
return undefined;
}
try {
return JSON.parse(raw);
} catch {
throw new Error("Invalid JSON body");
}
}
async function handleMcpRequest(
req: IncomingMessage,
res: ServerResponse,
sessions: Map<string, SessionState>,
): Promise<void> {
const method = req.method ?? "GET";
const parsedBody = method === "POST" ? await readJsonBody(req) : undefined;
const sessionId = readSessionIdHeader(req);
if (sessionId !== undefined) {
const existing = sessions.get(sessionId);
if (existing === undefined) {
respondJsonRpcError(res, 404, "Session not found", -32001);
return;
}
await existing.transport.handleRequest(req, res, parsedBody);
return;
}
if (method === "POST" && isInitializeRequest(parsedBody)) {
const core = createMcpCore();
const state: SessionState = {
core,
transport: new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
onsessioninitialized: (newSessionId) => {
sessions.set(newSessionId, state);
},
}),
};
state.transport.onclose = () => {
if (state.transport.sessionId !== undefined) {
sessions.delete(state.transport.sessionId);
}
};
await core.server.connect(state.transport);
await state.transport.handleRequest(req, res, parsedBody);
return;
}
respondJsonRpcError(res, 400, "Bad Request: No valid session ID provided", -32000);
}
async function handleHttpRequest(
req: IncomingMessage,
res: ServerResponse,
sessions: Map<string, SessionState>,
): Promise<void> {
const path = new URL(req.url ?? "/", `http://${req.headers.host ?? "localhost"}`).pathname;
if (path !== MCP_ROUTE) {
respondJson(res, 404, {
error: "Not Found",
message: `Route '${path}' is not available. Use '${MCP_ROUTE}'.`,
});
return;
}
const method = req.method ?? "GET";
if (method !== "GET" && method !== "POST" && method !== "DELETE") {
respondJson(res, 405, {
error: "Method Not Allowed",
message: `Method '${method}' is not supported on '${MCP_ROUTE}'.`,
});
return;
}
await handleMcpRequest(req, res, sessions);
}
export async function startHttpServer(options: HttpServerOptions = {}): Promise<Server> {
const runtime = parseRuntimeConfig();
const host = options.host ?? "127.0.0.1";
const port = options.port ?? runtime.port;
const sessions = new Map<string, SessionState>();
const server = createServer((req, res) => {
void handleHttpRequest(req, res, sessions).catch((error: unknown) => {
logger.error("Failed to process HTTP request", toErrorPayload(asError(error)));
if (!res.headersSent) {
respondJsonRpcError(res, 500, "Internal server error", -32603);
}
});
});
server.on("close", () => {
for (const state of sessions.values()) {
void state.transport.close().catch(() => {
logger.warn("Failed to close streamable HTTP transport");
});
}
sessions.clear();
});
await new Promise<void>((resolvePromise, reject) => {
server.once("error", reject);
server.listen(port, host, () => {
server.off("error", reject);
resolvePromise();
});
});
logger.info("MCP HTTP server started", {
host,
port,
route: MCP_ROUTE,
});
return server;
}
export async function runHttpEntrypoint(): Promise<void> {
try {
await startHttpServer();
} catch (error: unknown) {
logger.error("Fatal HTTP startup error", toErrorPayload(asError(error)));
process.exitCode = 1;
}
}
function isDirectExecution(): boolean {
const entryPath = process.argv[1];
if (entryPath === undefined) {
return false;
}
return import.meta.url === pathToFileURL(resolve(entryPath)).href;
}
if (isDirectExecution()) {
void runHttpEntrypoint();
}