206 lines
5.3 KiB
TypeScript
206 lines
5.3 KiB
TypeScript
interface QueueOptions<T, R> {
|
|
name: string;
|
|
fn: (item: T, spin: Spinner) => Promise<R>;
|
|
getItemText?: (item: T) => string;
|
|
maxJobs?: number;
|
|
passive?: boolean;
|
|
}
|
|
|
|
// Process multiple items in parallel, queue up as many.
|
|
export class Queue<T, R> {
|
|
#name: string;
|
|
#fn: (item: T, spin: Spinner) => Promise<R>;
|
|
#maxJobs: number;
|
|
#getItemText: (item: T) => string;
|
|
#passive: boolean;
|
|
|
|
#active: Spinner[] = [];
|
|
#queue: Array<[T] | [T, (result: R) => void, (err: unknown) => void]> = [];
|
|
|
|
#cachedProgress: Progress<{ active: Spinner[] }> | null = null;
|
|
#done: number = 0;
|
|
#total: number = 0;
|
|
#onComplete: (() => void) | null = null;
|
|
#estimate: number | null = null;
|
|
#errors: unknown[] = [];
|
|
|
|
constructor(options: QueueOptions<T, R>) {
|
|
this.#name = options.name;
|
|
this.#fn = options.fn;
|
|
this.#maxJobs = options.maxJobs ?? 5;
|
|
this.#getItemText = options.getItemText ?? defaultGetItemText;
|
|
this.#passive = options.passive ?? false;
|
|
}
|
|
|
|
get bar() {
|
|
const cached = this.#cachedProgress;
|
|
if (!cached) {
|
|
const bar = this.#cachedProgress = new Progress({
|
|
spinner: null,
|
|
text: ({ active }) => {
|
|
const now = performance.now();
|
|
let text = `[${this.#done}/${this.#total}] ${this.#name}`;
|
|
let n = 0;
|
|
for (const item of active) {
|
|
let itemText = "- " + item.format(now);
|
|
text += `\n` +
|
|
itemText.slice(0, Math.max(0, process.stdout.columns - 1));
|
|
if (n > 10) {
|
|
text += `\n ... + ${active.length - n} more`;
|
|
break;
|
|
}
|
|
n++;
|
|
}
|
|
return text;
|
|
},
|
|
props: {
|
|
active: [] as Spinner[],
|
|
},
|
|
});
|
|
bar.value = 0;
|
|
return bar;
|
|
}
|
|
return cached;
|
|
}
|
|
|
|
add(args: T) {
|
|
this.#total += 1;
|
|
this.updateTotal();
|
|
if (this.#active.length > this.#maxJobs) {
|
|
const { promise, resolve, reject } = Promise.withResolvers<R>();
|
|
this.#queue.push([args, resolve, reject]);
|
|
return promise;
|
|
}
|
|
return this.#run(args);
|
|
}
|
|
|
|
addMany(items: T[]) {
|
|
this.#total += items.length;
|
|
this.updateTotal();
|
|
|
|
const runNowCount = this.#maxJobs - this.#active.length;
|
|
const runNow = items.slice(0, runNowCount);
|
|
const runLater = items.slice(runNowCount);
|
|
this.#queue.push(...runLater.reverse().map<[T]>((x) => [x]));
|
|
runNow.map((item) => this.#run(item).catch(() => {}));
|
|
}
|
|
|
|
async #run(args: T): Promise<R> {
|
|
const bar = this.bar;
|
|
const itemText = this.#getItemText(args);
|
|
const spinner = new Spinner(itemText);
|
|
spinner.stop();
|
|
const active = this.#active;
|
|
try {
|
|
active.unshift(spinner);
|
|
bar.props = { active };
|
|
const result = await this.#fn(args, spinner);
|
|
this.#done++;
|
|
return result;
|
|
} catch (err) {
|
|
if (err && typeof err === "object") {
|
|
(err as any).job = itemText;
|
|
}
|
|
this.#errors.push(err);
|
|
throw err;
|
|
} finally {
|
|
active.splice(active.indexOf(spinner), 1);
|
|
bar.props = { active };
|
|
bar.value = this.#done;
|
|
|
|
// Process next item
|
|
const next = this.#queue.shift();
|
|
if (next) {
|
|
const args = next[0];
|
|
this.#run(args)
|
|
.then((result) => next[1]?.(result))
|
|
.catch((err) => next[2]?.(err));
|
|
} else if (this.#active.length === 0) {
|
|
if (this.#passive) {
|
|
this.bar.stop();
|
|
this.#cachedProgress = null;
|
|
}
|
|
this.#onComplete?.();
|
|
}
|
|
}
|
|
}
|
|
|
|
updateTotal() {
|
|
const bar = this.bar;
|
|
bar.total = Math.max(this.#total, this.#estimate ?? 0);
|
|
}
|
|
|
|
set estimate(e: number) {
|
|
this.#estimate = e;
|
|
if (this.#cachedProgress) {
|
|
this.updateTotal();
|
|
}
|
|
}
|
|
|
|
async done(o: { method: "success" | "stop" }) {
|
|
if (this.#active.length === 0) {
|
|
this.#end(o);
|
|
return;
|
|
}
|
|
|
|
const { promise, resolve } = Promise.withResolvers<void>();
|
|
this.#onComplete = resolve;
|
|
await promise;
|
|
this.#end(o);
|
|
}
|
|
|
|
#end(
|
|
{ method = this.#passive ? "stop" : "success" }: {
|
|
method: "success" | "stop";
|
|
},
|
|
) {
|
|
const bar = this.#cachedProgress;
|
|
if (this.#errors.length > 0) {
|
|
if (bar) bar.stop();
|
|
throw new AggregateError(
|
|
this.#errors,
|
|
this.#errors.length + " jobs failed in '" + this.#name + "'",
|
|
);
|
|
}
|
|
|
|
if (bar) bar[method]();
|
|
}
|
|
}
|
|
|
|
const cwd = process.cwd();
|
|
function defaultGetItemText(item: unknown) {
|
|
let itemText = "";
|
|
if (typeof item === "string") {
|
|
itemText = item;
|
|
} else if (typeof item === "object" && item !== null) {
|
|
const { path, label, id } = item as any;
|
|
itemText = label ?? path ?? id ?? JSON.stringify(item);
|
|
} else {
|
|
itemText = JSON.stringify(item);
|
|
}
|
|
|
|
if (itemText.startsWith(cwd)) {
|
|
itemText = path.relative(cwd, itemText);
|
|
}
|
|
return itemText;
|
|
}
|
|
|
|
export class OnceMap<T> {
|
|
private ongoing = new Map<string, Promise<T>>();
|
|
|
|
get(key: string, compute: () => Promise<T>) {
|
|
if (this.ongoing.has(key)) {
|
|
return this.ongoing.get(key)!;
|
|
}
|
|
|
|
const result = compute();
|
|
this.ongoing.set(key, result);
|
|
result.finally(() => this.ongoing.delete(key));
|
|
return result;
|
|
}
|
|
}
|
|
|
|
import { Progress } from "@paperclover/console/Progress";
|
|
import { Spinner } from "@paperclover/console/Spinner";
|
|
import * as path from "node:path";
|
|
import process from "node:process";
|