refactor: implement fromEmitterEvent (#11658)

This commit is contained in:
jrandolf 2024-01-09 14:45:45 +01:00 committed by GitHub
parent 8f886a0998
commit d0dd209850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 61 additions and 70 deletions

View File

@ -9,18 +9,15 @@ import type {ChildProcess} from 'child_process';
import type {Protocol} from 'devtools-protocol'; import type {Protocol} from 'devtools-protocol';
import { import {
filterAsync,
firstValueFrom, firstValueFrom,
from, from,
merge, merge,
raceWith, raceWith,
filterAsync,
fromEvent,
type Observable,
} from '../../third_party/rxjs/rxjs.js'; } from '../../third_party/rxjs/rxjs.js';
import type {ProtocolType} from '../common/ConnectOptions.js'; import type {ProtocolType} from '../common/ConnectOptions.js';
import {EventEmitter, type EventType} from '../common/EventEmitter.js'; import {EventEmitter, type EventType} from '../common/EventEmitter.js';
import {debugError} from '../common/util.js'; import {debugError, fromEmitterEvent, timeout} from '../common/util.js';
import {timeout} from '../common/util.js';
import {asyncDisposeSymbol, disposeSymbol} from '../util/disposable.js'; import {asyncDisposeSymbol, disposeSymbol} from '../util/disposable.js';
import type {BrowserContext} from './BrowserContext.js'; import type {BrowserContext} from './BrowserContext.js';
@ -343,8 +340,8 @@ export abstract class Browser extends EventEmitter<BrowserEvents> {
const {timeout: ms = 30000} = options; const {timeout: ms = 30000} = options;
return await firstValueFrom( return await firstValueFrom(
merge( merge(
fromEvent(this, BrowserEvent.TargetCreated) as Observable<Target>, fromEmitterEvent(this, BrowserEvent.TargetCreated),
fromEvent(this, BrowserEvent.TargetChanged) as Observable<Target>, fromEmitterEvent(this, BrowserEvent.TargetChanged),
from(this.targets()) from(this.targets())
).pipe(filterAsync(predicate), raceWith(timeout(ms))) ).pipe(filterAsync(predicate), raceWith(timeout(ms)))
); );

View File

@ -15,7 +15,6 @@ import {
first, first,
firstValueFrom, firstValueFrom,
from, from,
fromEvent,
map, map,
merge, merge,
of, of,
@ -58,6 +57,7 @@ import type {
} from '../common/types.js'; } from '../common/types.js';
import { import {
debugError, debugError,
fromEmitterEvent,
importFSPromises, importFSPromises,
isString, isString,
timeout, timeout,
@ -1677,25 +1677,16 @@ export abstract class Page extends EventEmitter<PageEvents> {
requestsInFlight = 0 requestsInFlight = 0
): Observable<void> { ): Observable<void> {
return merge( return merge(
fromEvent( fromEmitterEvent(networkManager, NetworkManagerEvent.Request),
networkManager, fromEmitterEvent(networkManager, NetworkManagerEvent.Response),
NetworkManagerEvent.Request as unknown as string fromEmitterEvent(networkManager, NetworkManagerEvent.RequestFailed)
) as Observable<void>,
fromEvent(
networkManager,
NetworkManagerEvent.Response as unknown as string
) as Observable<void>,
fromEvent(
networkManager,
NetworkManagerEvent.RequestFailed as unknown as string
) as Observable<void>
).pipe( ).pipe(
startWith(undefined), startWith(undefined),
filter(() => { filter(() => {
return networkManager.inFlightRequestsCount() <= requestsInFlight; return networkManager.inFlightRequestsCount() <= requestsInFlight;
}), }),
switchMap(v => { switchMap(() => {
return of(v).pipe(delay(idleTime)); return of(undefined).pipe(delay(idleTime));
}) })
); );
} }
@ -1725,15 +1716,15 @@ export abstract class Page extends EventEmitter<PageEvents> {
return await firstValueFrom( return await firstValueFrom(
merge( merge(
fromEvent(this, PageEvent.FrameAttached) as Observable<Frame>, fromEmitterEvent(this, PageEvent.FrameAttached),
fromEvent(this, PageEvent.FrameNavigated) as Observable<Frame>, fromEmitterEvent(this, PageEvent.FrameNavigated),
from(this.frames()) from(this.frames())
).pipe( ).pipe(
filterAsync(urlOrPredicate), filterAsync(urlOrPredicate),
first(), first(),
raceWith( raceWith(
timeout(ms), timeout(ms),
fromEvent(this, PageEvent.Close).pipe( fromEmitterEvent(this, PageEvent.Close).pipe(
map(() => { map(() => {
throw new TargetCloseError('Page closed.'); throw new TargetCloseError('Page closed.');
}) })

View File

@ -8,27 +8,27 @@ import type {
OperatorFunction, OperatorFunction,
} from '../../../third_party/rxjs/rxjs.js'; } from '../../../third_party/rxjs/rxjs.js';
import { import {
mergeMap,
from,
EMPTY, EMPTY,
catchError,
defaultIfEmpty,
defer, defer,
filter, filter,
first, first,
firstValueFrom,
from,
fromEvent,
identity, identity,
ignoreElements, ignoreElements,
retry,
throwIfEmpty,
race,
catchError,
defaultIfEmpty,
firstValueFrom,
fromEvent,
map, map,
merge, merge,
mergeMap,
noop, noop,
pipe, pipe,
race,
raceWith, raceWith,
retry,
tap, tap,
throwIfEmpty,
} from '../../../third_party/rxjs/rxjs.js'; } from '../../../third_party/rxjs/rxjs.js';
import type {EventType} from '../../common/EventEmitter.js'; import type {EventType} from '../../common/EventEmitter.js';
import {EventEmitter} from '../../common/EventEmitter.js'; import {EventEmitter} from '../../common/EventEmitter.js';

View File

@ -7,29 +7,27 @@
import * as Bidi from 'chromium-bidi/lib/cjs/protocol/protocol.js'; import * as Bidi from 'chromium-bidi/lib/cjs/protocol/protocol.js';
import { import {
type Observable,
from,
fromEvent,
merge,
map,
forkJoin,
first, first,
firstValueFrom, firstValueFrom,
forkJoin,
from,
map,
merge,
raceWith, raceWith,
} from '../../third_party/rxjs/rxjs.js'; } from '../../third_party/rxjs/rxjs.js';
import type {CDPSession} from '../api/CDPSession.js'; import type {CDPSession} from '../api/CDPSession.js';
import type {ElementHandle} from '../api/ElementHandle.js'; import type {ElementHandle} from '../api/ElementHandle.js';
import { import {
Frame, Frame,
throwIfDetached,
type GoToOptions, type GoToOptions,
type WaitForOptions, type WaitForOptions,
throwIfDetached,
} from '../api/Frame.js'; } from '../api/Frame.js';
import type {WaitForSelectorOptions} from '../api/Page.js'; import type {WaitForSelectorOptions} from '../api/Page.js';
import {UnsupportedOperation} from '../common/Errors.js'; import {UnsupportedOperation} from '../common/Errors.js';
import type {TimeoutSettings} from '../common/TimeoutSettings.js'; import type {TimeoutSettings} from '../common/TimeoutSettings.js';
import type {Awaitable, NodeFor} from '../common/types.js'; import type {Awaitable, NodeFor} from '../common/types.js';
import {UTILITY_WORLD_NAME, timeout} from '../common/util.js'; import {fromEmitterEvent, timeout, UTILITY_WORLD_NAME} from '../common/util.js';
import {Deferred} from '../util/Deferred.js'; import {Deferred} from '../util/Deferred.js';
import {disposeSymbol} from '../util/disposable.js'; import {disposeSymbol} from '../util/disposable.js';
@ -163,7 +161,7 @@ export class BidiFrame extends Frame {
this.#page this.#page
._waitWithNetworkIdle( ._waitWithNetworkIdle(
forkJoin([ forkJoin([
fromEvent(this.#context, waitEvent).pipe(first()), fromEmitterEvent(this.#context, waitEvent).pipe(first()),
from(this.setFrameContent(html)), from(this.setFrameContent(html)),
]).pipe( ]).pipe(
map(() => { map(() => {
@ -194,18 +192,16 @@ export class BidiFrame extends Frame {
const navigatedObservable = merge( const navigatedObservable = merge(
forkJoin([ forkJoin([
fromEvent( fromEmitterEvent(
this.#context, this.#context,
Bidi.ChromiumBidi.BrowsingContext.EventNames.NavigationStarted Bidi.ChromiumBidi.BrowsingContext.EventNames.NavigationStarted
).pipe(first()), ).pipe(first()),
fromEvent(this.#context, waitUntilEvent).pipe( fromEmitterEvent(this.#context, waitUntilEvent).pipe(first()),
first()
) as Observable<Bidi.BrowsingContext.NavigationInfo>,
]), ]),
fromEvent( fromEmitterEvent(
this.#context, this.#context,
Bidi.ChromiumBidi.BrowsingContext.EventNames.FragmentNavigated Bidi.ChromiumBidi.BrowsingContext.EventNames.FragmentNavigated
) as Observable<Bidi.BrowsingContext.NavigationInfo> )
).pipe( ).pipe(
map(result => { map(result => {
if (Array.isArray(result)) { if (Array.isArray(result)) {

View File

@ -9,16 +9,15 @@ import type {Readable} from 'stream';
import type {Protocol} from 'devtools-protocol'; import type {Protocol} from 'devtools-protocol';
import type {Observable} from '../../third_party/rxjs/rxjs.js';
import { import {
filterAsync,
firstValueFrom,
from,
map, map,
NEVER, NEVER,
timer, Observable,
firstValueFrom,
fromEvent,
filterAsync,
from,
raceWith, raceWith,
timer,
} from '../../third_party/rxjs/rxjs.js'; } from '../../third_party/rxjs/rxjs.js';
import type {CDPSession} from '../api/CDPSession.js'; import type {CDPSession} from '../api/CDPSession.js';
import {isNode} from '../environment.js'; import {isNode} from '../environment.js';
@ -594,9 +593,7 @@ export async function waitForHTTP<T extends {url(): string}>(
cancelation: Deferred<never> cancelation: Deferred<never>
): Promise<T> { ): Promise<T> {
return await firstValueFrom( return await firstValueFrom(
( (fromEmitterEvent(networkManager, eventName) as Observable<T>).pipe(
fromEvent(networkManager, eventName as unknown as string) as Observable<T>
).pipe(
filterAsync(async http => { filterAsync(async http => {
if (isString(urlOrPredicate)) { if (isString(urlOrPredicate)) {
return urlOrPredicate === http.url(); return urlOrPredicate === http.url();
@ -711,3 +708,21 @@ function convertPrintParameterToInches(
} }
return pixels / unitToPixels[lengthUnit]; return pixels / unitToPixels[lengthUnit];
} }
/**
* @internal
*/
export function fromEmitterEvent<
Events extends Record<EventType, unknown>,
Event extends keyof Events,
>(emitter: EventEmitter<Events>, eventName: Event): Observable<Events[Event]> {
return new Observable(subscriber => {
const listener = (event: Events[Event]) => {
subscriber.next(event);
};
emitter.on(eventName, listener);
return () => {
emitter.off(eventName, listener);
};
});
}

View File

@ -9,12 +9,8 @@ import {spawn, spawnSync} from 'child_process';
import {PassThrough} from 'stream'; import {PassThrough} from 'stream';
import debug from 'debug'; import debug from 'debug';
import type Protocol from 'devtools-protocol';
import type { import type {OperatorFunction} from '../../third_party/rxjs/rxjs.js';
Observable,
OperatorFunction,
} from '../../third_party/rxjs/rxjs.js';
import { import {
bufferCount, bufferCount,
concatMap, concatMap,
@ -29,7 +25,7 @@ import {
import {CDPSessionEvent} from '../api/CDPSession.js'; import {CDPSessionEvent} from '../api/CDPSession.js';
import type {BoundingBox} from '../api/ElementHandle.js'; import type {BoundingBox} from '../api/ElementHandle.js';
import type {Page} from '../api/Page.js'; import type {Page} from '../api/Page.js';
import {debugError} from '../common/util.js'; import {debugError, fromEmitterEvent} from '../common/util.js';
import {guarded} from '../util/decorators.js'; import {guarded} from '../util/decorators.js';
import {asyncDisposeSymbol} from '../util/disposable.js'; import {asyncDisposeSymbol} from '../util/disposable.js';
@ -137,12 +133,7 @@ export class ScreenRecorder extends PassThrough {
}); });
this.#lastFrame = lastValueFrom( this.#lastFrame = lastValueFrom(
( fromEmitterEvent(client, 'Page.screencastFrame').pipe(
fromEvent(
client,
'Page.screencastFrame'
) as Observable<Protocol.Page.ScreencastFrameEvent>
).pipe(
tap(event => { tap(event => {
void client.send('Page.screencastFrameAck', { void client.send('Page.screencastFrameAck', {
sessionId: event.sessionId, sessionId: event.sessionId,

View File

@ -25,6 +25,7 @@ export {
mergeMap, mergeMap,
NEVER, NEVER,
noop, noop,
Observable,
of, of,
pipe, pipe,
race, race,