zrcadlo https://github.com/atscan/atscan
backend sockets
This commit is contained in:
rodič
5bd630ba7a
revize
6a6d5fb807
|
@ -10,4 +10,6 @@ BLUESKY_PASSWORD=YOUR_PASSWORD
|
|||
INFLUXDB_HOST=http://localhost:8086
|
||||
INFLUXDB_TOKEN=XXXX
|
||||
INFLUXDB_ORG=XXXX
|
||||
INFLUXDB_BUCKET=XXXX
|
||||
INFLUXDB_BUCKET=XXXX
|
||||
TYPESENSE_HOST=http://localhost:8108
|
||||
TYPESENSE_API_KEY=XXXXX
|
|
@ -1,15 +0,0 @@
|
|||
import { ATScan } from "./lib/atscan.js";
|
||||
|
||||
const ats = new ATScan();
|
||||
await ats.init();
|
||||
|
||||
function main() {
|
||||
const streams = {};
|
||||
streams.did = ats.db.did.watch();
|
||||
|
||||
streams.did.on("change", (ev) => {
|
||||
console.log(ev);
|
||||
});
|
||||
}
|
||||
|
||||
main();
|
|
@ -4,18 +4,63 @@ import { oakCors } from "https://deno.land/x/cors/mod.ts";
|
|||
import { minidenticon } from "npm:minidenticons@4.2.0";
|
||||
import _ from "npm:lodash";
|
||||
|
||||
const ats = new ATScan();
|
||||
const ats = new ATScan({ enableQueues: true, enableNats: true });
|
||||
await ats.init();
|
||||
ats.startDaemon();
|
||||
|
||||
const HTTP_PORT = 6677;
|
||||
// Event relay: runs only on the API instance whose PORT equals 6677.
// It listens to backend service events on NATS, loads the affected document
// from MongoDB, enriches it with prepareObject() and republishes it under the
// public `ats.api.*` subjects.
// NOTE(review): when PORT is unset this condition is false (Number(undefined)
// is NaN) even though the HTTP server falls back to 6677 — confirm intended.
if (Number(ats.env.PORT) === 6677) {
  /**
   * Relays one family of service events to its `ats.api.*` counterpart.
   *
   * @param {object} relay
   * @param {string} relay.pattern - NATS subject pattern to subscribe to.
   * @param {string} relay.createSubject - exact subject that maps to "create";
   *   every other subject matching the pattern maps to "update".
   * @param {string} relay.apiPrefix - prefix of the republished subject.
   * @param {string} relay.type - collection name in `ats.db` and the type
   *   passed to prepareObject().
   * @param {string} relay.key - payload field that identifies the document.
   */
  const relayServiceEvents = async (
    { pattern, createSubject, apiPrefix, type, key },
  ) => {
    const codec = ats.JSONCodec;
    for await (const msg of ats.nats.subscribe(pattern)) {
      // A single bad message (undecodable payload, DB error, ...) must not
      // terminate the subscription loop, so failures are handled per message.
      try {
        const value = codec.decode(msg.data)?.[key];
        const item = await ats.db[type].findOne({ [key]: value });
        if (!item) {
          continue;
        }
        Object.assign(item, prepareObject(type, item));
        const action = msg.subject === createSubject ? "create" : "update";
        ats.nats.publish(`${apiPrefix}.${action}`, codec.encode(item));
      } catch (err) {
        console.error(`Failed to relay NATS message [${msg.subject}]:`, err);
      }
    }
  };

  const relays = [
    {
      pattern: "ats.service.plc.did.*",
      createSubject: "ats.service.plc.did.create",
      apiPrefix: "ats.api.did",
      type: "did",
      key: "did",
    },
    {
      pattern: "ats.service.pds.*",
      createSubject: "ats.service.pds.create",
      apiPrefix: "ats.api.pds",
      type: "pds",
      key: "url",
    },
  ];

  for (const relay of relays) {
    // Fire-and-forget on purpose: each relay runs for the process lifetime.
    relayServiceEvents(relay).catch((err) =>
      console.error(`NATS relay stopped [${relay.pattern}]:`, err)
    );
  }
}
|
||||
|
||||
const HTTP_PORT = ats.env.PORT || 6677;
|
||||
const app = new Application();
|
||||
|
||||
function perf(ctx) {
|
||||
if (ctx.request.url.toString().startsWith("http://localhost:")) {
|
||||
return null;
|
||||
}
|
||||
console.log(
|
||||
`GET ${ctx.request.url} [${performance.now() - ctx.perf}ms] ${
|
||||
ctx.request.headers.get("user-agent")
|
||||
}`,
|
||||
`[${HTTP_PORT}] GET ${ctx.request.url} [${
|
||||
performance.now() - ctx.perf
|
||||
}ms] ${ctx.request.headers.get("user-agent")}`,
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -61,6 +106,11 @@ function prepareObject(type, item) {
|
|||
? null
|
||||
: (respTimes.length > 0 ? _.mean(respTimes) : null);
|
||||
break;
|
||||
|
||||
case "did":
|
||||
item.srcHost = item.src.replace(/^https?:\/\//, "");
|
||||
item.fed = findDIDFed(item);
|
||||
break;
|
||||
}
|
||||
return item;
|
||||
}
|
||||
|
@ -119,6 +169,7 @@ router
|
|||
pds: { key: "pds" },
|
||||
size: { key: "repo.size" },
|
||||
commits: { key: "repo.commits" },
|
||||
lastSnapshot: { key: "repo.time" },
|
||||
};
|
||||
|
||||
let inputSort = ctx.request.url.searchParams.get("sort");
|
||||
|
@ -186,8 +237,7 @@ router
|
|||
const did
|
||||
of (await ats.db.did.find(query).sort(sort).limit(limit).toArray())
|
||||
) {
|
||||
did.srcHost = did.src.replace(/^https?:\/\//, "");
|
||||
did.fed = findDIDFed(did);
|
||||
Object.assign(did, prepareObject("did", did));
|
||||
out.push(did);
|
||||
}
|
||||
|
||||
|
@ -240,13 +290,31 @@ router
|
|||
})
|
||||
.get("/_metrics", async (ctx) => {
|
||||
const metrics = {
|
||||
pds_count: [await ats.db.pds.count(), "PDS count", "counter"],
|
||||
did_count: [await ats.db.did.count(), "DID count", "counter"],
|
||||
pds_count: [await ats.db.pds.count()],
|
||||
did_count: [await ats.db.did.count()],
|
||||
};
|
||||
for (const queueName of Object.keys(ats.queues)) {
|
||||
const queue = ats.queues[queueName];
|
||||
const getMetric = async (name) =>
|
||||
(await queue.getMetrics(name)).meta.count;
|
||||
|
||||
metrics[`queue_metric_completed{name="${queueName}"}`] = [
|
||||
await getMetric("completed"),
|
||||
];
|
||||
metrics[`queue_metric_failed{name="${queueName}"}`] = [
|
||||
await getMetric("failed"),
|
||||
];
|
||||
metrics[`queue_active{name="${queueName}"}`] = [
|
||||
await queue.getActiveCount(),
|
||||
];
|
||||
metrics[`queue_waiting{name="${queueName}"}`] = [
|
||||
await queue.getWaitingCount(),
|
||||
];
|
||||
}
|
||||
ctx.response.body = Object.keys(metrics).map((m) => {
|
||||
const [data, help, type] = metrics[m];
|
||||
return `# HELP ${m} ${help}\n# TYPE ${m} ${type}\n${m} ${data}\n`;
|
||||
}).join("\n");
|
||||
return `${m} ${data}`;
|
||||
}).join("\n") + "\n";
|
||||
perf(ctx);
|
||||
})
|
||||
.get("/:id.svg", async (ctx) => {
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
import {
|
||||
ComAtprotoSyncSubscribeRepos,
|
||||
subscribeRepos,
|
||||
SubscribeReposMessage,
|
||||
} from "npm:atproto-firehose";
|
||||
|
||||
const client = subscribeRepos(`wss://bsky.social`, { decodeRepoOps: true });

// Handles one firehose message. Only commit messages are looked at; their
// operations are walked but nothing is done with them yet (placeholder).
function onFirehoseMessage(message) {
  if (!ComAtprotoSyncSubscribeRepos.isCommit(message)) {
    return;
  }
  for (const _op of message.ops) {
    // intentionally empty — hook for per-operation processing
  }
}

client.on("message", onFirehoseMessage);
|
|
@ -1,7 +1,7 @@
|
|||
import { ATScan } from "./lib/atscan.js";
|
||||
import whoiser from "npm:whoiser";
|
||||
|
||||
const wait = 60 * 5;
|
||||
const wait = 60 * 1;
|
||||
|
||||
async function index(ats) {
|
||||
for (const pds of await ats.db.pds.find().toArray()) {
|
||||
|
@ -30,6 +30,10 @@ async function index(ats) {
|
|||
host,
|
||||
]]);
|
||||
await ats.writeInflux("pds_size", "intField", size, [["pds", host]]);
|
||||
ats.nats.publish(
|
||||
"ats.service.pds.update",
|
||||
ats.JSONCodec.encode({ url: pds.url }),
|
||||
);
|
||||
}
|
||||
console.log("indexer round finished");
|
||||
//console.log(await whoiser("dev.otaso-sky.blue"));
|
||||
|
@ -37,7 +41,7 @@ async function index(ats) {
|
|||
|
||||
if (Deno.args[0] === "daemon") {
|
||||
console.log("Initializing ATScan ..");
|
||||
const ats = new ATScan();
|
||||
const ats = new ATScan({ enableNats: true });
|
||||
ats.debug = true;
|
||||
await ats.init();
|
||||
console.log("indexer daemon started");
|
||||
|
@ -49,7 +53,7 @@ if (Deno.args[0] === "daemon") {
|
|||
console.log(`Processing [wait=${wait}s] ..`);
|
||||
setInterval(() => index(ats), wait * 1000);
|
||||
} else {
|
||||
const ats = new ATScan({ debug: true });
|
||||
const ats = new ATScan({ enableNats: true, debug: true });
|
||||
await ats.init();
|
||||
await index(ats);
|
||||
|
||||
|
|
|
@ -4,17 +4,23 @@ import { parse, stringify } from "https://deno.land/std@0.192.0/yaml/mod.ts";
|
|||
import { MongoClient } from "npm:mongodb";
|
||||
import { InfluxDB } from "npm:@influxdata/influxdb-client";
|
||||
import { makeQueues } from "./queues.js";
|
||||
import {
|
||||
connect as NATSConnect,
|
||||
JSONCodec,
|
||||
StringCodec,
|
||||
} from "https://deno.land/x/nats/src/mod.ts";
|
||||
|
||||
export class ATScan {
|
||||
constructor(opts = {}) {
|
||||
this.verbose = opts.verbose;
|
||||
this.debug = opts.debug;
|
||||
this.enableQueues = opts.enableQueues || false;
|
||||
this.enableNats = opts.enableNats || false;
|
||||
console.log(this.enableQueues);
|
||||
}
|
||||
|
||||
async init() {
|
||||
this.env = await envLoad();
|
||||
this.env = Object.assign(Deno.env.toObject(), await envLoad());
|
||||
await this.ecosystemLoad();
|
||||
const influxConfig = {
|
||||
url: this.env.INFLUXDB_HOST,
|
||||
|
@ -31,113 +37,19 @@ export class ATScan {
|
|||
meta: this.dbRaw.collection("meta"),
|
||||
};
|
||||
console.log(`Connected to MongoDB: ${this.env.MONGODB_URL}`);
|
||||
if (this.enableNats) {
|
||||
this.nats = await NATSConnect({
|
||||
servers: this.env.NATS_SERVERS,
|
||||
});
|
||||
this.JSONCodec = JSONCodec();
|
||||
console.log(`Connected to NATS: ${this.env.NATS_SERVERS}`);
|
||||
}
|
||||
if (this.enableQueues) {
|
||||
this.queues = await makeQueues(this);
|
||||
console.log(`Queues initialized: ${Object.keys(this.queues).join(", ")}`);
|
||||
}
|
||||
}
|
||||
|
||||
async processPlcExport(plc, after = null) {
|
||||
const url = plc.url + "/export?after=" + (after || "");
|
||||
if (this.debug) {
|
||||
console.log(`ProcessPlcExport: ${url}`);
|
||||
}
|
||||
const req = await fetch(url);
|
||||
const lines = await req.text();
|
||||
if (!lines) {
|
||||
console.error(`No output from PLC! [${url}]`);
|
||||
return null;
|
||||
}
|
||||
const arr = lines.split("\n").map((l) => JSON.parse(l));
|
||||
|
||||
for (const data of arr) {
|
||||
const pdsUrl = data.operation.services?.atproto_pds?.endpoint;
|
||||
const matcher = { did: data.did, src: plc.url };
|
||||
const obj = {
|
||||
did: data.did,
|
||||
src: plc.url,
|
||||
revs: [data],
|
||||
time: new Date().toISOString(),
|
||||
lastMod: data.createdAt,
|
||||
pds: pdsUrl ? [pdsUrl] : [],
|
||||
};
|
||||
let didRev = 0;
|
||||
const found = await this.db.did.findOne(matcher);
|
||||
if (found) {
|
||||
const revFound = found.revs.find((r) => r.cid === data.cid);
|
||||
let updated = false;
|
||||
if (!revFound) {
|
||||
updated = true;
|
||||
didRev = found.revs.length;
|
||||
found.revs.push(data);
|
||||
//found.time = new Date().toISOString()
|
||||
console.log(
|
||||
`${
|
||||
(new Date()).toISOString()
|
||||
} DID: Adding new DID revision: ${data.did}@${didRev}`,
|
||||
);
|
||||
}
|
||||
if (pdsUrl && !found.pds.includes(pdsUrl)) {
|
||||
updated = true;
|
||||
found.pds.push(pdsUrl);
|
||||
}
|
||||
if (updated) {
|
||||
await this.db.did.updateOne(matcher, {
|
||||
$set: {
|
||||
time: new Date().toISOString(),
|
||||
revs: found.revs,
|
||||
pds: found.pds,
|
||||
lastMod: found.revs[found.revs.length - 1].createdAt,
|
||||
},
|
||||
});
|
||||
}
|
||||
} else {
|
||||
console.log(
|
||||
`${
|
||||
(new Date()).toISOString()
|
||||
} DID: Adding new DID revision: ${data.did}@0 (init)`,
|
||||
);
|
||||
await this.db.did.insertOne(obj);
|
||||
}
|
||||
if (pdsUrl) {
|
||||
const pdsFound = await this.db.pds.findOne({ url: pdsUrl });
|
||||
const didId = [data.did, didRev].join("@");
|
||||
if (pdsFound) {
|
||||
if (!pdsFound.plcs.includes(plc.url)) {
|
||||
pdsFound.plcs.push(plcUrl);
|
||||
console.log(
|
||||
`${
|
||||
(new Date()).toISOString()
|
||||
} PDS [${pdsUrl}]: Adding new PLC: ${plc.url}`,
|
||||
);
|
||||
await this.db.pds.updateOne({ url: pdsUrl }, {
|
||||
$set: {
|
||||
plcs: pdsFound.plcs,
|
||||
},
|
||||
});
|
||||
}
|
||||
} else {
|
||||
await this.db.pds.insertOne({
|
||||
url: pdsUrl,
|
||||
plcs: [plc.url],
|
||||
time: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
// update PDS stats
|
||||
const didsCount = await this.db.did.countDocuments({
|
||||
"pds": { $in: [pdsUrl] },
|
||||
});
|
||||
await this.db.pds.updateOne({ url: pdsUrl }, { $set: { didsCount } });
|
||||
}
|
||||
}
|
||||
|
||||
const key = `lastUpdate:${plc.url}`;
|
||||
await this.db.meta.updateOne({ key }, {
|
||||
$set: { key, value: arr[arr.length - 1].createdAt },
|
||||
}, { upsert: true });
|
||||
return arr.length !== 1 ? arr[arr.length - 1].createdAt : false;
|
||||
}
|
||||
|
||||
async ecosystemLoad() {
|
||||
const res = await fetch(this.env.ATSCAN_ECOSYSTEM_URL);
|
||||
this.ecosystem = await res.json();
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { Queue } from "npm:bullmq";
|
||||
import { Queue } from "npm:bullmq@4.2.0";
|
||||
|
||||
export async function makeQueues(ats) {
|
||||
const connection = ats.redisConnectionOptions();
|
||||
|
|
|
@ -102,4 +102,7 @@ export async function saveRepo(ats, didInfo, job = null) {
|
|||
}
|
||||
// update db record
|
||||
await ats.db.did.updateOne({ did }, { $set: { repo } });
|
||||
|
||||
// send notification to socket
|
||||
ats.nats.publish("ats.service.plc.did.update", ats.JSONCodec.encode({ did }));
|
||||
}
|
||||
|
|
|
@ -1,29 +1,17 @@
|
|||
import { ATScan } from "./lib/atscan.js";
|
||||
import { pooledMap } from "https://deno.land/std/async/mod.ts";
|
||||
import { timeout } from "./lib/utils.js";
|
||||
import {
|
||||
connect,
|
||||
JSONCodec,
|
||||
StringCodec,
|
||||
} from "https://deno.land/x/nats/src/mod.ts";
|
||||
import "https://deno.land/std@0.192.0/dotenv/load.ts";
|
||||
|
||||
const WAIT = 1000 * 60 * 2;
|
||||
const TIMEOUT = 2500;
|
||||
|
||||
const nc = await connect({
|
||||
servers: Deno.env.get("NATS_SERVERS"),
|
||||
});
|
||||
const jc = JSONCodec();
|
||||
console.log(`connected to ${nc.getServer()}`);
|
||||
|
||||
const hosts = {
|
||||
local: {},
|
||||
texas: {},
|
||||
tokyo: {},
|
||||
};
|
||||
|
||||
async function crawlUrl(url, host = "local") {
|
||||
async function crawlUrl(ats, url, host = "local") {
|
||||
if (host === "local") {
|
||||
try {
|
||||
const [, ms] = await timeout(
|
||||
|
@ -67,10 +55,15 @@ async function crawlUrl(url, host = "local") {
|
|||
console.error(`Unknown host: ${host}`);
|
||||
return { err: "unknown host" };
|
||||
}
|
||||
const resp = await nc.request(`ats-nodes.${host}.http`, jc.encode({ url }), {
|
||||
timeout: 60000,
|
||||
});
|
||||
const { err, data, ms } = jc.decode(resp.data);
|
||||
const codec = ats.JSONCodec;
|
||||
const resp = await ats.nats.request(
|
||||
`ats-nodes.${host}.http`,
|
||||
codec.encode({ url }),
|
||||
{
|
||||
timeout: 60000,
|
||||
},
|
||||
);
|
||||
const { err, data, ms } = codec.decode(resp.data);
|
||||
return { err, data, ms };
|
||||
}
|
||||
|
||||
|
@ -105,7 +98,7 @@ async function crawl(ats) {
|
|||
let ip;
|
||||
try {
|
||||
ip = await (await fetch(
|
||||
`http://ipinfo.io/${ipAddr}?token=${Deno.env.get("IPINFO_TOKEN")}`,
|
||||
`http://ipinfo.io/${ipAddr}?token=${ats.env.IPINFO_TOKEN}`,
|
||||
))
|
||||
.json();
|
||||
} catch (e) {}
|
||||
|
@ -127,7 +120,7 @@ async function crawl(ats) {
|
|||
const url = `${i.url}/xrpc/com.atproto.server.describeServer`;
|
||||
await Promise.all(
|
||||
Object.keys(hosts).map(async (chost) => {
|
||||
const { err, data, ms } = await crawlUrl(url, chost);
|
||||
const { err, data, ms } = await crawlUrl(ats, url, chost);
|
||||
const inspect = {
|
||||
err,
|
||||
data,
|
||||
|
@ -152,6 +145,10 @@ async function crawl(ats) {
|
|||
["crawler", chost],
|
||||
]);
|
||||
}
|
||||
ats.nats.publish(
|
||||
"ats.service.pds.update",
|
||||
ats.JSONCodec.encode({ url: i.url }),
|
||||
);
|
||||
console.log(
|
||||
`[${chost}] -> ${i.url} ${ms ? "[" + ms + "ms]" : ""} ${
|
||||
err ? "error = " + err : ""
|
||||
|
@ -165,8 +162,7 @@ async function crawl(ats) {
|
|||
|
||||
if (Deno.args[0] === "daemon") {
|
||||
console.log("Initializing ATScan ..");
|
||||
//console.log('IPINFO_TOKEN', Deno.env.get("IPINFO_TOKEN"));
|
||||
const ats = new ATScan();
|
||||
const ats = new ATScan({ enableNats: true });
|
||||
ats.debug = true;
|
||||
await ats.init();
|
||||
console.log("pds-crawl daemon started");
|
||||
|
@ -178,7 +174,7 @@ if (Deno.args[0] === "daemon") {
|
|||
console.log(`Processing events [wait=${WAIT / 1000}s] ..`);
|
||||
setInterval(() => crawl(ats), WAIT);
|
||||
} else {
|
||||
const ats = new ATScan({ debug: true });
|
||||
const ats = new ATScan({ enableNats: true, debug: true });
|
||||
await ats.init();
|
||||
await crawl(ats);
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { ATScan } from "./lib/atscan.js";
|
||||
|
||||
async function crawl(ats) {
|
||||
for (const plc of ats.ecosystem.data["plc-directories"]) {
|
||||
for await (const plc of ats.ecosystem.data["plc-directories"]) {
|
||||
let start = 0;
|
||||
if (Deno.args[0] !== "init") {
|
||||
const item = await ats.db.meta.findOne({ key: `lastUpdate:${plc.url}` });
|
||||
|
@ -9,10 +9,131 @@ async function crawl(ats) {
|
|||
start = item.value;
|
||||
}
|
||||
}
|
||||
let after = await ats.processPlcExport(plc, start);
|
||||
while (after) {
|
||||
after = await ats.processPlcExport(plc, after);
|
||||
await processPlcExport(ats, plc, start);
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Imports operations from a PLC directory's `/export` endpoint, page by page,
 * and mirrors them into the `did` and `pds` collections.
 *
 * For every exported operation it:
 *  - inserts the DID or appends a new revision / PDS endpoint to it,
 *  - publishes `ats.service.plc.did.create|update` on NATS,
 *  - enqueues a `repoSnapshot` job for the DID,
 *  - registers the PDS endpoint (if any), refreshes its `didsCount` and
 *    publishes `ats.service.pds.update`.
 * After each page the `lastUpdate:<plc.url>` entry in `meta` is set to the
 * `createdAt` of the last operation, which is also used as the next cursor.
 *
 * @param {object} ats - initialized ATScan instance (uses `db`, `nats`,
 *   `JSONCodec`, `queues`, `debug`).
 * @param {{url: string}} plc - PLC directory descriptor.
 * @param {string|number|null} [after=null] - cursor for the first request.
 * @returns {Promise<null>} resolves once no further pages are available.
 */
async function processPlcExport(ats, plc, after = null) {
  let cursor = after;

  // Iterative paging (instead of self-recursion) keeps the promise chain flat
  // no matter how many pages the directory returns.
  for (;;) {
    const url = plc.url + "/export?after=" + (cursor || "");
    if (ats.debug) {
      console.log(`ProcessPlcExport: ${url}`);
    }
    const req = await fetch(url);
    if (req.ok === false) {
      // An error body is not JSONL; do not try to import it.
      console.error(`PLC export failed [${url}]: HTTP ${req.status}`);
      return null;
    }
    const lines = await req.text();
    if (!lines) {
      console.error(`No output from PLC! [${url}]`);
      return null;
    }
    // Skip blank lines (e.g. a trailing newline) so JSON.parse never sees "".
    const arr = lines
      .split("\n")
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line));

    if (arr.length === 0) {
      console.error(`No output from PLC! [${url}]`);
      return null;
    }
    // With a cursor, a page holding a single entry is treated as "nothing
    // new" and ends the crawl.
    if (cursor && arr.length === 1) {
      return null;
    }

    for (const data of arr) {
      const pdsUrl = data.operation.services?.atproto_pds?.endpoint;
      const matcher = { did: data.did, src: plc.url };
      const obj = {
        did: data.did,
        src: plc.url,
        revs: [data],
        time: new Date().toISOString(),
        lastMod: data.createdAt,
        pds: pdsUrl ? [pdsUrl] : [],
      };
      let didRev = 0;
      const found = await ats.db.did.findOne(matcher);
      if (found) {
        const revFound = found.revs.find((r) => r.cid === data.cid);
        let updated = false;
        if (!revFound) {
          updated = true;
          didRev = found.revs.length;
          found.revs.push(data);
          console.log(
            `${
              (new Date()).toISOString()
            } DID: Adding new DID revision: ${data.did}@${didRev}`,
          );
        }
        if (pdsUrl && !found.pds.includes(pdsUrl)) {
          updated = true;
          found.pds.push(pdsUrl);
        }
        if (updated) {
          await ats.db.did.updateOne(matcher, {
            $set: {
              time: new Date().toISOString(),
              revs: found.revs,
              pds: found.pds,
              lastMod: found.revs[found.revs.length - 1].createdAt,
            },
          });
          ats.nats.publish(
            "ats.service.plc.did.update",
            ats.JSONCodec.encode({ did: obj.did }),
          );
        }
      } else {
        console.log(
          `${
            (new Date()).toISOString()
          } DID: Adding new DID revision: ${data.did}@0 (init)`,
        );
        await ats.db.did.insertOne(obj);
        ats.nats.publish(
          "ats.service.plc.did.create",
          ats.JSONCodec.encode({ did: obj.did }),
        );
      }
      await ats.queues.repoSnapshot.add(obj.did, obj, { jobId: obj.did });

      // update pds
      if (pdsUrl) {
        const pdsFound = await ats.db.pds.findOne({ url: pdsUrl });
        if (pdsFound) {
          if (!pdsFound.plcs.includes(plc.url)) {
            // Was `plcUrl`, an undefined identifier (ReferenceError).
            pdsFound.plcs.push(plc.url);
            console.log(
              `${
                (new Date()).toISOString()
              } PDS [${pdsUrl}]: Adding new PLC: ${plc.url}`,
            );
            await ats.db.pds.updateOne({ url: pdsUrl }, {
              $set: {
                plcs: pdsFound.plcs,
              },
            });
          }
        } else {
          await ats.db.pds.insertOne({
            url: pdsUrl,
            plcs: [plc.url],
            time: new Date().toISOString(),
          });
        }
        // update PDS stats
        const didsCount = await ats.db.did.countDocuments({
          "pds": { $in: [pdsUrl] },
        });
        await ats.db.pds.updateOne({ url: pdsUrl }, { $set: { didsCount } });
        ats.nats.publish(
          "ats.service.pds.update",
          ats.JSONCodec.encode({ url: pdsUrl }),
        );
      }
    }

    const lastCreatedAt = arr[arr.length - 1].createdAt;
    const key = `lastUpdate:${plc.url}`;
    await ats.db.meta.updateOne({ key }, {
      $set: { key, value: lastCreatedAt },
    }, { upsert: true });

    if (!lastCreatedAt) {
      return null;
    }
    cursor = lastCreatedAt;
  }
}
|
||||
|
||||
|
@ -20,7 +141,7 @@ if (Deno.args[0] === "daemon") {
|
|||
const wait = 15;
|
||||
|
||||
console.log("Initializing ATScan ..");
|
||||
const ats = new ATScan();
|
||||
const ats = new ATScan({ enableNats: true, enableQueues: true });
|
||||
ats.debug = true;
|
||||
await ats.init();
|
||||
console.log("plc-crawl daemon started");
|
||||
|
@ -32,7 +153,7 @@ if (Deno.args[0] === "daemon") {
|
|||
console.log(`Processing events [wait=${wait}s] ..`);
|
||||
setInterval(() => crawl(ats), wait * 1000);
|
||||
} else {
|
||||
const ats = new ATScan({ debug: true });
|
||||
const ats = new ATScan({ debug: true, enableNats: true, enableQueues: true });
|
||||
await ats.init();
|
||||
await crawl(ats);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import { pooledMap } from "https://deno.land/std/async/mod.ts";
|
||||
import { ATScan } from "./lib/atscan.js";
|
||||
import { differenceInMinutes } from "npm:date-fns";
|
||||
import _ from "npm:lodash";
|
||||
|
||||
const CONCURRENCY = 4;
|
||||
|
@ -17,13 +18,19 @@ async function processPDSRepos(ats, repos, item) {
|
|||
console.error(`DID ${repo.did} not exists?? (!!)`);
|
||||
continue;
|
||||
}
|
||||
/*console.log(
|
||||
repo.did,
|
||||
repo.head,
|
||||
did.repo?.root,
|
||||
repo.head === did.repo?.root,
|
||||
);*/
|
||||
|
||||
if (repo.head !== didObj.repo?.root) {
|
||||
// ignore dids which was updated in last 20h hours
|
||||
if (didObj.repo) {
|
||||
const diff = differenceInMinutes(
|
||||
new Date(),
|
||||
new Date(didObj.repo.time),
|
||||
);
|
||||
const size = didObj.repo.size || 0;
|
||||
if (diff < (20 * 60) && size > 20000) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
await ats.queues.repoSnapshot.add(repo.did, didObj, {
|
||||
//priority: 1,
|
||||
jobId: repo.did,
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import { Worker } from "npm:bullmq";
|
||||
import { MetricsTime, Worker } from "npm:bullmq";
|
||||
import { ATScan } from "./lib/atscan.js";
|
||||
import { saveRepo } from "./lib/repo.js";
|
||||
|
||||
const ats = new ATScan();
|
||||
const ats = new ATScan({ enableNats: true });
|
||||
ats.debug = true;
|
||||
await ats.init();
|
||||
|
||||
|
@ -13,4 +13,7 @@ async function processJob(job) {
|
|||
|
||||
const worker = new Worker("repo-snapshot", processJob, {
|
||||
connection: ats.redisConnectionOptions(),
|
||||
metrics: {
|
||||
maxDataPoints: MetricsTime.ONE_WEEK * 2,
|
||||
},
|
||||
});
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
import * as typesense from "npm:typesense";
|
||||
const Typesense = typesense.default;
|
||||
import { ATScan } from "./lib/atscan.js";
|
||||
|
||||
//import { Client, Errors, SearchClient } from "https://raw.githubusercontent.com/bradenmacdonald/typesense-deno/v1.1.3-deno/mod.ts";
|
||||
|
||||
const ats = new ATScan();
|
||||
await ats.init();
|
||||
|
||||
/**
 * Loads one page of DID documents from MongoDB and upserts them into the
 * Typesense `dids` collection through the bulk import endpoint.
 *
 * Each document carries the current handle (taken from the latest revision),
 * the handles of all earlier revisions, and the profile description / name.
 * Handles have the `at://` prefix removed and dots replaced by spaces.
 *
 * @param {number} [limit=10000] - page size.
 * @param {number} [offset=0] - number of documents to skip.
 * @returns {Promise<void>}
 */
async function loadItems(limit = 10000, offset = 0) {
  console.log("offset", offset);

  // Searchable form of a revision's first `alsoKnownAs` entry, or null.
  const handleOf = (rev) => {
    const aka = rev?.operation?.alsoKnownAs?.[0];
    return aka ? aka.replace(/^at:\/\//, "").replaceAll(".", " ") : null;
  };

  // NOTE(review): the second argument looks like a legacy projection; current
  // MongoDB drivers expect `{ projection: {...} }` — confirm. It is left
  // untouched because `repo.profile` is read below and is not listed in it.
  const items = await ats.db.did.find({}, {
    "revs.operation.alsoKnownAs": 1,
    did: 1,
  }).limit(limit).skip(offset).toArray();

  const arr = items.map((item) => {
    const revs = item.revs ?? [];
    const handle = handleOf(revs[revs.length - 1]);
    // Every revision except the latest one. The previous `length - 2` bound
    // also dropped the revision right before the latest.
    const prevHandles = revs.slice(0, -1).map((rev) => handleOf(rev) ?? "");
    return {
      id: item.did,
      did: item.did,
      handle,
      prevHandles,
      desc: item.repo?.profile?.description || "",
      name: item.repo?.profile?.displayName || "",
    };
  });

  const out = await fetch(
    `${ats.env.TYPESENSE_HOST}/collections/dids/documents/import?action=upsert`,
    {
      method: "POST",
      headers: {
        "X-TYPESENSE-API-KEY": ats.env.TYPESENSE_API_KEY,
        "Content-Type": "text/plain",
      },
      // One JSON document per line; wrap the callback so map()'s index and
      // array arguments are not passed on to JSON.stringify.
      body: arr.map((doc) => JSON.stringify(doc)).join("\n"),
    },
  );
  console.log(await out.text());
}
|
||||
|
||||
// Push the whole `did` collection to the search index in fixed-size pages,
// one page at a time, then terminate the process explicitly.
const limit = 10000;
const total = await ats.db.did.count();
const pages = Math.ceil(total / limit);

let page = 0;
while (page < pages) {
  await loadItems(limit, page * limit);
  page += 1;
}

Deno.exit(0);
|
17
cli/ats.js
17
cli/ats.js
|
@ -11,19 +11,4 @@ await new Command()
|
|||
.usage("<command>")
|
||||
.action(() => { console.log("Please specify command or use `-h`") })
|
||||
.command("repo,r", "Repository tools").executable()
|
||||
.parse(Deno.args)
|
||||
|
||||
//import * as bsky from "npm:@atproto/api";
|
||||
//import "https://deno.land/std@0.192.0/dotenv/load.ts";
|
||||
/*const { BskyAgent } = bsky.default;
|
||||
const agent = new BskyAgent({ service: "https://bsky.social" });
|
||||
|
||||
await agent.login({
|
||||
identifier: Deno.env.get("BLUESKY_USERNAME"),
|
||||
password: Deno.env.get("BLUESKY_PASSWORD"),
|
||||
});*/
|
||||
|
||||
/*const p = await agent.getProfiles({
|
||||
actors: ["did:plc:b5rrmme6ncenhe4lq53y7lpf", "tree.fail"],
|
||||
});*/
|
||||
//console.log(p);
|
||||
.parse(Deno.args)
|
|
@ -35,19 +35,28 @@ module.exports = {
|
|||
PORT: 4000,
|
||||
}
|
||||
}, {
|
||||
name: "atscan-api",
|
||||
name: "atscan-api-master",
|
||||
script: "./backend/api.js",
|
||||
//args : "daemon",
|
||||
interpreter: "deno",
|
||||
interpreterArgs: "run --unstable --allow-net --allow-read --allow-env --allow-sys",
|
||||
//watch: true,
|
||||
ignore_watch: [ 'frontend' ],
|
||||
|
||||
env: {
|
||||
PORT: 6677
|
||||
}
|
||||
}, {
|
||||
name: "atscan-api-slave",
|
||||
script: "./backend/api.js",
|
||||
//args : "daemon",
|
||||
interpreter: "deno",
|
||||
interpreterArgs: "run --unstable --allow-net --allow-read --allow-env --allow-sys",
|
||||
env: {
|
||||
PORT: 6678
|
||||
}
|
||||
}, {
|
||||
name: "atscan-worker",
|
||||
script: "./backend/repo-worker.js",
|
||||
interpreter: "deno",
|
||||
interpreterArgs: "run --unstable --allow-net --allow-read --allow-write --allow-env --allow-ffi --allow-sys ./backend/repo-worker.js",
|
||||
interpreter: "mullvad-exclude",
|
||||
interpreterArgs: "deno run --unstable --allow-net --allow-read --allow-write --allow-env --allow-ffi --allow-sys ./backend/repo-worker.js",
|
||||
instances: 6,
|
||||
}, {
|
||||
name: "bull-ui",
|
||||
|
|
Načítá se…
Odkázat v novém úkolu