sitegen/framework/lib/async.ts
2025-07-07 20:58:02 -07:00

299 lines
7.3 KiB
TypeScript

// Five minutes expressed in milliseconds; used as the default `expire` delay
// of `RefCountedExpirable`.
const five_minutes = 5 * 60 * 1000;
/** Configuration accepted by the `Queue` constructor. */
interface QueueOptions<T, R> {
/** Label shown in the progress text and in the aggregate error message. */
name: string;
/** Worker invoked once per item; receives the item and that job's spinner. */
fn: (item: T, spin: Spinner) => Promise<R>;
/** Produces the text shown for a running item. Defaults to `defaultGetItemText`. */
getItemText?: (item: T) => string;
/** Maximum number of jobs running at the same time. Defaults to 5. */
maxJobs?: number;
/**
 * When true, the progress bar is stopped and discarded every time the queue
 * drains, and `done()` ends the bar with "stop" instead of "success".
 * Defaults to false.
 */
passive?: boolean;
}
/**
 * Processes items in parallel, running at most `maxJobs` at a time and
 * queueing the rest. Progress is rendered through a lazily created `Progress`
 * bar that lists the currently running jobs.
 *
 * Failures are logged as they happen, collected, and rethrown together as an
 * `AggregateError` by `done()`. The collected errors are never cleared, so a
 * later `done()` on the same queue reports them again.
 */
export class Queue<T, R> {
  #name: string;
  #fn: (item: T, spin: Spinner) => Promise<R>;
  #maxJobs: number;
  #getItemText: (item: T) => string;
  #passive: boolean;
  /** Spinners of the jobs currently running, newest first. */
  #active: Spinner[] = [];
  /**
   * Items waiting for a free slot. Entries created by `addReturn` carry the
   * callbacks that settle the promise handed to the caller; entries created
   * by `addMany` carry only the item.
   */
  #queue: Array<
    [item: T, resolve?: (result: R) => void, reject?: (err: unknown) => void]
  > = [];
  #cachedProgress: Progress<{ active: Spinner[] }> | null = null;
  /** Number of jobs that finished successfully. */
  #done: number = 0;
  /** Number of items ever added. */
  #total: number = 0;
  /** Resolvers of every `done()` call currently waiting for the queue to drain. */
  #waiters: Array<() => void> = [];
  #estimate: number | null = null;
  #errors: unknown[] = [];

  constructor(options: QueueOptions<T, R>) {
    this.#name = options.name;
    this.#fn = options.fn;
    this.#maxJobs = options.maxJobs ?? 5;
    this.#getItemText = options.getItemText ?? defaultGetItemText;
    this.#passive = options.passive ?? false;
  }

  /**
   * Stops the progress bar (if one exists) and drops every item that has not
   * started yet. Jobs already running are not interrupted. Promises returned
   * by `addReturn` for dropped items are never settled.
   */
  cancel() {
    this.#cachedProgress?.stop();
    this.#queue = [];
  }

