update repo crawler

This commit is contained in:
tree 2023-07-04 06:37:55 +00:00
rodič abed2a63b7
revize 5bd630ba7a
9 změnil soubory, kde provedl 213 přidání a 160 odebrání

Zobrazit soubor

@ -1,6 +1,10 @@
IPINFO_TOKEN=YOUR_TOKEN
ATSCAN_DB_PATH=./db
ATSCAN_ECOSYSTEM_URL=https://mirror.ecosystem.atscan.net/index.json
MONGODB_URL=mongodb://127.0.0.1:27017
NATS_SERVERS=XXXX
REDIS_HOST=localhost
REDIS_PORT=6379
IPINFO_TOKEN=YOUR_TOKEN
BLUESKY_USERNAME=YOUR_USERNAME
BLUESKY_PASSWORD=YOUR_PASSWORD
INFLUXDB_HOST=http://localhost:8086

Zobrazit soubor

@ -1,28 +1,27 @@
const express = require('express');
const { Queue } = require('bullmq');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const express = require("express");
const { Queue, QueueEvents } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
const queueMQ = new Queue('repo-inspect');
const queues = [
["repo-snapshot"],
];
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
serverAdapter.setBasePath("/admin/queues");
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullMQAdapter(queueMQ)],
queues: queues.map((q) => new BullMQAdapter(new Queue(q))),
serverAdapter: serverAdapter,
});
const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
// other configurations of your server
app.use("/admin/queues", serverAdapter.getRouter());
app.listen(3055, () => {
console.log('Running on 3055...');
console.log('For the UI, open http://localhost:3055/admin/queues');
console.log('Make sure Redis is running on port 6379 by default');
});
console.log("Running on 3055...");
console.log("For the UI, open http://localhost:3055/admin/queues");
console.log("Make sure Redis is running on port 6379 by default");
});

Zobrazit soubor

