This commit is contained in:
tree 2023-07-04 03:46:42 +00:00
rodič 8be6dfe34a
revize 874fe92055
Změněno 9 souborů s 1536 přidáními a 110 odebráními

Zobrazit soubor

@@ -21,7 +21,10 @@ api-ws:
deno run --unstable --allow-net --allow-read --allow-env --allow-sys ./backend/api-ws.js
repo-crawler:
deno run --unstable --allow-net --allow-read --allow-write --allow-env --allow-sys ./backend/repo-crawler.js
deno run --unstable --allow-net --allow-read --allow-write --allow-env --allow-sys --allow-ffi ./backend/repo-crawler.js
repo-worker:
deno run --unstable --allow-net --allow-read --allow-write --allow-env --allow-ffi --allow-sys ./backend/repo-worker.js
fe-rebuild:
cd frontend && npm run build && pm2 restart atscan-fe

28
backend/bull-ui/index.js Normal file
Zobrazit soubor

@@ -0,0 +1,28 @@
// Minimal Bull Board admin UI for the "repo-inspect" BullMQ queue.
// Serves the dashboard at http://localhost:3055/admin/queues and expects
// Redis to be reachable on its default port (6379) — the Queue below is
// created without an explicit connection, so BullMQ defaults apply.
const express = require('express');
const { Queue } = require('bullmq');
const { createBullBoard } = require('@bull-board/api');
// NOTE: the unused `BullAdapter` require was removed — this project uses
// BullMQ, so only `BullMQAdapter` is needed.
const { BullMQAdapter } = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter } = require('@bull-board/express');

// Monitoring-only handle; jobs are produced by repo-crawler and consumed
// by repo-worker elsewhere in this repository.
const queueMQ = new Queue('repo-inspect');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

// The board's management helpers (addQueue/removeQueue/...) are not used
// here, so the return value is intentionally discarded.
createBullBoard({
  queues: [new BullMQAdapter(queueMQ)],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
// other configurations of your server
app.listen(3055, () => {
  console.log('Running on 3055...');
  console.log('For the UI, open http://localhost:3055/admin/queues');
  console.log('Make sure Redis is running on port 6379 by default');
});

1338
backend/bull-ui/package-lock.json vygenerováno Normal file

Rozdílový obsah nebyl zobrazen, protože je příliš veliký Načíst rozdílové porovnání

Zobrazit soubor

@@ -0,0 +1,16 @@
{
"name": "bull-ui",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"@bull-board/express": "^5.6.0",
"bullmq": "^4.2.0",
"express": "^4.18.2"
}
}

Zobrazit soubor

@@ -27,17 +27,26 @@ export async function readRaw(data) {
};
}
export async function read(data, did, signingKey) {
export async function read(data, did, signingKey, job = null) {
const { root, blocks } = await readRaw(data);
console.log(`read done: ${did}`)
if (job) {
await job.log(`read done: ${did}`);
await job.updateProgress(35);
}
const storage = new MemoryBlockstore(blocks);
const checkout = await verifyCheckout(storage, root, did, signingKey);
console.log(`checkout done: ${did}`)
if (job) {
await job.log(`checkout done: ${did}`);
await job.updateProgress(60);
}
const history = await verifyFullHistory(storage, root, did, signingKey);
console.log(`fullHistory done: ${did}`)
if (job) {
await job.log(`fullHistory done: ${did}`);
await job.updateProgress(90);
}
return {
root,

98
backend/lib/repo.js Normal file
Zobrazit soubor

@@ -0,0 +1,98 @@
import { inspect } from "./car.js";
import { timeout } from "./utils.js";
import { join } from "https://deno.land/std@0.192.0/path/posix.ts";
import { ensureDir } from "https://deno.land/std@0.192.0/fs/ensure_dir.ts";
import * as fsize from "npm:filesize";
const filesize = fsize.filesize;
const DB_PATH = "./backend/db/repo";
await ensureDir(DB_PATH);
/**
 * Download, verify and persist the repository (CAR file) of a single DID.
 *
 * Fetches the repo from the DID's first listed PDS, verifies/inspects it via
 * inspect(), writes the raw CAR plus a JSON index under DB_PATH, and stores
 * the inspection result on the `did` document. Every failure path records
 * `{ repo: { error, time } }` on the document instead of throwing.
 *
 * @param {object} ats - ATScan instance; only `ats.db.did` is used here.
 * @param {object} didInfo - DID info ({ did, revs, pds, skipRepo, ... }).
 * @param {object|null} job - optional BullMQ job used for log lines and
 *   progress reporting (15/99 here, 35/60/90 inside inspect()).
 * @returns {Promise<null|undefined>} no meaningful value; side effects only.
 */
export async function saveRepo(ats, didInfo, job = null) {
  if (didInfo.skipRepo) {
    return null;
  }
  const did = didInfo.did;
  // Signing key comes from the most recent operation for this DID.
  const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
    .verificationMethods?.atproto;
  if (!signingKey) {
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: "no signing key", time: new Date() } },
    });
    return;
  }
  const pds = didInfo.pds[0];
  // fetch remote repo
  const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
  if (job) {
    // fix: was a floating promise (missing await), unlike the other
    // job.log calls in this function
    await job.log(url);
  }
  let repoRes;
  try {
    // 20s hard timeout so a stuck PDS cannot hang the worker.
    [repoRes] = await timeout(20 * 1000, fetch(url));
  } catch (e) {
    repoRes = { ok: false };
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  if (!repoRes.ok) {
    let message = null;
    // Only these statuses are known to carry a JSON error body.
    if ([403, 500].includes(repoRes.status)) {
      let err;
      try {
        err = await repoRes.json();
      } catch {}
      message = err?.message;
    }
    // fix: never persist a null error message — fall back to the HTTP
    // status so truthiness checks on repo.error stay meaningful.
    message ??= `HTTP ${repoRes.status}`;
    console.error(url, message);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: message, time: new Date() } },
    });
    return;
  }
  // car inspect
  const data = new Uint8Array(await repoRes.arrayBuffer());
  if (job) {
    // fix: missing await (floating promise)
    await job.log(`downloaded: ${url} [${filesize(data.length)}]`);
    await job.updateProgress(15);
  }
  let repo;
  try {
    repo = await inspect(data, did, signingKey, job);
  } catch (e) {
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  // write files: raw CAR plus a small JSON index next to it
  const carFn = join(DB_PATH, `${did}.car`);
  await Deno.writeFile(carFn, data);
  const indexFn = join(DB_PATH, `${did}.json`);
  await Deno.writeTextFile(
    indexFn,
    JSON.stringify(
      { did, signingKey, pds, root: repo.root, commits: repo.commits },
      null,
      2,
    ),
  );
  if (job) {
    await job.log(`[${did}@${pds}] displayName=${
      JSON.stringify(repo.profile?.displayName)
    } [${filesize(repo.size)}]`);
    await job.updateProgress(99);
  }
  await ats.db.did.updateOne({ did }, { $set: { repo } });
}

