update indexer, pds crawler, new tokyo node

This commit is contained in:
tree 2023-07-03 05:59:13 +00:00
rodič 15260cd5f0
revize 79bb80dad3
8 změněných souborů, 193 přidání a 98 odebrání

Zobrazit soubor

@ -51,9 +51,12 @@ function prepareObject(type, item) {
? "unknown"
: (item.inspect?.current.err ? "offline" : "online");
const respTimes = Object.keys(item.inspect).filter((k) =>
!["current", "lastOnline"].includes(k)
).map((k) => item.inspect[k].ms || null).filter((k) => k);
const respTimes = item.inspect
? Object.keys(item.inspect).filter((k) =>
!["current", "lastOnline"].includes(k)
).map((k) => item.inspect[k].ms || null).filter((k) => k)
: [];
item.responseTime = !item.inspect || item.status !== "online"
? null
: (respTimes.length > 0 ? _.mean(respTimes) : null);
@ -115,6 +118,7 @@ router
lastMod: { key: "lastMod" },
pds: { key: "pds" },
size: { key: "repo.size" },
commits: { key: "repo.commits" },
};
let inputSort = ctx.request.url.searchParams.get("sort");

Zobrazit soubor

@ -8,6 +8,7 @@ async function index(ats) {
const didsCount = await ats.db.did.countDocuments({
"pds": { $in: [pds.url] },
});
const host = pds.url.replace(/^https?:\/\//, "");
const stages = [
{ $match: { pds: { $in: [pds.url] } } },
@ -22,9 +23,13 @@ async function index(ats) {
];
const sizeRes = await ats.db.did.aggregate(stages).toArray();
const size = sizeRes[0].sum;
//console.log(`${pds.url}: ${size}`);
await ats.db.pds.updateOne({ url: pds.url }, { $set: { didsCount, size } });
await ats.writeInflux("pds_dids_count", "intField", didsCount, [[
"pds",
host,
]]);
await ats.writeInflux("pds_size", "intField", size, [["pds", host]]);
}
console.log("indexer round finished");
//console.log(await whoiser("dev.otaso-sky.blue"));

Zobrazit soubor

@ -30,9 +30,14 @@ export async function readRaw(data) {
export async function read(data, did, signingKey) {
const { root, blocks } = await readRaw(data);
console.log(`read done: ${did}`)
const storage = new MemoryBlockstore(blocks);
const checkout = await verifyCheckout(storage, root, did, signingKey);
console.log(`checkout done: ${did}`)
const history = await verifyFullHistory(storage, root, did, signingKey);
console.log(`fullHistory done: ${did}`)
return {
root,

Zobrazit soubor

@ -20,6 +20,7 @@ console.log(`connected to ${nc.getServer()}`);
const hosts = {
local: {},
texas: {},
tokyo: {},
};
async function crawlUrl(url, host = "local") {

Zobrazit soubor

@ -6,12 +6,160 @@ import { ATScan } from "./lib/atscan.js";
import { inspect } from "./lib/car.js";
import { timeout } from "./lib/utils.js";
import _ from "npm:lodash";
import { filesize as _filesize } from "npm:filesize";
const filesize = fsize.filesize;
const DB_PATH = "./backend/db/repo";
await ensureDir(DB_PATH);
/**
 * Sync every repo advertised by a PDS listRepos page.
 * For each listed repo, look up our DID record and re-download the repo
 * when the advertised head commit differs from the root we have stored.
 * @param {ATScan} ats - application context with DB handles
 * @param {object} repos - one listRepos response page ({ repos: [...] })
 */
async function processPDSRepos(ats, repos) {
  for (const entry of repos.repos) {
    const didRecord = await ats.db.did.findOne({ did: entry.did });
    if (!didRecord) {
      // A PDS lists a DID we have never indexed — should not happen.
      console.error(`DID ${entry.did} not exists?? (!!)`);
      continue;
    }
    const knownRoot = didRecord.repo?.root;
    const upToDate = entry.head === knownRoot;
    console.log(entry.did, entry.head, knownRoot, upToDate);
    if (!upToDate) {
      await saveRepo(ats, didRecord);
    }
  }
}
/**
 * Fetch one page (up to 1000 entries) of com.atproto.sync.listRepos
 * from a PDS.
 * @param {object} item - PDS record; only `item.url` is used
 * @param {?string} cursor - pagination cursor from the previous page
 * @returns {Promise<object|undefined>} parsed listRepos response, or
 *   undefined on any network or HTTP failure (callers already treat a
 *   missing result as "stop paginating")
 */
async function getPDSRepos(item, cursor = null) {
  console.log(`Updating PDS=${item.url} ..`);
  const reposUrl = item.url + "/xrpc/com.atproto.sync.listRepos?limit=1000" +
    (cursor ? "&cursor=" + cursor : "");
  let reposRes;
  try {
    reposRes = await fetch(reposUrl);
  } catch (e) {
    // Fix: a network-level failure (DNS, refused connection, TLS) used to
    // throw out of this function and escape the crawl pool as an unhandled
    // rejection; treat it like a bad response instead.
    console.error(`Request failed: ${reposUrl}: ${e.message}`);
    return;
  }
  if (!reposRes.ok) {
    console.error(`Bad request: ${reposRes.statusText}`);
    return;
  }
  const repos = await reposRes.json();
  return repos;
}
/**
 * Walk the full paginated listRepos feed of one PDS, processing each page.
 * A full page (1000 entries, matching the request limit) signals that more
 * pages may follow; anything shorter ends the walk.
 * @param {ATScan} ats - application context
 * @param {object} item - PDS record
 * @param {?string} cursor - optional starting cursor
 */
async function traversePDSRepos(ats, item, cursor = null) {
  let next = cursor;
  for (;;) {
    const page = await getPDSRepos(ats ? item : item, next);
    if (!page?.repos) {
      return;
    }
    await processPDSRepos(ats, page);
    if (page.repos.length !== 1000) {
      return;
    }
    next = page.cursor;
  }
}
/**
 * Crawl all known PDS instances (in random order, 4 at a time) and sync
 * any repos whose head differs from our stored root.
 * Skips PDS entries that are currently failing inspection.
 * @param {ATScan} ats - application context
 */
async function crawlNew(ats) {
  const pds = await ats.db.pds.find({}).toArray();
  const results = pooledMap(4, _.shuffle(pds), async (item) => {
    // Fix: `item.inspect` can be missing entirely on fresh records (the
    // rest of the codebase guards for this); reading `.current` off it
    // unguarded threw a TypeError inside the pool worker.
    if (!item.inspect?.current || item.inspect.current.err) {
      return null;
    }
    await traversePDSRepos(ats, item);
  });
  // Drain the pool so all workers complete before returning.
  for await (const _ of results) {}
}
/**
 * Download, verify and persist one DID's repo (CAR file) from its PDS.
 *
 * On every failure path the error (with a timestamp) is written into the
 * DID's `repo` field in the DB so the crawler won't immediately retry it;
 * on success the full inspection result replaces `repo`.
 *
 * @param {ATScan} ats - application context with DB handles
 * @param {object} didInfo - DID record from the `did` collection; expected
 *   to carry `did`, `pds` (array of base URLs), `revs` (PLC operation log)
 *   and optionally `skipRepo` — TODO confirm full schema against indexer
 */
async function saveRepo(ats, didInfo) {
  // Manual opt-out flag on the DID record.
  if (didInfo.skipRepo) {
    return null;
  }
  const did = didInfo.did;
  // Signing key comes from the latest PLC operation; needed to verify the CAR.
  const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
    .verificationMethods?.atproto;
  if (!signingKey) {
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: "no signing key", time: new Date() } },
    });
    return;
  }
  // NOTE(review): only the first listed PDS is tried; no failover to others.
  const pds = didInfo.pds[0];
  //console.log(`[${did}@${pds}] Getting repo ..`);
  // fetch remote repo
  const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
  console.log(url);
  let repoRes;
  try {
    // Hard 20s cap on the fetch; `timeout` resolves to a tuple.
    [repoRes] = await timeout(20 * 1000, fetch(url));
  } catch (e) {
    repoRes = { ok: false };
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  if (!repoRes.ok) {
    let message = null;
    // Only 403/500 bodies are expected to carry a JSON error message.
    if ([403, 500].includes(repoRes.status)) {
      let err;
      try {
        err = await repoRes.json();
      } catch {}
      message = err?.message;
    }
    console.error(url, message);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: message, time: new Date() } },
    });
    return;
  }
  //console.log(`[${did}@${pds}] Inspecting CAR ..`);
  const data = new Uint8Array(await repoRes.arrayBuffer());
  console.log(`downloaded: ${url} [${filesize(data.length)}]`);
  let repo;
  try {
    // Verifies the CAR against the DID + signing key (see lib/car.js).
    repo = await inspect(data, did, signingKey);
  } catch (e) {
    console.error(e);
    await ats.db.did.updateOne({ did }, {
      $set: { repo: { error: e.message, time: new Date() } },
    });
    return;
  }
  // Persist the raw CAR and a small JSON sidecar index next to it.
  const carFn = join(DB_PATH, `${did}.car`);
  await Deno.writeFile(carFn, data);
  //console.log(`[${did}@${pds}] File written: ${carFn}`);
  const indexFn = join(DB_PATH, `${did}.json`);
  await Deno.writeTextFile(
    indexFn,
    JSON.stringify(
      { did, signingKey, pds, root: repo.root, commits: repo.commits },
      null,
      2,
    ),
  );
  //console.log(`[${did}@${pds}] File written: ${indexFn}`);
  console.log(
    `[${did}@${pds}] displayName=${
      JSON.stringify(repo.profile?.displayName)
    } [${filesize(repo.size)}]`,
  );
  /*console.log(
    `[${did}@${pds}] Done [${
      Object.keys(repo.collections).map(
        (c) => [c + ":" + repo.collections[c]],
      ).join(", ")
    }]`,
  );*/
  // Success: store the full inspection result on the DID record.
  await ats.db.did.updateOne({ did }, { $set: { repo } });
  //console.log(out)
}
async function crawl(ats) {
let expiry = new Date();
expiry.setDate(expiry.getDate() - 10);
@ -21,99 +169,13 @@ async function crawl(ats) {
$or: [{ "repo.time": { $lte: expiry } }, { "repo": { $exists: false } }],
}).limit(10000).toArray();
const results = pooledMap(1, _.shuffle(dids), async (didInfo) => {
const did = didInfo.did;
const signingKey = didInfo.revs[didInfo.revs.length - 1].operation
.verificationMethods?.atproto;
if (!signingKey) {
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: "no signing key", time: new Date() } },
});
return;
}
const pds = didInfo.pds[0];
//console.log(`[${did}@${pds}] Getting repo ..`);
// fetch remote repo
const url = `${pds}/xrpc/com.atproto.sync.getRepo?did=${did}`;
console.log(url);
let repoRes;
try {
[repoRes] = await timeout(20 * 1000, fetch(url));
} catch (e) {
repoRes = { ok: false };
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
console.log(`downloaded: ${url}`);
if (!repoRes.ok) {
let message = null;
if ([403, 500].includes(repoRes.status)) {
let err;
try {
err = await repoRes.json();
} catch {}
message = err?.message;
}
console.error(url, message);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: message, time: new Date() } },
});
return;
}
//console.log(`[${did}@${pds}] Inspecting CAR ..`);
const data = new Uint8Array(await repoRes.arrayBuffer());
let repo;
try {
repo = await inspect(data, did, signingKey);
} catch (e) {
console.error(e);
await ats.db.did.updateOne({ did }, {
$set: { repo: { error: e.message, time: new Date() } },
});
return;
}
const carFn = join(DB_PATH, `${did}.car`);
await Deno.writeFile(carFn, data);
//console.log(`[${did}@${pds}] File written: ${carFn}`);
const indexFn = join(DB_PATH, `${did}.json`);
await Deno.writeTextFile(
indexFn,
JSON.stringify(
{ did, signingKey, pds, root: repo.root, commits: repo.commits },
null,
2,
),
);
//console.log(`[${did}@${pds}] File written: ${indexFn}`);
console.log(
`[${did}@${pds}] displayName=${
JSON.stringify(repo.profile?.displayName)
} [${filesize(repo.size)}]`,
);
/*console.log(
`[${did}@${pds}] Done [${
Object.keys(repo.collections).map(
(c) => [c + ":" + repo.collections[c]],
).join(", ")
}]`,
);*/
await ats.db.did.updateOne({ did }, { $set: { repo } });
//console.log(out)
const results = pooledMap(3, _.shuffle(dids), async (didInfo) => {
});
for await (const _ of results) {}
}
if (Deno.args[0] === "daemon") {
const wait = 60;
const wait = 60 * 15;
console.log("Initializing ATScan ..");
const ats = new ATScan();
@ -122,7 +184,7 @@ if (Deno.args[0] === "daemon") {
console.log("repo-crawler daemon started");
console.log("Performing initial crawl ..");
// initial crawl
await crawl(ats);
await crawlNew(ats);
console.log(`Initial crawl done`);
ats.debug = false;
console.log(`Processing events [wait=${wait}s] ..`);
@ -130,6 +192,6 @@ if (Deno.args[0] === "daemon") {
} else {
const ats = new ATScan({ debug: true });
await ats.init();
await crawl(ats);
await crawlNew(ats);
Deno.exit();
}

Zobrazit soubor

@ -10,6 +10,7 @@
getDIDProfileUrl,
filesize
} from '$lib/utils.js';
import { format } from 'echarts';
export let sourceData;
export let data;
@ -81,6 +82,9 @@
if (key === 'url') {
val = `/${row.did}`;
}
if (key === 'commits') {
val = row.repo?.commits ? formatNumber(row.repo.commits) : '-';
}
if (key === 'size') {
val =
'<div class="text-lg">' +
@ -97,10 +101,17 @@
}
$: tableSimple = {
// A list of heading labels.
head: ['', ['DID', 'did'], ['PDS (PLC)', 'pds'], ['Repo size', 'size'], ['Updated', 'lastMod']],
head: [
'',
['DID', 'did'],
['PDS (PLC)', 'pds'],
['Repo size', 'size'],
['Commits', 'commits'],
['Updated', 'lastMod']
],
body: customTableMapper(
sourceData || [],
['img', 'did', 'srcHost', 'size', 'lastMod'],
['img', 'did', 'srcHost', 'size', 'commits', 'lastMod'],
tableMap
),
meta: customTableMapper(sourceData || [], ['did_raw', 'url'], tableMap)

Zobrazit soubor

@ -46,7 +46,7 @@
val = `<a href="/pds/${val}" class=""><span class="font-semibold text-lg">${val}</span></a>`;
}
if (key === 'responseTime') {
val = row.responseTime ? '~' + row.responseTime + 'ms' : '-';
val = row.responseTime ? '~' + Math.round(row.responseTime) + 'ms' : '-';
}
if (key === 'location') {
val =

Zobrazit soubor

@ -103,9 +103,14 @@
region: 'Central Europe'
},
texas: {
location: 'Texas, US',
location: 'Dallas, TX, US',
country: 'us',
region: 'North America'
},
tokyo: {
location: 'Tokyo, JP',
country: 'jp',
region: 'Southeast Asia'
}
};
@ -216,6 +221,8 @@
>{#if item.inspect[crawlerId]?.err}
<i class="fa-solid fa-circle text-red-500 text-xs mr-1" /> Error:
<span class="code">{item.inspect[crawlerId].err}</span>
{:else if !item.inspect[crawlerId]}
<i class="fa-solid fa-circle text-gray-500 text-xs mr-1" /> Unknown
{:else}
<i class="fa-solid fa-circle text-green-500 text-xs mr-1" /> OK
{/if}</td