zrcadlo https://github.com/atscan/atscan
firehose updates
This commit is contained in:
rodič
a8c48dca4c
revize
ba593bd9f7
|
@ -0,0 +1,90 @@
|
|||
import {
|
||||
ComAtprotoSyncSubscribeRepos,
|
||||
subscribeRepos,
|
||||
} from "npm:atproto-firehose";
|
||||
|
||||
import * as atprotoApi from "npm:@atproto/api";
|
||||
|
||||
const HTTP_PORT = Deno.env.get("ATSCAN_FIREHOSE_PORT") || "6990";
|
||||
const SERVER = Deno.env.get("ATSCAN_FIREHOSE_SERVER") || "hex";
|
||||
const BGS_HOSTNAME = Deno.env.get("ATSCAN_FIREHOSE_BGS_HOSTNAME") ||
|
||||
"bsky.social";
|
||||
const TICK_REPO = Deno.env.get("ATSCAN_FIREHOSE_TICK_REPO") ||
|
||||
"did:plc:pzovq4a22hpji6pfzofgk7gc";
|
||||
const TICK_ARRAY_SIZE = parseInt(
|
||||
Deno.env.get("ATSCAN_FIREHOSE_TICK_ARRAY_SIZE") || 5,
|
||||
);
|
||||
//const TICK_POST_PATH = Deno.env.get('ATSCAN_FIREHOSE_TICK_POST') || 'app.bsky.feed.post/3k2bwjgozws2q';
|
||||
|
||||
import { Application, Router } from "https://deno.land/x/oak/mod.ts";
|
||||
|
||||
const app = new Application();
|
||||
const router = new Router();
|
||||
|
||||
let totalCommits = 0;
|
||||
const counters = {};
|
||||
const postLatency = [];
|
||||
|
||||
const client = subscribeRepos(`wss://${BGS_HOSTNAME}`, { decodeRepoOps: true });
|
||||
client.on("message", (m) => {
|
||||
const receivedTime = new Date();
|
||||
if (ComAtprotoSyncSubscribeRepos.isHandle(m)) {
|
||||
console.log("handle", m);
|
||||
}
|
||||
if (ComAtprotoSyncSubscribeRepos.isCommit(m)) {
|
||||
m.ops.forEach(async (op) => {
|
||||
if (!counters[op.action]) {
|
||||
counters[op.action] = {};
|
||||
}
|
||||
let type = op.payload?.$type;
|
||||
if (op.action === "delete") {
|
||||
const delMatch = op.path.match(/^([^\/]+)\//);
|
||||
if (delMatch) {
|
||||
type = delMatch[1];
|
||||
}
|
||||
}
|
||||
if (
|
||||
m.repo === TICK_REPO && op.path.startsWith("app.bsky.feed.post/") &&
|
||||
op.action === "create"
|
||||
) {
|
||||
const postDate = new Date(op.payload.createdAt);
|
||||
const latency = receivedTime - postDate;
|
||||
if (postLatency.length >= TICK_ARRAY_SIZE) {
|
||||
postLatency.shift();
|
||||
}
|
||||
postLatency.push(latency);
|
||||
console.log(JSON.stringify(postLatency));
|
||||
}
|
||||
if (type) {
|
||||
if (!counters[op.action][type]) {
|
||||
counters[op.action][type] = 0;
|
||||
}
|
||||
counters[op.action][type]++;
|
||||
}
|
||||
});
|
||||
totalCommits++;
|
||||
}
|
||||
});
|
||||
|
||||
router
|
||||
.get("/counters", (ctx) => {
|
||||
ctx.response.body = counters;
|
||||
})
|
||||
.get("/_metrics", (ctx) => {
|
||||
const avgLatency = postLatency.reduce((ps, a) => ps + a, 0) /
|
||||
postLatency.length;
|
||||
ctx.response.body = Object.keys(counters).map((mod) => {
|
||||
return Object.keys(counters[mod]).map((type) => {
|
||||
const val = counters[mod][type];
|
||||
return `firehose_event{server="${SERVER}",bgs="${BGS_HOSTNAME}",mod="${mod}",type="${type}"} ${val}`;
|
||||
}).filter((v) => v.trim()).join("\n");
|
||||
}).join("\n") + "\n" +
|
||||
`post_latency{server="${SERVER}",bgs="${BGS_HOSTNAME}"} ${avgLatency}` +
|
||||
"\n";
|
||||
});
|
||||
|
||||
app.use(router.routes());
|
||||
|
||||
app.listen({ port: HTTP_PORT });
|
||||
|
||||
console.log(`ATScan Firehose metrics API started at port ${HTTP_PORT}`);
|
|
@ -9,6 +9,8 @@ import * as atprotoApi from "npm:@atproto/api";
|
|||
const { AppBskyActorProfile } = atprotoApi.default;
|
||||
|
||||
const HTTP_PORT = "6990";
|
||||
const SERVER = "hex";
|
||||
const BGS_HOSTNAME = "bsky.social";
|
||||
|
||||
const ats = new ATScan({ enableQueues: true });
|
||||
ats.debug = true;
|
||||
|
@ -19,9 +21,10 @@ import { Application, Router } from "https://deno.land/x/oak/mod.ts";
|
|||
const app = new Application();
|
||||
const router = new Router();
|
||||
|
||||
let totalCommits = 0;
|
||||
const counters = {};
|
||||
|
||||
const client = subscribeRepos(`wss://bsky.social`, { decodeRepoOps: true });
|
||||
const client = subscribeRepos(`wss://${BGS_HOSTNAME}`, { decodeRepoOps: true });
|
||||
client.on("message", (m) => {
|
||||
if (ComAtprotoSyncSubscribeRepos.isHandle(m)) {
|
||||
console.log("handle", m);
|
||||
|
@ -31,11 +34,18 @@ client.on("message", (m) => {
|
|||
if (!counters[op.action]) {
|
||||
counters[op.action] = {};
|
||||
}
|
||||
if (op.payload?.$type) {
|
||||
if (!counters[op.action][op.payload.$type]) {
|
||||
counters[op.action][op.payload.$type] = 0;
|
||||
let type = op.payload?.$type;
|
||||
if (op.action === "delete") {
|
||||
const delMatch = op.path.match(/^([^\/]+)\//);
|
||||
if (delMatch) {
|
||||
type = delMatch[1];
|
||||
}
|
||||
counters[op.action][op.payload.$type]++;
|
||||
}
|
||||
if (type) {
|
||||
if (!counters[op.action][type]) {
|
||||
counters[op.action][type] = 0;
|
||||
}
|
||||
counters[op.action][type]++;
|
||||
}
|
||||
if (op.payload?.$type === "app.bsky.actor.profile") {
|
||||
if (AppBskyActorProfile.isRecord(op.payload)) {
|
||||
|
@ -55,6 +65,7 @@ client.on("message", (m) => {
|
|||
}
|
||||
}
|
||||
});
|
||||
totalCommits++;
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -66,7 +77,7 @@ router
|
|||
ctx.response.body = Object.keys(counters).map((mod) => {
|
||||
return Object.keys(counters[mod]).map((type) => {
|
||||
const val = counters[mod][type];
|
||||
return `firehose_event{mod="${mod}",type="${type}"} ${val}`;
|
||||
return `firehose_event{server="${SERVER}",bgs="${BGS_HOSTNAME}",mod="${mod}",type="${type}"} ${val}`;
|
||||
}).filter((v) => v.trim()).join("\n");
|
||||
}).join("\n") + "\n";
|
||||
});
|
||||
|
|
Načítá se…
Odkázat v novém úkolu