fix: use shareReplay for inflight requests (#11810)

This commit is contained in:
jrandolf 2024-02-02 08:09:33 +01:00 committed by GitHub
parent fa353d3a59
commit 0f0813db38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 39 additions and 39 deletions

View File

@ -19,11 +19,13 @@ import {
map,
merge,
mergeMap,
mergeScan,
of,
race,
raceWith,
ReplaySubject,
startWith,
switchMap,
take,
takeUntil,
timer,
type Observable,
@ -604,8 +606,7 @@ export abstract class Page extends EventEmitter<PageEvents> {
#requestHandlers = new WeakMap<Handler<HTTPRequest>, Handler<HTTPRequest>>();
#requestsInFlight = 0;
#inflight$: Observable<number>;
#inflight$ = new ReplaySubject<number>(1);
/**
* @internal
@ -613,39 +614,37 @@ export abstract class Page extends EventEmitter<PageEvents> {
constructor() {
super();
this.#inflight$ = fromEmitterEvent(this, PageEvent.Request).pipe(
takeUntil(fromEmitterEvent(this, PageEvent.Close)),
mergeMap(request => {
return concat(
of(1),
race(
fromEmitterEvent(this, PageEvent.Response).pipe(
filter(response => {
return response.request()._requestId === request._requestId;
})
),
fromEmitterEvent(this, PageEvent.RequestFailed).pipe(
filter(failure => {
return failure._requestId === request._requestId;
})
),
fromEmitterEvent(this, PageEvent.RequestFinished).pipe(
filter(success => {
return success._requestId === request._requestId;
fromEmitterEvent(this, PageEvent.Request)
.pipe(
mergeMap(originalRequest => {
return concat(
of(1),
merge(
fromEmitterEvent(this, PageEvent.RequestFailed),
fromEmitterEvent(this, PageEvent.RequestFinished),
fromEmitterEvent(this, PageEvent.Response).pipe(
map(response => {
return response.request();
})
)
).pipe(
filter(request => {
return request._requestId === originalRequest._requestId;
}),
take(1),
map(() => {
return -1;
})
)
).pipe(
map(() => {
return -1;
})
)
);
})
);
this.#inflight$.subscribe(count => {
this.#requestsInFlight += count;
});
);
}),
mergeScan((acc, addend) => {
return of(acc + addend);
}, 0),
takeUntil(fromEmitterEvent(this, PageEvent.Close)),
startWith(0)
)
.subscribe(this.#inflight$);
}
/**
@ -1776,13 +1775,11 @@ export abstract class Page extends EventEmitter<PageEvents> {
} = options;
return this.#inflight$.pipe(
startWith(this.#requestsInFlight),
switchMap(() => {
if (this.#requestsInFlight > concurrency) {
switchMap(inflight => {
if (inflight > concurrency) {
return EMPTY;
} else {
return timer(idleTime);
}
return timer(idleTime);
}),
map(() => {}),
raceWith(

View File

@ -22,8 +22,10 @@ export {
ignoreElements,
lastValueFrom,
map,
ReplaySubject,
merge,
mergeMap,
mergeScan,
NEVER,
noop,
Observable,
@ -34,6 +36,7 @@ export {
retry,
startWith,
switchMap,
take,
takeUntil,
tap,
throwIfEmpty,