refactor: use RxJs for WaitForNetworkIdle (#10888)

This commit is contained in:
Nikolay Vitkov 2023-09-12 14:00:04 +02:00 committed by GitHub
parent 27f07ea447
commit 2e650f36e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 90 deletions

View File

@ -28,7 +28,11 @@ import {
merge, merge,
Observable, Observable,
raceWith, raceWith,
timer, delay,
filter,
of,
switchMap,
startWith,
} from '../../third_party/rxjs/rxjs.js'; } from '../../third_party/rxjs/rxjs.js';
import type {HTTPRequest} from '../api/HTTPRequest.js'; import type {HTTPRequest} from '../api/HTTPRequest.js';
import type {HTTPResponse} from '../api/HTTPResponse.js'; import type {HTTPResponse} from '../api/HTTPResponse.js';
@ -38,7 +42,7 @@ import type {ConsoleMessage} from '../common/ConsoleMessage.js';
import type {Coverage} from '../common/Coverage.js'; import type {Coverage} from '../common/Coverage.js';
import {Device} from '../common/Device.js'; import {Device} from '../common/Device.js';
import {DeviceRequestPrompt} from '../common/DeviceRequestPrompt.js'; import {DeviceRequestPrompt} from '../common/DeviceRequestPrompt.js';
import {TargetCloseError, TimeoutError} from '../common/Errors.js'; import {TargetCloseError} from '../common/Errors.js';
import {EventEmitter, Handler} from '../common/EventEmitter.js'; import {EventEmitter, Handler} from '../common/EventEmitter.js';
import type {FileChooser} from '../common/FileChooser.js'; import type {FileChooser} from '../common/FileChooser.js';
import type {WaitForSelectorOptions} from '../common/IsolatedWorld.js'; import type {WaitForSelectorOptions} from '../common/IsolatedWorld.js';
@ -68,7 +72,7 @@ import {
importFSPromises, importFSPromises,
isNumber, isNumber,
isString, isString,
waitForEvent, timeout,
withSourcePuppeteerURLIfNone, withSourcePuppeteerURLIfNone,
} from '../common/util.js'; } from '../common/util.js';
import type {WebWorker} from '../common/WebWorker.js'; import type {WebWorker} from '../common/WebWorker.js';
@ -1696,62 +1700,33 @@ export abstract class Page
inFlightRequestsCount: () => number; inFlightRequestsCount: () => number;
}, },
idleTime: number, idleTime: number,
timeout: number, ms: number,
closedDeferred: Deferred<TargetCloseError> closedDeferred: Deferred<TargetCloseError>
): Promise<void> { ): Promise<void> {
const idleDeferred = Deferred.create<void>(); await firstValueFrom(
const abortDeferred = Deferred.create<Error>(); merge(
fromEvent(
let idleTimer: NodeJS.Timeout | undefined;
const cleanup = () => {
clearTimeout(idleTimer);
abortDeferred.reject(new Error('abort'));
};
const evaluate = () => {
clearTimeout(idleTimer);
if (networkManager.inFlightRequestsCount() === 0) {
idleTimer = setTimeout(() => {
return idleDeferred.resolve();
}, idleTime);
}
};
const listenToEvent = (event: symbol) => {
return waitForEvent(
networkManager, networkManager,
event, NetworkManagerEmittedEvents.Request as unknown as string
() => { ),
evaluate(); fromEvent(
return false; networkManager,
}, NetworkManagerEmittedEvents.Response as unknown as string
timeout, ),
abortDeferred fromEvent(
); networkManager,
}; NetworkManagerEmittedEvents.RequestFailed as unknown as string
)
const eventPromises = [ ).pipe(
listenToEvent(NetworkManagerEmittedEvents.Request), startWith(null),
listenToEvent(NetworkManagerEmittedEvents.Response), filter(() => {
listenToEvent(NetworkManagerEmittedEvents.RequestFailed), return networkManager.inFlightRequestsCount() === 0;
]; }),
switchMap(v => {
evaluate(); return of(v).pipe(delay(idleTime));
}),
// We don't want to reject the closed deferred when raceWith(timeout(ms), from(closedDeferred.valueOrThrow()))
// the race if finished so we pass the Promise instead )
const closedPromise = closedDeferred.valueOrThrow();
await Deferred.race([idleDeferred, ...eventPromises, closedPromise]).then(
r => {
cleanup();
return r;
},
error => {
cleanup();
throw error;
}
); );
} }
@ -1787,11 +1762,7 @@ export abstract class Page
filterAsync(urlOrPredicate), filterAsync(urlOrPredicate),
first(), first(),
raceWith( raceWith(
timer(ms === 0 ? Infinity : ms).pipe( timeout(ms),
map(() => {
throw new TimeoutError(`Timed out after waiting ${ms}ms`);
})
),
fromEvent(this, PageEmittedEvents.Close).pipe( fromEvent(this, PageEmittedEvents.Close).pipe(
map(() => { map(() => {
throw new TargetCloseError('Page closed.'); throw new TargetCloseError('Page closed.');

View File

@ -36,12 +36,10 @@ import {
raceWith, raceWith,
retry, retry,
tap, tap,
timer,
} from '../../../third_party/rxjs/rxjs.js'; } from '../../../third_party/rxjs/rxjs.js';
import {TimeoutError} from '../../common/Errors.js';
import {EventEmitter} from '../../common/EventEmitter.js'; import {EventEmitter} from '../../common/EventEmitter.js';
import {HandleFor} from '../../common/types.js'; import {HandleFor} from '../../common/types.js';
import {debugError} from '../../common/util.js'; import {debugError, timeout} from '../../common/util.js';
import {BoundingBox, ClickOptions, ElementHandle} from '../ElementHandle.js'; import {BoundingBox, ClickOptions, ElementHandle} from '../ElementHandle.js';
import { import {
@ -213,17 +211,7 @@ export abstract class Locator<T> extends EventEmitter {
) )
); );
} }
if (this._timeout > 0) { candidates.push(timeout(this._timeout));
candidates.push(
timer(this._timeout).pipe(
map(() => {
throw new TimeoutError(
`Timed out after waiting ${this._timeout}ms`
);
})
)
);
}
return pipe( return pipe(
retry({delay: RETRY_DELAY}), retry({delay: RETRY_DELAY}),
raceWith<T, never[]>(...candidates) raceWith<T, never[]>(...candidates)

View File

@ -18,6 +18,7 @@ import type {Readable} from 'stream';
import type {Protocol} from 'devtools-protocol'; import type {Protocol} from 'devtools-protocol';
import {map, NEVER, Observable, timer} from '../../third_party/rxjs/rxjs.js';
import type {ElementHandle} from '../api/ElementHandle.js'; import type {ElementHandle} from '../api/ElementHandle.js';
import type {JSHandle} from '../api/JSHandle.js'; import type {JSHandle} from '../api/JSHandle.js';
import {Page} from '../api/Page.js'; import {Page} from '../api/Page.js';
@ -29,6 +30,7 @@ import {isErrorLike} from '../util/ErrorLike.js';
import type {CDPSession} from './Connection.js'; import type {CDPSession} from './Connection.js';
import {debug} from './Debug.js'; import {debug} from './Debug.js';
import {CDPElementHandle} from './ElementHandle.js'; import {CDPElementHandle} from './ElementHandle.js';
import {TimeoutError} from './Errors.js';
import type {CommonEventEmitter} from './EventEmitter.js'; import type {CommonEventEmitter} from './EventEmitter.js';
import {IsolatedWorld} from './IsolatedWorld.js'; import {IsolatedWorld} from './IsolatedWorld.js';
import {CDPJSHandle} from './JSHandle.js'; import {CDPJSHandle} from './JSHandle.js';
@ -708,3 +710,16 @@ export class Mutex {
resolve(); resolve();
} }
} }
/**
* @internal
*/
export function timeout(ms: number): Observable<never> {
return ms === 0
? NEVER
: timer(ms).pipe(
map(() => {
throw new TimeoutError(`Timed out after waiting ${ms}ms`);
})
);
}

View File

@ -16,28 +16,33 @@
export { export {
catchError, catchError,
defaultIfEmpty, defaultIfEmpty,
defer,
delay,
EMPTY,
filter, filter,
first, first,
ignoreElements,
map,
mergeMap,
raceWith,
retry,
tap,
throwIfEmpty,
firstValueFrom, firstValueFrom,
defer,
EMPTY,
from, from,
fromEvent, fromEvent,
merge,
race,
timer,
OperatorFunction,
identity, identity,
ignoreElements,
map,
merge,
mergeMap,
NEVER,
noop, noop,
pipe,
Observable, Observable,
of,
OperatorFunction,
pipe,
race,
raceWith,
retry,
startWith,
switchMap,
tap,
throwIfEmpty,
timer,
} from 'rxjs'; } from 'rxjs';
import {mergeMap, from, filter, map, type Observable} from 'rxjs'; import {mergeMap, from, filter, map, type Observable} from 'rxjs';