@ -1,31 +1,40 @@
//import { Bson, MongoClient } from "https://deno.land/x/mongo@v0.31.2/mod.ts";
import { parse, stringify } from "https://deno.land/std@0.184.0/yaml/mod.ts";
import { load as envLoad } from "https://deno.land/std@0.192.0/dotenv/mod.ts";
import { parse, stringify } from "https://deno.land/std@0.192.0/yaml/mod.ts";
import { MongoClient } from "npm:mongodb";
import "https://deno.land/std@0.192.0/dotenv/load.ts";
import {InfluxDB} from 'npm:@influxdata/influxdb-client'
const ATSCAN_ECOSYSTEM = "https://mirror.ecosystem.atscan.net/index.json";
import { InfluxDB } from "npm:@influxdata/influxdb-client";
import { makeQueues } from "./queues.js";
export class ATScan {
constructor(opts = {}) {
this.verbose = opts.verbose;
this.debug = opts.debug;
this.enableQueues = opts.enableQueues || false;
console.log(this.enableQueues);
}
async init() {
this.env = await envLoad();
await this.ecosystemLoad();
const influxConfig = {url: Deno.env.get('INFLUXDB_HOST'), token: Deno.env.get('INFLUXDB_TOKEN')}
const influxConfig = {
url: this.env.INFLUXDB_HOST,
token: this.env.INFLUXDB_TOKEN,
};
this.influx = new InfluxDB(influxConfig);
this.influxQuery = this.influx.getQueryApi(Deno.env.get('INFLUXDB_ORG'))
this.client = new MongoClient(Deno.env.get("MONGODB_URL"));
this.influxQuery = this.influx.getQueryApi(this.env.INFLUXDB_ORG);
this.client = new MongoClient(this.env.MONGODB_URL);
await this.client.connect();
console.log(`Connected to MongoDB: ${Deno.env.get("MONGODB_URL")}`);
this.dbRaw = this.client.db("test");
this.db = {
did: this.dbRaw.collection("did"),
pds: this.dbRaw.collection("pds"),
meta: this.dbRaw.collection("meta"),
};
console.log(`Connected to MongoDB: ${this.env.MONGODB_URL}`);
if (this.enableQueues) {
this.queues = await makeQueues(this);
console.log(`Queues initialized: ${Object.keys(this.queues).join(", ")}`);
}
}
async processPlcExport(plc, after = null) {
@ -130,28 +139,40 @@ export class ATScan {
}
async ecosystemLoad() {
const res = await fetch(ATSCAN_ECOSYSTEM);
const res = await fetch(this.env.ATSCAN_ECOSYSTEM_URL);
this.ecosystem = await res.json();
console.log(`Ecosystem updated: ${ATSCAN_ECOSYSTEM}`);
console.log(`Ecosystem updated: ${this.env.ATSCAN_ECOSYSTEM_URL}`);
}
startDaemon() {
console.log("Starting daemon ..");
const ecosInt = setInterval(() => this.ecosystemLoad(), 30 * 1000);
}
async writeInflux (name, type, value, tags = []) {
const point = `${name},${tags.map(t => t.join('=')).join(',')} value=${value} ${Date.now()}`;
const resp = await fetch(`${Deno.env.get('INFLUXDB_HOST')}/api/v2/write?org=${Deno.env.get('INFLUXDB_ORG')}&bucket=${Deno.env.get('INFLUXDB_BUCKET')}&precision=ms`, {
method: 'POST',
headers: {
Authorization: `Token ${Deno.env.get('INFLUXDB_TOKEN')}`
async writeInflux(name, type, value, tags = []) {
const point = `${name},${
tags.map((t) => t.join("=")).join(",")
} value=${value} ${Date.now()}`;
const resp = await fetch(
`${this.env.INFLUXDB_HOST}/api/v2/write?org=${this.env.INFLUXDB_ORG}&bucket=${this.env.INFLUXDB_BUCKET}&precision=ms`,
{
method: "POST",
headers: {
Authorization: `Token ${this.env.INFLUXDB_TOKEN}`,
},
body: point,
},
body: point
})
);
if (resp.status > 299) {
console.error('influx error: '+resp.status, Deno.env.get('INFLUXDB_TOKEN'))
console.error(await resp.json())
console.error("influx error: " + resp.status, this.env.INFLUXDB_TOKEN);
console.error(await resp.json());
}
return true
return true;
}
redisConnectionOptions() {
return {
host: this.env.REDIS_HOST || "localhost",
port: this.env.REDIS_PORT || "6379",
};
}
}

Zobrazit soubor

@ -41,7 +41,7 @@ export async function read(data, did, signingKey, job = null) {
await job.log(`checkout done: ${did}`);
await job.updateProgress(60);
}
const history = await verifyFullHistory(storage, root, did, signingKey);
if (job) {
await job.log(`fullHistory done: ${did}`);

10
backend/lib/queues.js Normal file
Zobrazit soubor

@ -0,0 +1,10 @@
import { Queue } from "npm:bullmq";
// Creates the BullMQ queues used by the crawler, sharing the ATScan
// instance's Redis connection settings.
//
// @param {Object} ats - ATScan instance; must expose redisConnectionOptions()
//   (returns { host, port } read from REDIS_HOST / REDIS_PORT env vars).
// @returns {Promise<Object>} map of queue name -> BullMQ Queue instance.
//   Currently only `repoSnapshot` ("repo-snapshot") is active; the former
//   "repo-inspect" queue is intentionally disabled below.
// NOTE(review): declared async but awaits nothing — callers may rely on the
// Promise return, so the signature is kept as-is.
export async function makeQueues(ats) {
const connection = ats.redisConnectionOptions();
return {
// Disabled legacy queue, superseded by repo-snapshot — kept for reference.
//repoInspect: new Queue("repo-inspect", { connection }),
repoSnapshot: new Queue("repo-snapshot", { connection }),
};
}

Zobrazit soubor

@ -5,94 +5,101 @@ import { ensureDir } from "https://deno.land/std@0.192.0/fs/ensure_dir.ts";
import * as fsize from "npm:filesize";
const filesize = fsize.filesize;
const DB_PATH = "./backend/db/repo";
await ensureDir(DB_PATH);
export async function saveRepo(ats, didInfo, job = null) {
if (didInfo.skipRepo) {
return null;
if (didInfo.skipRepo) {
return null;
}
const did = didInfo.did;
const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
.verificationMethods?.atproto;
if (!signingKey) {
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: "no signing key", time: new Date() } },
});
return;
}
const pds = didInfo.pds[0];
// fetch remote repo
const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
if (job) {
job.log(url);
}
let repoRes;
try {
[repoRes] = await timeout(20 * 1000, fetch(url));
} catch (e) {
repoRes = { ok: false };
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
if (!repoRes.ok) {
let message = null;
if ([403, 500].includes(repoRes.status)) {
let err;
try {
err = await repoRes.json();
} catch {}
message = err?.message;
}
const did = didInfo.did;
const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
.verificationMethods?.atproto;
if (!signingKey) {
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: "no signing key", time: new Date() } },
});
return;
}
const pds = didInfo.pds[0];
// fetch remote repo
const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
if (job) {
job.log(url);
}
let repoRes;
try {
[repoRes] = await timeout(20 * 1000, fetch(url));
} catch (e) {
repoRes = { ok: false };
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
if (!repoRes.ok) {
let message = null;
if ([403, 500].includes(repoRes.status)) {
let err;
try {
err = await repoRes.json();
} catch {}
message = err?.message;
}
console.error(url, message);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: message, time: new Date() } },
});
return;
}
// car inspect
const data = new Uint8Array(await repoRes.arrayBuffer());
if (job) {
job.log(`downloaded: ${url} [${filesize(data.length)}]`);
await job.updateProgress(15);
}
let repo;
try {
repo = await inspect(data, did, signingKey, job);
} catch (e) {
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
// write files
const carFn = join(DB_PATH, `${did}.car`);
await Deno.writeFile(carFn, data);
const indexFn = join(DB_PATH, `${did}.json`);
await Deno.writeTextFile(
indexFn,
JSON.stringify(
{ did, signingKey, pds, root: repo.root, commits: repo.commits },
null,
2,
),
console.error(url, message);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: message, time: new Date() } },
});
return;
}
// car inspect
const data = new Uint8Array(await repoRes.arrayBuffer());
if (job) {
job.log(`downloaded: ${url} [${filesize(data.length)}]`);
await job.updateProgress(15);
}
let repo;
try {
repo = await inspect(data, did, signingKey, job);
} catch (e) {
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
// ensure db directory
const dbPathBase = ats.env.ATSCAN_DB_PATH || "./db";
const dbPath = join(dbPathBase, "repo");
await ensureDir(dbPath);
// write car file
const carFn = join(dbPath, `${did}.car`);
await Deno.writeFile(carFn, data);
// write index file
const indexFn = join(dbPath, `${did}.json`);
await Deno.writeTextFile(
indexFn,
JSON.stringify(
{ did, signingKey, pds, root: repo.root, commits: repo.commits },
null,
2,
),
);
if (job) {
await job.log(
`[${did}@${pds}] displayName=${
JSON.stringify(repo.profile?.displayName)
} [${filesize(repo.size)}] ${carFn}`,
);
if (job) {
await job.log(`[${did}@${pds}] displayName=${
JSON.stringify(repo.profile?.displayName)
} [${filesize(repo.size)}]`);
await job.updateProgress(99);
}
await ats.db.did.updateOne({ did }, { $set: { repo } });
}
await job.updateProgress(99);
}
// update db record
await ats.db.did.updateOne({ did }, { $set: { repo } });
}

