indexer rework & use redis

This commit is contained in:
tree 2023-07-12 16:44:25 +00:00
parent 24f6e61dd6
commit a8c48dca4c
2 changed files with 99 additions and 56 deletions

View file

@@ -1,61 +1,92 @@
// NOTE(review): this span is a commit-diff rendering whose +/- markers were
// stripped during extraction, so removed (pre-commit) lines and added
// (post-commit) lines are interleaved. Comments below mark which copy appears
// to be which — confirm against the repository before treating this as
// runnable source.
import { ATScan } from "./lib/atscan.js";
// removed by this commit: the whoiser import and the fixed 60-second wait
import whoiser from "npm:whoiser";
const wait = 60 * 1;
// added: a registry of named indexers, each with its own interval + handler
const indexers = {
pdsIndex: {
interval: 60 * 1000, // 1 min
// Recomputes per-PDS stats (DID count, total repo size), stores them on the
// PDS document, writes gauges to InfluxDB and notifies listeners via NATS.
handler: async (ats) => {
for (const pds of await ats.db.pds.find().toArray()) {
const didsCount = await ats.db.did.countDocuments({
"pds": { $in: [pds.url] },
});
const host = pds.url.replace(/^https?:\/\//, "");
// removed: the same loop previously lived in a free `index()` function;
// the next six lines are the old copy, interleaved by the diff
async function index(ats) {
for (const pds of await ats.db.pds.find().toArray()) {
const didsCount = await ats.db.did.countDocuments({
"pds": { $in: [pds.url] },
});
const host = pds.url.replace(/^https?:\/\//, "");
// aggregation pipeline: sums repo.size across all DIDs on this PDS
// ("$groupField" presumably matches no real field, collapsing $group
// into a single bucket — TODO confirm against the schema)
const stages = [
{ $match: { pds: { $in: [pds.url] } } },
{
$group: {
_id: "$groupField",
sum: {
$sum: "$repo.size",
// (old copy of the same pipeline, interleaved by the diff)
const stages = [
{ $match: { pds: { $in: [pds.url] } } },
{
$group: {
_id: "$groupField",
sum: {
$sum: "$repo.size",
},
},
},
},
},
];
const sizeRes = await ats.db.did.aggregate(stages).toArray();
// NOTE(review): sizeRes[0] is undefined when the PDS hosts no matching
// DIDs; `sizeRes[0]?.sum ?? 0` would avoid a TypeError — confirm intent.
const size = sizeRes[0].sum;
];
const sizeRes = await ats.db.did.aggregate(stages).toArray();
const size = sizeRes[0].sum;
// old single-line update (removed by this commit):
await ats.db.pds.updateOne({ url: pds.url }, { $set: { didsCount, size } });
await ats.writeInflux("pds_dids_count", "intField", didsCount, [[
"pds",
host,
]]);
await ats.writeInflux("pds_size", "intField", size, [["pds", host]]);
ats.nats.publish(
"ats.service.pds.update",
ats.JSONCodec.encode({ url: pds.url }),
);
}
console.log("indexer round finished");
//console.log(await whoiser("dev.otaso-sky.blue"));
}
// removed: the old daemon/one-shot entry point selected via Deno.args
if (Deno.args[0] === "daemon") {
console.log("Initializing ATScan ..");
const ats = new ATScan({ enableNats: true });
ats.debug = true;
await ats.init();
console.log("indexer daemon started");
console.log("Performing initial index round ..");
// initial crawl
await index(ats);
console.log(`Initial index round done`);
ats.debug = false;
console.log(`Processing [wait=${wait}s] ..`);
setInterval(() => index(ats), wait * 1000);
} else {
const ats = new ATScan({ enableNats: true, debug: true });
await ats.init();
await index(ats);
Deno.exit(0);
// added: reformatted update plus the same Influx/NATS notifications
await ats.db.pds.updateOne({ url: pds.url }, {
$set: { didsCount, size },
});
await ats.writeInflux("pds_dids_count", "intField", didsCount, [[
"pds",
host,
]]);
await ats.writeInflux("pds_size", "intField", size, [["pds", host]]);
ats.nats.publish(
"ats.service.pds.update",
ats.JSONCodec.encode({ url: pds.url }),
);
}
//console.log("indexer round finished");
},
},
// bgsIndex: samples recent firehose event throughput for each known BGS
// instance from InfluxDB and caches the ops/sec figure in Redis.
bgsIndex: {
interval: 10 * 1000,
handler: async (ats) => {
// all BGS instances are queried concurrently
await Promise.all(ats.ecosystem.data["bgs-instances"].map(async (bgs) => {
const host = bgs.url.replace(/^https?:\/\//, "");
// Flux query: per-timestamp sums of firehose_event values over the last
// minute, then the non-negative rate of change (per second), averaged.
// NOTE(review): the server tag is hardcoded to "snowden" — presumably a
// specific collector node; confirm this shouldn't be configurable.
const query = `
from(bucket: "ats-nodes")
|> range(start: -1m)
|> filter(fn: (r) => r["_measurement"] == "firehose_event")
|> filter(fn: (r) => r["server"] == "snowden")
|> filter(fn: (r) => r["bgs"] == "${host}")
|> filter(fn: (r) => r["_field"] == "value")
|> group(columns: ["_time"], mode:"by")
|> sum()
|> group()
|> derivative(unit: 1s, nonNegative: true)
|> mean()`;
const data = await ats.influxQuery.collectRows(query);
const value = data[0]?._value;
// NOTE(review): truthiness check also skips a legitimate 0 ops/sec
// reading, leaving any stale value in Redis — confirm that is intended.
if (value) {
await ats.redis.HSET(`ats:bgs:${host}`, "ops", value); // 'OK'
}
}));
},
},
};
// start: initialize ATScan, run one index round, then schedule each indexer.
console.log("Initializing ATScan ..");
const ats = new ATScan({ enableNats: true });
ats.debug = true;
await ats.init();
console.log("indexer daemon started");
// initial crawl: run every indexer once, in parallel, before scheduling
console.log("Performing initial index round ..");
await Promise.all(Object.keys(indexers).map((k) => indexers[k].handler(ats)));
console.log(`Initial index round done`);
// intervals: each indexer runs on its own timer
const intervals = [];
for (const key of Object.keys(indexers)) {
  const indexer = indexers[key];
  const interval = indexer.interval ?? 60 * 1000; // default: 1 min
  console.log(`Setting up indexer: ${key} interval=${interval / 1000}s`);
  // Handlers are async; a discarded rejected promise would be an unhandled
  // rejection (fatal in Deno), so catch and log failures per round instead.
  intervals.push(setInterval(() => {
    indexer.handler(ats).catch((err) => {
      console.error(`indexer ${key} round failed:`, err);
    });
  }, interval));
}

View file

@@ -3,6 +3,7 @@ import { load as envLoad } from "https://deno.land/std@0.192.0/dotenv/mod.ts";
import { parse, stringify } from "https://deno.land/std@0.192.0/yaml/mod.ts";
import { MongoClient } from "npm:mongodb";
import { InfluxDB } from "npm:@influxdata/influxdb-client";
import { createClient as redisCreateClient } from "npm:redis@^4.6";
import { makeQueues } from "./queues.js";
import {
connect as NATSConnect,
@@ -16,18 +17,28 @@ export class ATScan {
this.debug = opts.debug;
this.enableQueues = opts.enableQueues || false;
this.enableNats = opts.enableNats || false;
console.log(this.enableQueues);
//console.log(this.enableQueues);
}
async init() {
this.env = Object.assign(Deno.env.toObject(), await envLoad());
await this.ecosystemLoad();
// redis
const redisUrl = "redis://localhost:6379";
this.redis = redisCreateClient({
url: redisUrl,
pingInterval: 1000,
});
await this.redis.connect();
console.log(`Connected to Redis: ${redisUrl}`);
// influxdb
const influxConfig = {
url: this.env.INFLUXDB_HOST,
token: this.env.INFLUXDB_TOKEN,
};
this.influx = new InfluxDB(influxConfig);
this.influxQuery = this.influx.getQueryApi(this.env.INFLUXDB_ORG);
// monbodb
this.client = new MongoClient(this.env.MONGODB_URL);
await this.client.connect();
this.dbRaw = this.client.db("test");
@@ -37,6 +48,7 @@
meta: this.dbRaw.collection("meta"),
};
console.log(`Connected to MongoDB: ${this.env.MONGODB_URL}`);
// nats - optional
if (this.enableNats) {
await (async () => {
this.nats = await NATSConnect({