  /** The progress bar for this queue, created on first access. */
  get bar() {
    if (this.#cachedProgress) return this.#cachedProgress;

    const bar = this.#cachedProgress = new Progress({
      spinner: null,
      text: ({ active }) => {
        // At most this many running jobs are listed individually.
        const maxVisible = 12;
        const now = performance.now();
        // `columns` only exists when stdout is a TTY. Without a known width
        // the rows are left untruncated instead of being sliced to nothing.
        const columns: number | undefined = process.stdout.columns;
        const width = typeof columns === "number"
          ? Math.max(0, columns - 1)
          : Infinity;

        let text = `[${this.#done}/${this.#total}] ${this.#name}`;
        const visible = active.slice(0, maxVisible);
        for (const spinner of visible) {
          text += `\n` + ("- " + spinner.format(now)).slice(0, width);
        }
        // Report exactly the jobs that were not listed above.
        const hidden = active.length - visible.length;
        if (hidden > 0) {
          text += `\n ... + ${hidden} more`;
        }
        return text;
      },
      props: {
        active: [] as Spinner[],
      },
    });
    bar.value = 0;
    return bar;
  }

  /**
   * Adds one item and returns a promise for its result. The item starts
   * immediately when a slot is free, otherwise it waits in the queue.
   * The promise rejects with whatever `fn` throws for this item.
   */
  addReturn(args: T) {
    this.#total += 1;
    this.updateTotal();
    if (this.#active.length >= this.#maxJobs) {
      const { promise, resolve, reject } = Promise.withResolvers<R>();
      this.#queue.push([args, resolve, reject]);
      return promise;
    }
    return this.#run(args);
  }

  /**
   * Adds one item. The returned promise settles when the item has been
   * processed and never rejects; failures are surfaced through `done()`.
   */
  add(args: T) {
    return this.addReturn(args).then(() => {}, () => {});
  }

  /**
   * Adds several items at once. As many as there are free slots start
   * immediately; the rest are queued. Failures are surfaced through `done()`.
   */
  addMany(items: T[]) {
    this.#total += items.length;
    this.updateTotal();
    const freeSlots = Math.max(0, this.#maxJobs - this.#active.length);

    // NOTE(review): the backlog is enqueued in reverse, so these items are
    // dequeued last-to-first. Kept as-is to preserve existing behavior.
    // Pushing one by one avoids spreading a huge array into call arguments.
    for (let i = items.length - 1; i >= freeSlots; i--) {
      this.#queue.push([items[i]]);
    }
    for (const item of items.slice(0, freeSlots)) {
      // The failure is already recorded by #run; avoid an unhandled rejection.
      this.#run(item).catch(() => {});
    }
  }

  /**
   * Runs a single item, keeps the progress bar in sync, and afterwards starts
   * the next queued item or signals that the queue has drained.
   */
  async #run(args: T): Promise<R> {
    const bar = this.bar;
    const itemText = this.#getItemText(args);
    const spinner = new Spinner(itemText);
    spinner.stop();
    // The per-job spinner never draws itself; it asks the shared bar to redraw.
    (spinner as any).redraw = () => (bar as any).redraw();
    const active = this.#active;
    try {
      active.unshift(spinner);
      bar.props = { active };
      const result = await this.#fn(args, spinner);
      this.#done++;
      return result;
    } catch (err) {
      if (err && typeof err === "object") {
        // Tag the error with the job it came from for later reporting.
        (err as any).job = itemText;
      }
      this.#errors.push(err);
      console.error(util.inspect(err, false, Infinity, true));
      throw err;
    } finally {
      active.splice(active.indexOf(spinner), 1);
      bar.props = { active };
      bar.value = this.#done;

      const next = this.#queue.shift();
      if (next) {
        // Hand the freed slot to the next queued item and forward its outcome
        // to the `addReturn` caller, if there is one.
        const [item, resolve, reject] = next;
        this.#run(item)
          .then((result) => resolve?.(result))
          .catch((err) => reject?.(err));
      } else if (active.length === 0) {
        if (this.#passive) {
          this.bar.stop();
          this.#cachedProgress = null;
        }
        // Wake every pending `done()` call, not just the most recent one.
        const waiters = this.#waiters;
        this.#waiters = [];
        for (const wake of waiters) wake();
      }
    }
  }

  /** Pushes the larger of the item count and the estimate to the bar's total. */
  updateTotal() {
    this.bar.total = Math.max(this.#total, this.#estimate ?? 0);
  }

  /** Expected number of items; used as the bar total while it exceeds the real count. */
  set estimate(e: number) {
    this.#estimate = e;
    if (this.#cachedProgress) {
      this.updateTotal();
    }
  }

  /**
   * Waits until no job is running, then finishes the progress bar.
   * May be awaited by several callers at once.
   *
   * @param o `method` selects how the bar is ended; it defaults to "stop" for
   *   passive queues and "success" otherwise.
   * @throws AggregateError if any job has failed.
   */
  async done(o?: { method: "success" | "stop" }) {
    if (this.#active.length !== 0) {
      const { promise, resolve } = Promise.withResolvers<void>();
      this.#waiters.push(resolve);
      await promise;
    }
    this.#end(o);
  }

  /** Ends the bar, or stops it and throws when failures were recorded. */
  #end(
    { method = this.#passive ? "stop" : "success" }: {
      method?: "success" | "stop";
    } = {},
  ) {
    const bar = this.#cachedProgress;
    if (this.#errors.length > 0) {
      if (bar) bar.stop();
      throw new AggregateError(
        this.#errors,
        this.#errors.length + " jobs failed in '" + this.#name + "'",
      );
    }
    if (bar) bar[method]();
  }

  /** Whether at least one job is currently running. */
  get active(): boolean {
    return this.#active.length !== 0;
  }

  /** Cancels outstanding work when the queue is disposed while still busy. */
  [Symbol.dispose]() {
    if (this.active) {
      this.cancel();
    }
  }
}
const cwd = process.cwd();

/**
 * Derives a display label for a queue item.
 *
 * Strings are used as-is. For objects the first non-nullish of `label`,
 * `path`, `id` is used. Anything else (including non-string values of those
 * properties) is serialized. A label that starts with the working directory
 * captured at module load is shown relative to it.
 */