Zobrazit soubor

@@ -1,17 +1,12 @@
import { ensureDir } from "https://deno.land/std@0.192.0/fs/ensure_dir.ts";
import { join } from "https://deno.land/std@0.192.0/path/posix.ts";
import { pooledMap } from "https://deno.land/std/async/mod.ts";
import * as fsize from "npm:filesize";
import { ATScan } from "./lib/atscan.js";
import { inspect } from "./lib/car.js";
import { timeout } from "./lib/utils.js";
import _ from "npm:lodash";
import { filesize as _filesize } from "npm:filesize";
import { Queue } from "npm:bullmq";
const filesize = fsize.filesize;
const DB_PATH = "./backend/db/repo";
await ensureDir(DB_PATH);
const repoQueue = new Queue("repo-inspect", {
connection: { host: "localhost", port: "6379" },
});
const counters = {};
async function processPDSRepos(ats, repos) {
for (const repo of repos.repos) {
@@ -27,12 +22,17 @@ async function processPDSRepos(ats, repos) {
repo.head === did.repo?.root,
);
if (repo.head !== did.repo?.root) {
await saveRepo(ats, did);
await repoQueue.add(repo.did, did, { priority: repo.head ? 10 : 5 });
}
}
}
async function getPDSRepos(item, cursor = null) {
console.log(`Updating PDS=${item.url} ..`);
if (!counters[item.url]) {
counters[item.url] = 1;
} else {
counters[item.url]++;
}
console.log(`Updating PDS=${item.url} [${counters[item.url]}] ..`);
const reposUrl = item.url + "/xrpc/com.atproto.sync.listRepos?limit=1000" +
(cursor ? "&cursor=" + cursor : "");
const reposRes = await fetch(reposUrl);
@@ -67,99 +67,6 @@ async function crawlNew(ats) {
for await (const _ of results) {}
}
/**
 * Download, verify and persist the repository (CAR file) of a single DID.
 * (Pre-refactor version removed by this commit; the commit moves this logic
 * to backend/lib/repo.js with job-based logging/progress instead of console.)
 *
 * @param {object} ats - ATScan instance; only `ats.db.did` is used here.
 * @param {object} didInfo - DID info ({ did, revs, pds, skipRepo, ... }).
 * @returns {Promise<null|undefined>} no meaningful value; side effects only.
 */
async function saveRepo(ats, didInfo) {
  if (didInfo.skipRepo) {
    return null;
  }
  const did = didInfo.did;
  // Signing key comes from the most recent operation for this DID.
  const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
    .verificationMethods?.atproto;
  if (!signingKey) {
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: "no signing key", time: new Date() } },
    });
    return;
  }
  const pds = didInfo.pds[0];
  //console.log(`[${did}@${pds}] Getting repo ..`);
  // fetch remote repo
  const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
  console.log(url);
  let repoRes;
  try {
    // 20s hard timeout so a stuck PDS cannot hang the crawler.
    [repoRes] = await timeout(20 * 1000, fetch(url));
  } catch (e) {
    repoRes = { ok: false };
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  if (!repoRes.ok) {
    let message = null;
    // Only these statuses are known to carry a JSON error body.
    if ([403, 500].includes(repoRes.status)) {
      let err;
      try {
        err = await repoRes.json();
      } catch {}
      message = err?.message;
    }
    console.error(url, message);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: message, time: new Date() } },
    });
    return;
  }
  //console.log(`[${did}@${pds}] Inspecting CAR ..`);
  const data = new Uint8Array(await repoRes.arrayBuffer());
  console.log(`downloaded: ${url} [${filesize(data.length)}]`);
  let repo;
  try {
    repo = await inspect(data, did, signingKey);
  } catch (e) {
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  // write files: raw CAR plus a small JSON index next to it
  const carFn = join(DB_PATH, `${did}.car`);
  await Deno.writeFile(carFn, data);
  //console.log(`[${did}@${pds}] File written: ${carFn}`);
  const indexFn = join(DB_PATH, `${did}.json`);
  await Deno.writeTextFile(
    indexFn,
    JSON.stringify(
      { did, signingKey, pds, root: repo.root, commits: repo.commits },
      null,
      2,
    ),
  );
  //console.log(`[${did}@${pds}] File written: ${indexFn}`);
  console.log(
    `[${did}@${pds}] displayName=${
      JSON.stringify(repo.profile?.displayName)
    } [${filesize(repo.size)}]`,
  );
  /*console.log(
    `[${did}@${pds}] Done [${
      Object.keys(repo.collections).map(
        (c) => [c + ":" + repo.collections[c]],
      ).join(", ")
    }]`,
  );*/
  await ats.db.did.updateOne({ did }, { $set: { repo } });
  //console.log(out)
}
async function crawl(ats) {
let expiry = new Date();
expiry.setDate(expiry.getDate() - 10);

16
backend/repo-worker.js Normal file
Zobrazit soubor

@@ -0,0 +1,16 @@
// BullMQ worker: consumes "repo-inspect" jobs enqueued by repo-crawler and
// persists each repository via saveRepo(). Intended to be run under Deno
// (see Makefile / PM2 config), possibly as multiple instances.
import { Worker } from "npm:bullmq";
import { ATScan } from "./lib/atscan.js";
import { saveRepo } from "./lib/repo.js";

const ats = new ATScan();
ats.debug = true;
await ats.init();

/**
 * Process one queue job. `job.data` is the DID info object; the job itself
 * is passed through so saveRepo can emit log lines and progress updates.
 * @param {object} job - BullMQ Job instance.
 */
async function processJob(job) {
  // fix: dropped the redundant `return Promise.resolve()` — an async
  // function already resolves to undefined when it completes.
  await saveRepo(ats, job.data, job);
}

// Keep a reference so the worker stays alive for the process lifetime.
const worker = new Worker("repo-inspect", processJob, {
  // fix: port is a number (was the string "6379")
  connection: { host: "localhost", port: 6379 },
});

Zobrazit soubor

@@ -42,5 +42,16 @@ module.exports = {
interpreterArgs: "run --unstable --allow-net --allow-read --allow-env --allow-sys",
//watch: true,
ignore_watch: [ 'frontend' ],
}, {
name: "atscan-worker",
script: "./backend/repo-worker.js",
interpreter: "deno",
interpreterArgs: "run --unstable --allow-net --allow-read --allow-write --allow-env --allow-ffi --allow-sys ./backend/repo-worker.js",
instances: 6,
}, {
name: "bull-ui",
script: "index.js",
cwd: "./backend/bull-ui"
}],
};