flowith2api / server.js
dotaevo's picture
Update server.js
59125cd verified
const crypto = require("node:crypto");
const express = require("express");
// Single Express application instance; every middleware and route below is registered on it.
const app = express();
// Switch off the "x-powered-by" application setting (Express uses it to decide
// whether to advertise itself in an X-Powered-By response header).
app.disable("x-powered-by");
/**
 * Reply with an OpenAI-shaped JSON error envelope: `{ error: { message, type, param, code } }`.
 *
 * @param {object} res - Response object exposing chainable `status()` and `json()`.
 * @param {number} status - HTTP status code to send.
 * @param {string} message - Human-readable error description.
 * @param {string} [type="invalid_request_error"] - Error category string.
 * @param {string|null} [code=null] - Optional machine-readable error code.
 * @returns {*} Whatever `res.status(status).json(...)` returns.
 */
function openAiError(res, status, message, type = "invalid_request_error", code = null) {
  // `param` is always null: no caller in this file attributes an error to a parameter.
  const error = { message, type, param: null, code };
  const response = res.status(status);
  return response.json({ error });
}
/**
 * Extract the API key and repository id from the request's Authorization header.
 *
 * Expected header shape: `Bearer <apiKey>|<repoId>`. The scheme name is matched
 * case-insensitively (HTTP authentication schemes are case-insensitive), and
 * whitespace around either side of the `|` separator is ignored.
 *
 * @param {{ headers: { authorization?: unknown } }} req - Incoming request (only `headers.authorization` is read).
 * @returns {{ ok: true, apiKey: string, repoId: string } | { ok: false, error: string }}
 *   A result object; this function never throws for a malformed header.
 */
function parseAuthorizationHeader(req) {
  const raw = req.headers.authorization;
  if (!raw || typeof raw !== "string") {
    return { ok: false, error: "Missing Authorization header." };
  }

  const scheme = "Bearer ";
  // Compare the scheme prefix without regard to case so "bearer ..." and "BEARER ..." are accepted.
  if (raw.slice(0, scheme.length).toLowerCase() !== scheme.toLowerCase()) {
    return { ok: false, error: "Authorization must start with 'Bearer '." };
  }

  const token = raw.slice(scheme.length).trim();
  // Trim each part so stray spaces around "|" never end up in the upstream credentials.
  const [apiKey, repoId, ...rest] = token.split("|").map((part) => part.trim());
  if (!apiKey || !repoId || rest.length > 0) {
    return { ok: false, error: "Authorization token must be 'Bearer <apiKey>|<repoId>'." };
  }

  return { ok: true, apiKey, repoId };
}
// Parse the Authorization header before any route runs and stash the result on
// the request. Parsing never rejects here; each route inspects `req.flowithAuth.ok`.
app.use(function attachFlowithAuth(req, _res, next) {
  const authResult = parseAuthorizationHeader(req);
  req.flowithAuth = authResult;
  next();
});

// Parse JSON request bodies, with the body size limit set to "2mb".
const jsonBodyParser = express.json({ limit: "2mb" });
app.use(jsonBodyParser);
// Health-check landing page: a static HTML document served with status 200.
app.get("/", function healthPage(_req, res) {
  const html = `<!doctype html>
<html lang="en">
<head><meta charset="utf-8" /><meta name="viewport" content="width=device-width, initial-scale=1" /><title>health</title></head>
<body><h1>ok</h1><p>flowith2api is running.</p></body>
</html>`;
  res.status(200);
  res.type("html");
  res.send(html);
});
// GET /v1/models — fetch the upstream model list and re-shape it as an
// OpenAI-style `{ object: "list", data: [...] }` response.
app.get("/v1/models", async function listModels(req, res) {
  const auth = req.flowithAuth;
  if (!auth.ok) {
    return openAiError(res, 401, auth.error, "authentication_error");
  }
  const { apiKey } = auth;

  try {
    const requestHeaders = {
      Authorization: `Bearer ${apiKey}`,
      "Content-Type": "application/json",
      Host: "edge.flowith.net"
    };
    const upstream = await fetch("https://edge.flowith.net/external/use/knowledge-base/models", {
      method: "GET",
      headers: requestHeaders
    });

    // Read the body as text first so it can be echoed verbatim in an error message.
    const rawBody = await upstream.text();
    if (!upstream.ok) {
      const detail = rawBody || upstream.statusText;
      return openAiError(res, upstream.status, `Upstream error: ${detail}`, "upstream_error");
    }

    let parsed;
    try {
      parsed = JSON.parse(rawBody);
    } catch {
      return openAiError(res, 502, "Upstream returned non-JSON response.", "upstream_error");
    }

    // Anything other than an array under `models` is treated as an empty list.
    const modelIds = Array.isArray(parsed?.models) ? parsed.models : [];
    const created = Math.floor(Date.now() / 1000);
    const toModelEntry = (id) => ({ id, object: "model", created, owned_by: "flowith" });

    return res.json({
      object: "list",
      data: modelIds.map(toModelEntry)
    });
  } catch (err) {
    const reason = err?.message || String(err);
    return openAiError(res, 502, `Upstream request failed: ${reason}`, "upstream_error");
  }
});
/**
 * Write one server-sent-event frame (`data: <payload>` followed by a blank line).
 * Does nothing when the response is already ended or destroyed.
 *
 * @param {object} res - Response object; `writableEnded`, `destroyed` and `write()` are used.
 * @param {string} data - Payload placed after the `data: ` prefix.
 * @returns {void}
 */