function defaultGetItemText(item: unknown): string {
  // Serializes any value without throwing on inputs JSON cannot represent.
  const stringify = (value: unknown): string => {
    try {
      // JSON.stringify yields `undefined` for undefined, functions and symbols.
      return JSON.stringify(value) ?? String(value);
    } catch {
      // Circular structures and BigInt values make JSON.stringify throw.
      return String(value);
    }
  };

  let itemText: string;
  if (typeof item === "string") {
    itemText = item;
  } else {
    let candidate: unknown = undefined;
    if (typeof item === "object" && item !== null) {
      // `path` is renamed so it does not shadow the `node:path` module.
      const { path: itemPath, label, id } = item as {
        path?: unknown;
        label?: unknown;
        id?: unknown;
      };
      candidate = label ?? itemPath ?? id;
    }
    // The label must be a string for the prefix check below; numeric ids and
    // other non-string values are serialized instead of used directly.
    itemText = typeof candidate === "string"
      ? candidate
      : stringify(candidate ?? item);
  }

  if (itemText.startsWith(cwd)) {
    itemText = path.relative(cwd, itemText);
  }
  return itemText;
}
/**
 * Memoizes async computations by string key: the first `get` for a key runs
 * `compute` and stores its promise; later calls for that key return the same
 * promise without calling `compute` again.
 */
export class OnceMap<T> {
  private ongoing = new Map<string, Promise<T>>();

  /**
   * @param key Identifies the computation.
   * @param compute Invoked only when no promise is stored for `key` yet.
   * @returns The promise stored for `key`.
   */
  get(key: string, compute: () => Promise<T>) {
    const cache = this.ongoing;
    if (!cache.has(key)) {
      cache.set(key, compute());
    }
    return cache.get(key)!;
  }
}
/** Handle to the shared value; disposing it releases one reference. */
interface ARCEValue<T> {
  value: T;
  [Symbol.dispose]: () => void;
}

/**
 * Creates an accessor for a lazily initialized, reference-counted value.
 *
 * Each call to the returned function takes one reference and resolves to a
 * handle; disposing the handle releases that reference. When the count drops
 * to zero a timer is started, and if nobody asks for the value again within
 * `expire` milliseconds, `deinit` is called and the value is forgotten, so the
 * next call runs `init` again. Calls made while `init` is still pending share
 * that single initialization.
 *
 * @param init Produces the value.
 * @param deinit Tears the value down once it has expired.
 * @param expire Delay in milliseconds between the last release and `deinit`.
 * @returns A function resolving to a handle for the shared value. It rejects
 *   with the error thrown by `init`; a failed initialization holds no
 *   references and the next call retries.
 */
export function RefCountedExpirable<T>(
  init: () => Promise<T>,
  deinit: (value: T) => void,
  expire: number = five_minutes,
): () => Promise<ARCEValue<T>> {
  let refs = 0;
  let item: ARCEValue<T> | null = null;
  let loading: Promise<ARCEValue<T>> | null = null;
  let timer: ReturnType<typeof setTimeout> | null = null;

  // Releases one reference; the last release schedules the expiry.
  function deref() {
    ASSERT(item !== null);
    if (--refs !== 0) return;
    ASSERT(timer === null);
    timer = setTimeout(() => {
      ASSERT(refs === 0);
      ASSERT(loading === null);
      ASSERT(item !== null);
      deinit(item.value);
      item = null;
      timer = null;
    }, expire);
  }

  return async function () {
    // Any new request cancels a pending expiry.
    if (timer !== null) {
      clearTimeout(timer);
      timer = null;
    }
    if (item !== null) {
      refs++;
      return item;
    }
    if (loading !== null) {
      // Take the reference up front so the value cannot expire between the
      // load finishing and this caller resuming. Give it back if the load fails.
      refs++;
      try {
        return await loading;
      } catch (err) {
        refs--;
        throw err;
      }
    }

    const p = Promise.withResolvers<ARCEValue<T>>();
    // Joined callers attach their own handlers. This no-op handler keeps a
    // failed load that nobody joined from surfacing as an unhandled rejection;
    // the initiating caller still receives the error via `throw` below.
    p.promise.catch(() => {});
    loading = p.promise;
    try {
      const value = await init();
      item = { value, [Symbol.dispose]: deref };
      refs++;
      p.resolve(item);
      return item;
    } catch (e) {
      p.reject(e);
      throw e;
    } finally {
      loading = null;
    }
  };
}
/**
 * Wraps `fn` so that it runs at most once per successful result.
 *
 * The in-flight promise is cached as soon as the first call starts, so
 * concurrent callers share one invocation, and any resolved value, including
 * falsy ones such as `0`, `""` or `null`, is remembered. If `fn` throws or
 * rejects, the cache is cleared so that a later call tries again.
 *
 * @param fn The async computation to memoize.
 * @returns A function that resolves to the memoized result of `fn`.
 */
export function once<T>(fn: () => Promise<T>): () => Promise<T> {
  let pending: Promise<T> | null = null;
  return () => {
    if (pending !== null) return pending;
    // The async wrapper turns a synchronous throw from `fn` into a rejection.
    const attempt = (async () => fn())();
    pending = attempt;
    // Forget failed attempts. This handler only resets the cache; callers
    // still observe the rejection through `attempt` itself.
    attempt.catch(() => {
      if (pending === attempt) pending = null;
    });
    return attempt;
  };
}
import { Progress } from "@paperclover/console/Progress";
import { Spinner } from "@paperclover/console/Spinner";
import * as path from "node:path";
import process from "node:process";
import * as util from "node:util";