add profile info to did page, update pds sorting

This commit is contained in:
tree 2023-07-06 22:58:42 +00:00
rodič 8693a52a71
revize 548b9c5906
10 změnil soubory, kde provedl 114 přidání a 30 odebrání

Zobrazit soubor

@ -8,6 +8,8 @@ const ats = new ATScan({ enableQueues: true, enableNats: true });
await ats.init();
ats.startDaemon();
const servers = ["local", "texas", "tokyo"];
if (Number(ats.env.PORT) === 6677) {
const didUpdatedSub = ats.nats.subscribe("ats.service.plc.did.*");
(async () => {
@ -21,6 +23,7 @@ if (Number(ats.env.PORT) === 6677) {
continue;
}
Object.assign(item, prepareObject("did", item));
item.revs = [item.revs[item.revs.length - 1]];
ats.nats.publish(
`ats.api.did.${
sub === "ats.service.plc.did.create" ? "create" : "update"
@ -87,15 +90,31 @@ function findDIDFed(item) {
return ff ? ff.id : null;
}
function getPDSStatus(item) {
if (!item.inspect) {
return "unknown";
}
const bare = Object.keys(item.inspect).filter((k) => servers.includes(k));
if (bare.length === 0) {
return "unknown";
}
const offlineNum = bare.reduce(
(acc, k) => acc += item.inspect[k].err ? 1 : 0,
0,
);
if (bare.length === offlineNum) {
return "offline";
}
return offlineNum > 0 ? "degraded" : "online";
}
function prepareObject(type, item) {
switch (type) {
case "pds":
item.host = item.url.replace(/^https?:\/\//, "");
item.fed = findPDSFed(item);
item.err = Boolean(item.inspect?.current?.err);
item.status = !item.inspect
? "unknown"
: (item.inspect?.current.err ? "offline" : "online");
item.status = getPDSStatus(item);
const respTimes = item.inspect
? Object.keys(item.inspect).filter((k) =>
@ -311,6 +330,12 @@ router
metrics[`queue_waiting{name="${queueName}"}`] = [
await queue.getWaitingCount(),
];
metrics[`queue_waiting_children{name="${queueName}"}`] = [
await queue.getWaitingChildrenCount(),
];
metrics[`queue_prioritized{name="${queueName}"}`] = [
await queue.getPrioritizedCount(),
];
}
ctx.response.body = Object.keys(metrics).map((m) => {
const [data, help, type] = metrics[m];

Zobrazit soubor

@ -3,10 +3,15 @@ import {
subscribeRepos,
//SubscribeReposMessage,
} from "npm:atproto-firehose";
import { ATScan } from "./lib/atscan.js";
import * as atprotoApi from "npm:@atproto/api";
const { AppBskyActorProfile } = atprotoApi.default;
const ats = new ATScan({ enableQueues: true });
ats.debug = true;
await ats.init();
const client = subscribeRepos(`wss://bsky.social`, { decodeRepoOps: true });
client.on("message", (m) => {
if (ComAtprotoSyncSubscribeRepos.isHandle(m)) {
@ -14,28 +19,24 @@ client.on("message", (m) => {
}
if (ComAtprotoSyncSubscribeRepos.isCommit(m)) {
//console.log(m)
m.ops.forEach((op) => {
console.log(op.payload?.$type);
if (!op.payload) {
console.log(m);
}
m.ops.forEach(async (op) => {
if (op.payload?.$type === "app.bsky.actor.profile") {
if (AppBskyActorProfile.isRecord(op.payload)) {
console.log(`Profile updated: ${m.repo}`);
const did = m.repo;
const didObj = await ats.db.did.findOne({ did });
if (!didObj) {
return;
}
await ats.queues.repoSnapshot.add(did, didObj, {
//priority: 1,
jobId: did,
priority: 10,
delay: 15,
});
console.log(`Added to queue: ${did}`);
//console.log(`Profile updated: ${m.repo}`);
}
}
});
}
if (ComAtprotoSyncSubscribeRepos.isHandle(m)) {
console.log("handle update", m);
}
if (ComAtprotoSyncSubscribeRepos.isMigrate(m)) {
console.log("migrate update", m);
}
if (ComAtprotoSyncSubscribeRepos.isTombstone(m)) {
console.log("tombstone update", m);
}
if (ComAtprotoSyncSubscribeRepos.isInfo(m)) {
console.log("info update", m);
}
});

Zobrazit soubor

@ -87,7 +87,11 @@ async function processPlcExport(ats, plc, after = null) {
ats.JSONCodec.encode({ did: obj.did }),
);
}
await ats.queues.repoSnapshot.add(obj.did, obj, { jobId: obj.did });
await ats.queues.repoSnapshot.add(obj.did, obj, {
jobId: obj.did,
delay: 5000,
priority: 1,
});
// update pds
if (pdsUrl) {
@ -115,10 +119,11 @@ async function processPlcExport(ats, plc, after = null) {
});
}
// update PDS stats
const didsCount = await ats.db.did.countDocuments({
/*const didsCount = await ats.db.did.countDocuments({
"pds": { $in: [pdsUrl] },
});
await ats.db.pds.updateOne({ url: pdsUrl }, { $set: { didsCount } });
*/
ats.nats.publish(
"ats.service.pds.update",
ats.JSONCodec.encode({ url: pdsUrl }),
@ -138,7 +143,7 @@ async function processPlcExport(ats, plc, after = null) {
}
if (Deno.args[0] === "daemon") {
const wait = 15;
const wait = 10;
console.log("Initializing ATScan ..");
const ats = new ATScan({ enableNats: true, enableQueues: true });

Zobrazit soubor

@ -31,10 +31,11 @@ async function processPDSRepos(ats, repos, item) {
continue;
}
}
await ats.queues.repoSnapshot.add(repo.did, didObj, {
//priority: 1,
const task = await ats.queues.repoSnapshot.add(repo.did, didObj, {
priority: 100,
jobId: repo.did,
});
//console.log(task)
count++;
}
}
@ -77,6 +78,9 @@ async function traversePDSRepos(ats, item, cursor = null) {
async function crawlNew(ats) {
const pds = await ats.db.pds.find({}).toArray();
const results = pooledMap(CONCURRENCY, _.shuffle(pds), async (item) => {
if (item.url === "https://bsky.social") {
return null;
}
if (!item.inspect.current || item.inspect.current.err) {
return null;
}

Zobrazit soubor

@ -1,6 +1,6 @@
{
"name": "atscan-fe",
"version": "0.7.0-alpha",
"version": "0.7.1-alpha",
"private": true,
"scripts": {
"dev": "vite dev",

Zobrazit soubor

@ -63,6 +63,8 @@ export function getPDSStatus(row) {
const [color, ico, text] =
row.status === 'unknown'
? ['text-gray-500', null, 'Status unknown']
: row.status === 'degraded'
? ['text-orange-500', null, 'Partially degraded']
: row.status === 'offline'
? ['text-red-500', null, 'Offline']
: ['text-green-500', null, 'Online'];

Zobrazit soubor

@ -99,7 +99,9 @@
<svelte:fragment slot="header">
<div class="h-1.5 bg-surface-100-800-token">
{#if $navigating}
<div class="w-full"><ProgressBar meter="bg-primary-500" track="bg-surface-100-800-token" /></div>
<div class="w-full">
<ProgressBar meter="bg-primary-500" track="bg-surface-100-800-token" />
</div>
{/if}
</div>
<!-- App Bar -->

Zobrazit soubor

@ -107,6 +107,46 @@
</div>
</div>
{#if item.repo && item.repo.profile}
<div class="table-container">
<!-- Native Table Element -->
<table class="table table-hover">
<tbody>
<tr>
<th class="text-right md:w-64">Name</th>
<td>{item.repo.profile.displayName || '-'}</td>
</tr>
<tr>
<th class="text-right">Description</th>
<td>{item.repo.profile.description || '-'}</td>
</tr>
{#if item.repo.profile.avatar}
<tr>
<th class="text-right">Avatar</th>
<td
><img
src={`${item.pds[0]}/xrpc/com.atproto.sync.getBlob?did=${item.did}&cid=${item.repo.profile.avatar.ref.$link}`}
class="w-40"
/></td
>
</tr>
{/if}
{#if item.repo.profile.banner}
<tr>
<th class="text-right">Banner</th>
<td
><img
src={`${item.pds[0]}/xrpc/com.atproto.sync.getBlob?did=${item.did}&cid=${item.repo.profile.banner.ref.$link}`}
class="w-40"
/></td
>
</tr>
{/if}
</tbody>
</table>
</div>
{/if}
<h2 class="h2">History <span class="font-normal text-2xl">({sourceData.length})</span></h2>
<Table source={historyTable} />

Zobrazit soubor

@ -108,7 +108,12 @@
}
base = orderBy(base, [sortKey], [sortDirection === -1 ? 'desc' : 'asc']);
} else {
base = orderBy(base, ['env', 'err', 'didsCount'], ['asc', 'asc', 'desc']);
base = base.map((x) => {
x.statusSort =
x.status === 'online' ? 0 : x.status === 'degraded' ? 1 : x.status === 'offline' ? 2 : 3;
return x;
});
base = orderBy(base, ['env', 'statusSort', 'didsCount'], ['asc', 'asc', 'desc']);
}
return base;
}

Zobrazit soubor

@ -63,7 +63,7 @@ module.exports = {
script: "./backend/repo-worker.js",
interpreter: "mullvad-exclude",
interpreterArgs: "deno run --unstable --allow-net --allow-read --allow-write --allow-env --allow-ffi --allow-sys ./backend/repo-worker.js",
instances: 6,
instances: 4,
}, {
name: "bull-ui",
script: "index.js",