function writeSse(res, data) {
  const canWrite = !res.writableEnded && !res.destroyed;
  if (canWrite) {
    const frame = `data: ${data}\n\n`;
    res.write(frame);
  }
}
/**
 * Proxy a streaming chat completion: POST the request to the upstream "seek"
 * endpoint with `stream: true`, read its line-oriented `data:` events, and
 * re-emit them to the client as OpenAI-style `chat.completion.chunk` SSE frames.
 *
 * Only upstream events whose JSON has `tag === "final"` and a string `content`
 * are forwarded. The stream always ends with a `finish_reason: "stop"` chunk
 * followed by `[DONE]`; if reading upstream fails mid-stream, a chunk with
 * `finish_reason: "error"` is emitted first.
 *
 * @param {object} args
 * @param {object} args.req - Incoming request (currently unused; kept for the caller's call shape).
 * @param {object} args.res - Response object the SSE frames are written to.
 * @param {string} args.apiKey - Upstream bearer token.
 * @param {string} args.repoId - Knowledge-base id sent as the single `kb_list` entry.
 * @param {object} args.body - Client request body; `messages` and `model` are forwarded.
 * @returns {Promise<*>} Resolves when the response has been finished. When upstream
 *   answers with a non-OK status, resolves to the result of `openAiError`.
 * @throws Propagates a rejection from the initial `fetch` call (before any headers are sent).
 */
