const five_minutes = 5 * 60 * 1000; interface QueueOptions { name: string; fn: (item: T, spin: Spinner) => Promise; getItemText?: (item: T) => string; maxJobs?: number; passive?: boolean; } // Process multiple items in parallel, queue up as many. export class Queue { #name: string; #fn: (item: T, spin: Spinner) => Promise; #maxJobs: number; #getItemText: (item: T) => string; #passive: boolean; #active: Spinner[] = []; #queue: Array<[T] | [T, (result: R) => void, (err: unknown) => void]> = []; #cachedProgress: Progress<{ active: Spinner[] }> | null = null; #done: number = 0; #total: number = 0; #onComplete: (() => void) | null = null; #estimate: number | null = null; #errors: unknown[] = []; constructor(options: QueueOptions) { this.#name = options.name; this.#fn = options.fn; this.#maxJobs = options.maxJobs ?? 5; this.#getItemText = options.getItemText ?? defaultGetItemText; this.#passive = options.passive ?? false; } cancel() { const bar = this.#cachedProgress; bar?.stop(); this.#queue = []; } get bar() { const cached = this.#cachedProgress; if (!cached) { const bar = this.#cachedProgress = new Progress({ spinner: null, text: ({ active }) => { const now = performance.now(); let text = `[${this.#done}/${this.#total}] ${this.#name}`; let n = 0; for (const item of active) { let itemText = "- " + item.format(now); text += `\n` + itemText.slice(0, Math.max(0, process.stdout.columns - 1)); if (n > 10) { text += `\n ... + ${active.length - n} more`; break; } n++; } return text; }, props: { active: [] as Spinner[], }, }); bar.value = 0; return bar; } return cached; } addReturn(args: T) { this.#total += 1; this.updateTotal(); if (this.#active.length > this.#maxJobs) { const { promise, resolve, reject } = Promise.withResolvers(); this.#queue.push([args, resolve, reject]); return promise; } return this.#run(args); } add(args: T) { return this.addReturn(args).then(() => {}, () => {}); } addMany(items: T[]) { this.#total += items.length; this.updateTotal(); const runNowCount = this.#maxJobs - this.#active.length; const runNow = items.slice(0, runNowCount); const runLater = items.slice(runNowCount); this.#queue.push(...runLater.reverse().map<[T]>((x) => [x])); runNow.map((item) => this.#run(item).catch(() => {})); } async #run(args: T): Promise { const bar = this.bar; const itemText = this.#getItemText(args); const spinner = new Spinner(itemText); spinner.stop(); (spinner as any).redraw = () => (bar as any).redraw(); const active = this.#active; try { active.unshift(spinner); bar.props = { active }; console.log(this.#name + ": " + itemText); const result = await this.#fn(args, spinner); this.#done++; return result; } catch (err) { if (err && typeof err === "object") { (err as any).job = itemText; } this.#errors.push(err); throw err; } finally { active.splice(active.indexOf(spinner), 1); bar.props = { active }; bar.value = this.#done; // Process next item const next = this.#queue.shift(); if (next) { const args = next[0]; this.#run(args) .then((result) => next[1]?.(result)) .catch((err) => next[2]?.(err)); } else if (this.#active.length === 0) { if (this.#passive) { this.bar.stop(); this.#cachedProgress = null; } this.#onComplete?.(); } } } updateTotal() { const bar = this.bar; bar.total = Math.max(this.#total, this.#estimate ?? 0); } set estimate(e: number) { this.#estimate = e; if (this.#cachedProgress) { this.updateTotal(); } } async done(o?: { method: "success" | "stop" }) { if (this.#active.length === 0) { this.#end(o); return; } const { promise, resolve } = Promise.withResolvers(); this.#onComplete = resolve; await promise; this.#end(o); } #end( { method = this.#passive ? "stop" : "success" }: { method?: "success" | "stop"; } = {}, ) { const bar = this.#cachedProgress; if (this.#errors.length > 0) { if (bar) bar.stop(); throw new AggregateError( this.#errors, this.#errors.length + " jobs failed in '" + this.#name + "'", ); } if (bar) bar[method](); } get active(): boolean { return this.#active.length !== 0; } [Symbol.dispose]() { if (this.active) { this.cancel(); } } } const cwd = process.cwd(); function defaultGetItemText(item: unknown) { let itemText = ""; if (typeof item === "string") { itemText = item; } else if (typeof item === "object" && item !== null) { const { path, label, id } = item as any; itemText = label ?? path ?? id ?? JSON.stringify(item); } else { itemText = JSON.stringify(item); } if (itemText.startsWith(cwd)) { itemText = path.relative(cwd, itemText); } return itemText; } export class OnceMap { private ongoing = new Map>(); get(key: string, compute: () => Promise) { if (this.ongoing.has(key)) { return this.ongoing.get(key)!; } const result = compute(); this.ongoing.set(key, result); return result; } } interface ARCEValue { value: T; [Symbol.dispose]: () => void; } export function RefCountedExpirable( init: () => Promise, deinit: (value: T) => void, expire: number = five_minutes, ): () => Promise> { let refs = 0; let item: ARCEValue | null = null; let loading: Promise> | null = null; let timer: ReturnType | null = null; function deref() { ASSERT(item !== null); if (--refs !== 0) return; ASSERT(timer === null); timer = setTimeout(() => { ASSERT(refs === 0); ASSERT(loading === null); ASSERT(item !== null); deinit(item.value); item = null; timer = null; }, expire); } return async function () { if (timer !== null) { clearTimeout(timer); timer = null; } if (item !== null) { refs++; return item; } if (loading !== null) { refs++; return loading; } const p = Promise.withResolvers>(); loading = p.promise; try { const value = await init(); item = { value, [Symbol.dispose]: deref }; refs++; p.resolve(item); return item; } catch (e) { p.reject(e); throw e; } finally { loading = null; } }; } export function once(fn: () => Promise): () => Promise { let result: T | Promise | null = null; return async () => { if (result) return result; result = await fn(); return result; }; } import { Progress } from "@paperclover/console/Progress"; import { Spinner } from "@paperclover/console/Spinner"; import * as path from "node:path"; import process from "node:process";