Zobrazit soubor

@ -1,38 +1,39 @@
import { pooledMap } from "https://deno.land/std/async/mod.ts";
import { ATScan } from "./lib/atscan.js";
import _ from "npm:lodash";
import { Queue } from "npm:bullmq";
const repoQueue = new Queue("repo-inspect", {
connection: { host: "localhost", port: "6379" },
});
const counters = {};
const CONCURRENCY = 4;
async function processPDSRepos(ats, repos) {
const counters = { total: 0 };
async function processPDSRepos(ats, repos, item) {
let count = 0;
const objs = await ats.db.did.find({
did: { $in: repos.repos.map((r) => r.did) },
}).toArray();
for (const repo of repos.repos) {
const did = await ats.db.did.findOne({ did: repo.did });
if (!did) {
const didObj = objs.find((d) => d.did === repo.did);
if (!didObj) {
console.error(`DID ${repo.did} not exists?? (!!)`);
continue;
}
console.log(
/*console.log(
repo.did,
repo.head,
did.repo?.root,
repo.head === did.repo?.root,
);
if (repo.head !== did.repo?.root) {
await repoQueue.add(repo.did, did, { priority: repo.head ? 10 : 5 });
);*/
if (repo.head !== didObj.repo?.root) {
await ats.queues.repoSnapshot.add(repo.did, didObj, {
//priority: 1,
jobId: repo.did,
});
count++;
}
}
return { count };
}
async function getPDSRepos(item, cursor = null) {
if (!counters[item.url]) {
counters[item.url] = 1;
} else {
counters[item.url]++;
}
console.log(`Updating PDS=${item.url} [${counters[item.url]}] ..`);
const reposUrl = item.url + "/xrpc/com.atproto.sync.listRepos?limit=1000" +
(cursor ? "&cursor=" + cursor : "");
const reposRes = await fetch(reposUrl);
@ -45,9 +46,20 @@ async function getPDSRepos(item, cursor = null) {
}
async function traversePDSRepos(ats, item, cursor = null) {
if (!counters[item.url]) {
counters[item.url] = 1;
} else {
counters[item.url]++;
}
const repos = await getPDSRepos(item, cursor);
if (repos?.repos) {
await processPDSRepos(ats, repos);
const { count } = await processPDSRepos(ats, repos, item);
counters.total += count;
console.log(
`total=${counters.total} [PDS=${item.url} page=${
counters[item.url]
} count=${count}]`,
);
if (repos.repos.length === 1000) {
await traversePDSRepos(ats, item, repos.cursor);
@ -57,7 +69,7 @@ async function traversePDSRepos(ats, item, cursor = null) {
async function crawlNew(ats) {
const pds = await ats.db.pds.find({}).toArray();
const results = pooledMap(4, _.shuffle(pds), async (item) => {
const results = pooledMap(CONCURRENCY, _.shuffle(pds), async (item) => {
if (!item.inspect.current || item.inspect.current.err) {
return null;
}
@ -85,7 +97,7 @@ if (Deno.args[0] === "daemon") {
const wait = 60 * 15;
console.log("Initializing ATScan ..");
const ats = new ATScan();
const ats = new ATScan({ enableQueues: true });
ats.debug = true;
await ats.init();
console.log("repo-crawler daemon started");
@ -97,7 +109,7 @@ if (Deno.args[0] === "daemon") {
console.log(`Processing events [wait=${wait}s] ..`);
setInterval(() => crawl(ats), wait * 1000);
} else {
const ats = new ATScan({ debug: true });
const ats = new ATScan({ debug: true, enableQueues: true });
await ats.init();
await crawlNew(ats);
Deno.exit();

Zobrazit soubor

@ -11,6 +11,6 @@ async function processJob(job) {
return Promise.resolve();
}
const worker = new Worker("repo-inspect", processJob, {
connection: { host: "localhost", port: "6379" },
const worker = new Worker("repo-snapshot", processJob, {
connection: ats.redisConnectionOptions(),
});

Zobrazit soubor

@ -1,14 +1,14 @@
const Typesense = require('typesense')
const Typesense = require("typesense");
let client = new Typesense.Client({
'nodes': [{
'host': 'localhost', // For Typesense Cloud use xxx.a1.typesense.net
'port': '8108', // For Typesense Cloud use 443
'protocol': 'http' // For Typesense Cloud use https
"nodes": [{
"host": "localhost", // For Typesense Cloud use xxx.a1.typesense.net
"port": "8108", // For Typesense Cloud use 443
"protocol": "http", // For Typesense Cloud use https
}],
'apiKey': 'Kaey9ahMo7xoob1haivaithe2Aighoo3azohl2Joo5Aemoh4aishoogugh3Oowim',
'connectionTimeoutSeconds': 2
})
"apiKey": "Kaey9ahMo7xoob1haivaithe2Aighoo3azohl2Joo5Aemoh4aishoogugh3Oowim",
"connectionTimeoutSeconds": 2,
});
/*const schema = {
name: 'dids',
@ -21,4 +21,4 @@ let client = new Typesense.Client({
client.collections().create(schema)
.then(function (data) {
console.log(data)
})*/
})*/