async function proxyChatCompletionsStream({ req, res, apiKey, repoId, body }) {
  const created = Math.floor(Date.now() / 1000);
  const id = `chatcmpl-${crypto.randomUUID().replace(/-/g, "")}`;
  const model = body?.model || "unknown";

  const upstream = await fetch("https://edge.flowith.net/external/use/knowledge-base/seek", {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiKey}`,
      "Content-Type": "application/json",
      Host: "edge.flowith.net"
    },
    body: JSON.stringify({
      messages: body?.messages,
      model: body?.model,
      stream: true,
      kb_list: [repoId]
    })
  });

  if (!upstream.ok) {
    const text = await upstream.text().catch(() => "");
    return openAiError(res, upstream.status, `Upstream error: ${text || upstream.statusText}`, "upstream_error");
  }

  res.status(200);
  res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
  res.setHeader("Cache-Control", "no-cache, no-transform");
  res.setHeader("Connection", "keep-alive");
  res.flushHeaders?.();

  let sentRole = false;
  let finished = false;

  // Build one chat.completion.chunk object with a single choice.
  const makeChunk = (delta, finishReason) => ({
    id,
    object: "chat.completion.chunk",
    created,
    model,
    choices: [{ index: 0, delta, finish_reason: finishReason }]
  });

  // Emit a content delta; the first one also carries the assistant role.
  const sendDelta = (content) => {
    if (typeof content !== "string" || content.length === 0) return;
    const delta = sentRole ? { content } : { role: "assistant", content };
    sentRole = true;
    writeSse(res, JSON.stringify(makeChunk(delta, null)));
  };

  // Emit the terminating stop chunk and [DONE] marker exactly once, then end the response.
  const sendStop = () => {
    if (finished) return;
    finished = true;
    writeSse(res, JSON.stringify(makeChunk({}, "stop")));
    writeSse(res, "[DONE]");
    if (!res.writableEnded) res.end();
  };

  // Handle one upstream line. Returns true when the line is the `[DONE]` sentinel.
  const handleLine = (rawLine) => {
    const line = rawLine.trimEnd();
    if (!line.startsWith("data:")) return false;
    const payload = line.slice("data:".length).trim();
    if (payload === "[DONE]") return true;

    let parsed;
    try {
      parsed = JSON.parse(payload);
    } catch {
      // Lines that are not valid JSON are skipped rather than aborting the stream.
      return false;
    }
    // Only events tagged "final" carry content that is forwarded to the client.
    if (parsed?.tag === "final" && typeof parsed?.content === "string") sendDelta(parsed.content);
    return false;
  };

  // A streaming decoder keeps the trailing bytes of a multi-byte UTF-8 character
  // that is split across two network chunks, instead of turning them into U+FFFD.
  const decoder = new TextDecoder("utf-8");
  let buffer = "";

  try {
    for await (const part of upstream.body) {
      // The client is gone (or the response is finished): stop reading from upstream.
      if (res.destroyed || res.writableEnded) break;

      buffer += decoder.decode(part, { stream: true });

      let newlineAt = buffer.indexOf("\n");
      while (newlineAt !== -1) {
        const line = buffer.slice(0, newlineAt);
        buffer = buffer.slice(newlineAt + 1);
        if (handleLine(line)) return; // the `finally` block below sends the stop frames
        newlineAt = buffer.indexOf("\n");
      }
    }

    // Flush bytes still held by the decoder and process a last line that
    // arrived without a terminating newline.
    buffer += decoder.decode();
    if (buffer) handleLine(buffer);
  } catch {
    // Reading upstream failed after headers were sent; signal it in-band.
    writeSse(res, JSON.stringify(makeChunk({}, "error")));
  } finally {
    sendStop();
  }
}
// POST /v1/chat/completions — validate the request, then either hand off to the
// streaming proxy or perform a single upstream call and wrap its `content`
// in an OpenAI-style `chat.completion` object.
app.post("/v1/chat/completions", async function createChatCompletion(req, res) {
  const auth = req.flowithAuth;
  if (!auth.ok) {
    return openAiError(res, 401, auth.error, "authentication_error");
  }
  const { apiKey, repoId } = auth;

  const body = req.body || {};
  const wantsStream = Boolean(body.stream);

  if (!Array.isArray(body.messages)) {
    return openAiError(res, 400, "Invalid request: body.messages must be an array.", "invalid_request_error");
  }
  const hasModel = typeof body.model === "string" && body.model.length > 0;
  if (!hasModel) {
    return openAiError(res, 400, "Invalid request: body.model must be a non-empty string.", "invalid_request_error");
  }

  // Shared 502 reply for any exception raised while talking to upstream.
  const upstreamFailure = (err) =>
    openAiError(res, 502, `Upstream request failed: ${err?.message || String(err)}`, "upstream_error");

  if (wantsStream) {
    try {
      await proxyChatCompletionsStream({ req, res, apiKey, repoId, body });
    } catch (err) {
      return upstreamFailure(err);
    }
    return;
  }

  try {
    const upstreamPayload = {
      messages: body.messages,
      model: body.model,
      stream: false,
      kb_list: [repoId]
    };
    const upstream = await fetch("https://edge.flowith.net/external/use/knowledge-base/seek", {
      method: "POST",
      headers: {
        Authorization: `Bearer ${apiKey}`,
        "Content-Type": "application/json",
        Host: "edge.flowith.net"
      },
      body: JSON.stringify(upstreamPayload)
    });

    // Read as text first so the raw body can be included in an error message.
    const rawBody = await upstream.text();
    if (!upstream.ok) {
      const detail = rawBody || upstream.statusText;
      return openAiError(res, upstream.status, `Upstream error: ${detail}`, "upstream_error");
    }

    let data;
    try {
      data = JSON.parse(rawBody);
    } catch {
      return openAiError(res, 502, "Upstream returned non-JSON response.", "upstream_error");
    }

    // A missing or non-string `content` becomes an empty assistant message.
    let content = "";
    if (typeof data?.content === "string") {
      content = data.content;
    }

    const created = Math.floor(Date.now() / 1000);
    const id = `chatcmpl-${crypto.randomUUID().replace(/-/g, "")}`;
    const choice = {
      index: 0,
      message: { role: "assistant", content },
      finish_reason: "stop"
    };

    return res.json({
      id,
      object: "chat.completion",
      created,
      model: body.model,
      choices: [choice]
    });
  } catch (err) {
    return upstreamFailure(err);
  }
});
// Fallback for any request no earlier route handled: OpenAI-style 404 error.
app.use(function notFound(req, res) {
  const description = `Not found: ${req.method} ${req.path}`;
  return openAiError(res, 404, description, "invalid_request_error");
});

// Listen on PORT from the environment, falling back to 4400 when it is unset or empty.
const port = Number.parseInt(process.env.PORT || "4400", 10);
app.listen(port, function onListening() {
  console.log(`flowith2api listening on :${port}`);
});