diff --git a/packages/puppeteer-core/src/api/Page.ts b/packages/puppeteer-core/src/api/Page.ts index deb04628fdc..63b18c8ee1c 100644 --- a/packages/puppeteer-core/src/api/Page.ts +++ b/packages/puppeteer-core/src/api/Page.ts @@ -19,11 +19,13 @@ import { map, merge, mergeMap, + mergeScan, of, - race, raceWith, + ReplaySubject, startWith, switchMap, + take, takeUntil, timer, type Observable, @@ -604,8 +606,7 @@ export abstract class Page extends EventEmitter { #requestHandlers = new WeakMap, Handler>(); - #requestsInFlight = 0; - #inflight$: Observable; + #inflight$ = new ReplaySubject(1); /** * @internal @@ -613,39 +614,37 @@ export abstract class Page extends EventEmitter { constructor() { super(); - this.#inflight$ = fromEmitterEvent(this, PageEvent.Request).pipe( - takeUntil(fromEmitterEvent(this, PageEvent.Close)), - mergeMap(request => { - return concat( - of(1), - race( - fromEmitterEvent(this, PageEvent.Response).pipe( - filter(response => { - return response.request()._requestId === request._requestId; - }) - ), - fromEmitterEvent(this, PageEvent.RequestFailed).pipe( - filter(failure => { - return failure._requestId === request._requestId; - }) - ), - fromEmitterEvent(this, PageEvent.RequestFinished).pipe( - filter(success => { - return success._requestId === request._requestId; + fromEmitterEvent(this, PageEvent.Request) + .pipe( + mergeMap(originalRequest => { + return concat( + of(1), + merge( + fromEmitterEvent(this, PageEvent.RequestFailed), + fromEmitterEvent(this, PageEvent.RequestFinished), + fromEmitterEvent(this, PageEvent.Response).pipe( + map(response => { + return response.request(); + }) + ) + ).pipe( + filter(request => { + return request._requestId === originalRequest._requestId; + }), + take(1), + map(() => { + return -1; }) ) - ).pipe( - map(() => { - return -1; - }) - ) - ); - }) - ); - - this.#inflight$.subscribe(count => { - this.#requestsInFlight += count; - }); + ); + }), + mergeScan((acc, addend) => { + return of(acc + addend); + }, 0), + takeUntil(fromEmitterEvent(this, PageEvent.Close)), + startWith(0) + ) + .subscribe(this.#inflight$); } /** @@ -1776,13 +1775,11 @@ export abstract class Page extends EventEmitter { } = options; return this.#inflight$.pipe( - startWith(this.#requestsInFlight), - switchMap(() => { - if (this.#requestsInFlight > concurrency) { + switchMap(inflight => { + if (inflight > concurrency) { return EMPTY; - } else { - return timer(idleTime); } + return timer(idleTime); }), map(() => {}), raceWith( diff --git a/packages/puppeteer-core/third_party/rxjs/rxjs.ts b/packages/puppeteer-core/third_party/rxjs/rxjs.ts index b8b64788ae5..2460f047bd2 100644 --- a/packages/puppeteer-core/third_party/rxjs/rxjs.ts +++ b/packages/puppeteer-core/third_party/rxjs/rxjs.ts @@ -22,8 +22,10 @@ export { ignoreElements, lastValueFrom, map, + ReplaySubject, merge, mergeMap, + mergeScan, NEVER, noop, Observable, @@ -34,6 +36,7 @@ export { retry, startWith, switchMap, + take, takeUntil, tap, throwIfEmpty,