diff --git a/src/databases/Postgres.ts b/src/databases/Postgres.ts index cdcc3a1..5b384e8 100644 --- a/src/databases/Postgres.ts +++ b/src/databases/Postgres.ts @@ -1,10 +1,10 @@ import { Logger } from "../utils/logger"; import { IDatabase, QueryOption, QueryType } from "./IDatabase"; -import { Client, Pool, types } from "pg"; +import { Client, Pool, QueryResult, types } from "pg"; import fs from "fs"; import { CustomPostgresConfig, CustomPostgresReadOnlyConfig } from "../types/config.model"; -import { promiseTimeout } from "../utils/promiseTimeout"; +import { timeoutPomise, PromiseWithState, savePromiseState, nextFulfilment } from "../utils/promiseTimeout"; // return numeric (pg_type oid=1700) as float types.setTypeParser(1700, function(val) { @@ -104,6 +104,8 @@ export class Postgres implements IDatabase { Logger.debug(`prepare (postgres): type: ${type}, query: ${query}, params: ${params}`); + const pendingQueries: PromiseWithState>[] = []; + let tries = 0; let lastPool: Pool = null; do { @@ -111,7 +113,11 @@ export class Postgres implements IDatabase { try { lastPool = this.getPool(type, options); - const queryResult = await promiseTimeout(lastPool.query({ text: query, values: params }), options.useReplica ? this.readTimeout : null); + + pendingQueries.push(savePromiseState(lastPool.query({ text: query, values: params }))); + const currentPromises = [...pendingQueries]; + if (options.useReplica) currentPromises.push(savePromiseState(timeoutPomise(this.readTimeout))); + const queryResult = await nextFulfilment(currentPromises); switch (type) { case "get": { diff --git a/src/routes/getSkipSegments.ts b/src/routes/getSkipSegments.ts index 34d410c..5c2dd6a 100644 --- a/src/routes/getSkipSegments.ts +++ b/src/routes/getSkipSegments.ts @@ -11,7 +11,7 @@ import { Logger } from "../utils/logger"; import { QueryCacher } from "../utils/queryCacher"; import { getReputation } from "../utils/reputation"; import { getService } from "../utils/getService"; -import { promiseTimeout } from "../utils/promiseTimeout"; +import { promiseOrTimeout } from "../utils/promiseTimeout"; async function prepareCategorySegments(req: Request, videoID: VideoID, service: Service, segments: DBSegment[], cache: SegmentCache = { shadowHiddenSegmentIPs: {} }, useCache: boolean): Promise { @@ -40,7 +40,7 @@ async function prepareCategorySegments(req: Request, videoID: VideoID, service: const fetchData = () => privateDB.prepare("all", 'SELECT "hashedIP" FROM "sponsorTimes" WHERE "videoID" = ? AND "timeSubmitted" = ? AND "service" = ?', [videoID, segment.timeSubmitted, service], { useReplica: true }) as Promise<{ hashedIP: HashedIP }[]>; try { - cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = await promiseTimeout(QueryCacher.get(fetchData, shadowHiddenIPKey(videoID, segment.timeSubmitted, service)), 150); + cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = await promiseOrTimeout(QueryCacher.get(fetchData, shadowHiddenIPKey(videoID, segment.timeSubmitted, service)), 150); } catch (e) { // give up on shadowhide for now cache.shadowHiddenSegmentIPs[videoID][segment.timeSubmitted] = null; diff --git a/src/utils/promiseTimeout.ts b/src/utils/promiseTimeout.ts index 2053367..8a6ecf8 100644 --- a/src/utils/promiseTimeout.ts +++ b/src/utils/promiseTimeout.ts @@ -1,11 +1,50 @@ -export function promiseTimeout(promise: Promise, timeout: number): Promise { +export class PromiseTimeoutError extends Error { + promise?: Promise; + + constructor(promise?: Promise) { + super("Promise timed out"); + + this.promise = promise; + } +} + +export interface PromiseWithState extends Promise { + isResolved: boolean; + isRejected: boolean; +} + +export function promiseOrTimeout(promise: Promise, timeout?: number): Promise { + return Promise.race([timeoutPomise(timeout), promise]); +} + +export function timeoutPomise(timeout?: number): Promise { return new Promise((resolve, reject) => { if (timeout) { setTimeout(() => { - reject(new Error("Promise timed out")); + reject(new PromiseTimeoutError()); }, timeout); } - - promise.then(resolve, reject); }); +} + +export function savePromiseState(promise: Promise): PromiseWithState { + const p = promise as PromiseWithState; + p.isResolved = false; + p.isRejected = false; + + p.then(() => { + p.isResolved = true; + }).catch(() => { + p.isRejected = true; + }); + + return p; +} + +/** + * Allows rejection or resolve + * Allows past resolves too, but not past rejections + */ +export function nextFulfilment(promises: PromiseWithState[]): Promise { + return Promise.race(promises.filter((p) => !p.isRejected)); } \ No newline at end of file