PagedFlux<T> Class

Type Parameters

T

The type of items in a PagedResponse

public class PagedFlux<T>
extends PagedFluxBase<T,PagedResponse<T>>

PagedFlux is a Flux that provides the ability to operate on paginated REST responses of type PagedResponse<T> and on the individual items in such pages. When the response is processed by page, each response contains the items in the page as well as REST response details such as the status code and headers.

To process one item at a time, subscribe to this flux as shown below.

Code sample

// Subscribe to process one item at a time
 pagedFlux
     .log()
     .subscribe(item -> System.out.println("Processing item with value: " + item),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

To process one page at a time, use the byPage() method as shown below.

Code sample

// Subscribe to process one page at a time from the beginning
 pagedFlux
     .byPage()
     .log()
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

To process items one page at a time, starting from any page associated with a continuation token, use byPage(String) as shown below.

Code sample

// Subscribe to process one page at a time starting from a page associated with
 // a continuation token
 String continuationToken = getContinuationToken();
 pagedFlux
     .byPage(continuationToken)
     .log()
     .doOnSubscribe(ignored -> System.out.println(
         "Subscribed to paged flux processing pages starting from: " + continuationToken))
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));
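
The effect of starting from a continuation token can be modelled without any Azure or Reactor types. The sketch below is only an illustration under stated assumptions: the class TokenPagingSketch, its Page type, and the token values "page-1" to "page-3" are hypothetical and are not part of azure-core. It shows that a null token starts at the first page, while a non-null token starts at the page that token identifies and skips everything before it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a paged service; none of these names come from azure-core.
final class TokenPagingSketch {

    // One page: its items plus the token of the following page (null on the last page).
    static final class Page {
        final List<Integer> items;
        final String nextToken;

        Page(List<Integer> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    private static final String FIRST_TOKEN = "page-1";
    private static final Map<String, Page> PAGES = new HashMap<>();

    static {
        PAGES.put("page-1", new Page(Arrays.asList(1, 2), "page-2"));
        PAGES.put("page-2", new Page(Arrays.asList(3, 4), "page-3"));
        PAGES.put("page-3", new Page(Arrays.asList(5), null));
    }

    // Collects all items starting from the page identified by the token.
    // A null token means "start from the first page".
    static List<Integer> itemsFrom(String continuationToken) {
        List<Integer> result = new ArrayList<>();
        String token = (continuationToken == null) ? FIRST_TOKEN : continuationToken;
        while (token != null) {
            Page page = PAGES.get(token);
            if (page == null) {
                throw new IllegalArgumentException("Unknown continuation token: " + token);
            }
            result.addAll(page.items);
            token = page.nextToken; // null on the last page ends the loop
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(itemsFrom(null));     // all pages
        System.out.println(itemsFrom("page-2")); // resumes at the second page
    }
}
```

In this model, a caller that stored "page-2" after an interrupted run could resume there without fetching the first page again.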

Constructor Summary

Constructor Description
PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever)

Creates an instance of PagedFlux<T> that consists of only a single page with a given element count.

PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever, BiFunction<String,Integer,Mono<PagedResponse<T>>> nextPageRetriever)

Creates an instance of PagedFlux<T> that is capable of retrieving multiple pages of a given page size.

PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever)

Creates an instance of PagedFlux<T> that consists of only a single page.

PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever, Function<String,Mono<PagedResponse<T>>> nextPageRetriever)

Creates an instance of PagedFlux<T>.

Method Summary

Modifier and Type Method and Description
static <T> PagedFlux<T> create(Supplier<PageRetriever<String,PagedResponse<T>>> provider)

Creates an instance of PagedFlux<T> backed by a Page Retriever Supplier (provider).

<S> PagedFlux<S> mapPage(Function<T,S> mapper)

Deprecated

Refer to the decoration samples for PagedFlux#create(Supplier).

Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper function.

Methods inherited from PagedFluxBase

byPage, byPage, subscribe

Methods inherited from ContinuablePagedFlux

Methods inherited from ContinuablePagedFluxCore

Methods inherited from java.lang.Object

Methods inherited from reactor.core.publisher.Flux

all, any, as, blockFirst, blockLast, buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, bufferWhile, cache, cancelOn, cast, checkpoint, collect, collectList, collectMap, collectMultimap, collectSortedList, combineLatest, concat, concatDelayError, concatMap, concatMapDelayError, concatMapIterable, concatWith, concatWithValues, contextWrite, count, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delaySequence, delaySubscription, delayUntil, dematerialize, distinct, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elementAt, empty, error, expand, expandDeep, filter, filterWhen, first, firstWithSignal, firstWithValue, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, generate, getPrefetch, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, interval, join, just, last, limitRate, limitRequest, log, map, mapNotNull, materialize, merge, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrderedWith, mergePriority, mergePriorityDelayError, mergeSequential, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorComplete, onErrorContinue, onErrorMap, onErrorResume, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, publish, publishNext, publishOn, push, range, reduce, reduceWith, repeat, repeatWhen, replay, retry, retryWhen, sample, sampleFirst, sampleTimeout, scan, scanWith, share, shareNext, single, singleOrEmpty, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, startWith, subscribe, subscribeOn, subscribeWith, subscriberContext, switchIfEmpty, switchMap, switchOnFirst, switchOnNext, tag, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, thenEmpty, thenMany, timed, timeout, timestamp, toIterable, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, usingWhen, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile, withLatestFrom, zip, zipWith, zipWithIterable

Constructor Details

PagedFlux

public PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever)

Creates an instance of PagedFlux<T> that consists of only a single page with a given element count.

Code sample

// A function that fetches the single page of data from a source/service.
 Function<Integer, Mono<PagedResponse<Integer>>> singlePageRetriever = pageSize ->
     getFirstPageWithSize(pageSize);

 PagedFlux<Integer> singlePageFluxWithPageSize = new PagedFlux<Integer>(singlePageRetriever);

Parameters:

firstPageRetriever - Function that retrieves the first page.

PagedFlux

public PagedFlux(Function<Integer,Mono<PagedResponse<T>>> firstPageRetriever, BiFunction<String,Integer,Mono<PagedResponse<T>>> nextPageRetriever)

Creates an instance of PagedFlux<T> that is capable of retrieving multiple pages of a given page size.

Code sample

// A function that fetches the first page of data from a source/service.
 Function<Integer, Mono<PagedResponse<Integer>>> firstPageRetriever = pageSize -> getFirstPageWithSize(pageSize);

 // A function that fetches subsequent pages of data from a source/service given a continuation token.
 BiFunction<String, Integer, Mono<PagedResponse<Integer>>> nextPageRetriever = (continuationToken, pageSize) ->
     getNextPageWithSize(continuationToken, pageSize);

 PagedFlux<Integer> pagedFluxWithPageSize = new PagedFlux<>(firstPageRetriever, nextPageRetriever);

Parameters:

firstPageRetriever - Function that retrieves the first page.
nextPageRetriever - BiFunction that retrieves the next page given a continuation token and page size.
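
How a page size flows through a first-page Function and a next-page BiFunction can be sketched with plain JDK types. This is a simplified, synchronous model and not the azure-core implementation: PageSizeSketch, its Page type, the in-memory DATA list, and the use of a stringified offset as the continuation token are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of page-size-aware retrievers; not part of azure-core.
final class PageSizeSketch {

    // One page: its items plus the token of the following page (null on the last page).
    static final class Page {
        final List<Integer> items;
        final String nextToken;

        Page(List<Integer> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    private static final List<Integer> DATA = Arrays.asList(10, 20, 30, 40, 50);

    // Returns up to pageSize items beginning at offset.
    // The continuation token is simply the next offset as a string.
    private static Page slice(int offset, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        int end = Math.min(offset + pageSize, DATA.size());
        String next = (end < DATA.size()) ? Integer.toString(end) : null;
        return new Page(new ArrayList<>(DATA.subList(offset, end)), next);
    }

    // First-page retriever: receives only the page size.
    static final Function<Integer, Page> FIRST_PAGE = pageSize -> slice(0, pageSize);

    // Next-page retriever: receives the continuation token and the page size.
    static final BiFunction<String, Integer, Page> NEXT_PAGE =
        (token, pageSize) -> slice(Integer.parseInt(token), pageSize);

    // Drains every page with the given page size and reports how many items each page held.
    static List<Integer> pageLengths(int pageSize) {
        List<Integer> lengths = new ArrayList<>();
        Page page = FIRST_PAGE.apply(pageSize);
        lengths.add(page.items.size());
        while (page.nextToken != null) {
            page = NEXT_PAGE.apply(page.nextToken, pageSize);
            lengths.add(page.items.size());
        }
        return lengths;
    }

    public static void main(String[] args) {
        System.out.println(pageLengths(2));
        System.out.println(pageLengths(5));
    }
}
```

The same page size is handed to both retrievers on every call, so in this model the last page is the only one that can be shorter than requested.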

PagedFlux

public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever)

Creates an instance of PagedFlux<T> that consists of only a single page. This constructor takes a Supplier that returns the single page of T.

Code sample

// A supplier that fetches the first page of data from source/service
 Supplier<Mono<PagedResponse<Integer>>> firstPageRetrieverFunction = () -> getFirstPage();

 PagedFlux<Integer> pagedFluxInstance = new PagedFlux<>(firstPageRetrieverFunction);

Parameters:

firstPageRetriever - Supplier that retrieves the first page.

PagedFlux

public PagedFlux(Supplier<Mono<PagedResponse<T>>> firstPageRetriever, Function<String,Mono<PagedResponse<T>>> nextPageRetriever)

Creates an instance of PagedFlux<T>. The constructor takes a Supplier and a Function. The Supplier returns the first page of T, and the Function retrieves subsequent pages of T.

Code sample

// A supplier that fetches the first page of data from source/service
 Supplier<Mono<PagedResponse<Integer>>> firstPageRetriever = () -> getFirstPage();

 // A function that fetches subsequent pages of data from source/service given a continuation token
 Function<String, Mono<PagedResponse<Integer>>> nextPageRetriever =
     continuationToken -> getNextPage(continuationToken);

 PagedFlux<Integer> pagedFlux = new PagedFlux<>(firstPageRetriever,
     nextPageRetriever);

Parameters:

firstPageRetriever - Supplier that retrieves the first page.
nextPageRetriever - Function that retrieves the next page given a continuation token.

Method Details

create

public static <T> PagedFlux<T> create(Supplier<PageRetriever<String,PagedResponse<T>>> provider)

Creates an instance of PagedFlux<T> backed by a Page Retriever Supplier (provider). When invoked, the provider should return a PageRetriever<C,P>. The provider is called for each Subscription to the PagedFlux instance. The Page Retriever can be called multiple times in serial fashion, each time after the completion of the Flux returned from the previous invocation. The final completion signal is sent to the Subscriber when the last page emitted by the Flux returned by the Page Retriever has a null continuation token. The provider is useful mainly in two scenarios:

  • To manage state across multiple calls to the Page Retriever within the same Subscription.
  • To decorate a PagedFlux to produce a new PagedFlux.

Decoration sample

// Transform a PagedFlux with Integer items to PagedFlux of String items.
 final PagedFlux<Integer> intPagedFlux = createAnInstance();

 // PagedResponse<Integer> to PagedResponse<String> mapper
 final Function<PagedResponse<Integer>, PagedResponse<String>> responseMapper
     = intResponse -> new PagedResponseBase<Void, String>(intResponse.getRequest(),
     intResponse.getStatusCode(),
     intResponse.getHeaders(),
     intResponse.getValue()
         .stream()
         .map(intValue -> Integer.toString(intValue)).collect(Collectors.toList()),
     intResponse.getContinuationToken(),
     null);

 final Supplier<PageRetriever<String, PagedResponse<String>>> provider = () ->
     (continuationToken, pageSize) -> {
         Flux<PagedResponse<Integer>> flux = (continuationToken == null)
             ? intPagedFlux.byPage()
             : intPagedFlux.byPage(continuationToken);
         return flux.map(responseMapper);
     };
 PagedFlux<String> strPagedFlux = PagedFlux.create(provider);

 // Create a PagedFlux from a PagedFlux with all exceptions mapped to a specific exception.
 final PagedFlux<Integer> pagedFlux = createAnInstance();
 final Supplier<PageRetriever<String, PagedResponse<Integer>>> exceptionProvider = () ->
     (continuationToken, pageSize) -> {
         Flux<PagedResponse<Integer>> flux = (continuationToken == null)
             ? pagedFlux.byPage()
             : pagedFlux.byPage(continuationToken);
         return flux.onErrorMap(Exception.class, PaginationException::new);
     };
 final PagedFlux<Integer> exceptionMappedPagedFlux = PagedFlux.create(exceptionProvider);

Parameters:

provider - the Page Retrieval Provider

Returns:

PagedFlux backed by the Page Retriever Function Supplier
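
The first scenario, state that lives for one Subscription only, can be modelled with plain JDK types. The sketch below is a simplified, synchronous illustration and makes several assumptions: ProviderSketch and its Page type are hypothetical, the retriever is a Function<String, Page> rather than a PageRetriever, and each retriever call returns exactly one page instead of a Flux of pages. It shows the provider being asked once per simulated subscription, and the retriever being called serially until a page carries a null continuation token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of a provider that hands out one retriever per subscription.
final class ProviderSketch {

    // One page: its items plus the token of the following page (null on the last page).
    static final class Page {
        final List<String> items;
        final String nextToken;

        Page(List<String> items, String nextToken) {
            this.items = items;
            this.nextToken = nextToken;
        }
    }

    // Counts how many times the provider has been asked for a retriever.
    static int providerCalls = 0;

    // Each call returns a fresh retriever with its own call counter,
    // so the counter is never shared between subscriptions.
    static final Supplier<Function<String, Page>> PROVIDER = () -> {
        providerCalls++;
        final int[] calls = {0}; // per-subscription state
        return token -> {
            calls[0]++;
            int pageNumber = (token == null) ? 1 : Integer.parseInt(token);
            String next = (pageNumber < 3) ? Integer.toString(pageNumber + 1) : null;
            List<String> items = new ArrayList<>();
            items.add("page " + pageNumber + ", call " + calls[0]);
            return new Page(items, next);
        };
    };

    // Models one subscription: ask the provider once, then call the retriever
    // serially until a page reports a null continuation token.
    static List<String> subscribeOnce() {
        Function<String, Page> retriever = PROVIDER.get();
        List<String> seen = new ArrayList<>();
        String token = null;
        do {
            Page page = retriever.apply(token);
            seen.addAll(page.items);
            token = page.nextToken;
        } while (token != null);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(subscribeOnce());
        System.out.println(subscribeOnce()); // call counter starts again from 1
    }
}
```

Because the counter is created inside the Supplier, a second simulated subscription starts counting from 1 again; keeping the counter outside the Supplier would instead share it across subscriptions.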

mapPage

@Deprecated
public <S> PagedFlux<S> mapPage(Function<T,S> mapper)

Deprecated

Refer to the decoration samples for PagedFlux#create(Supplier).

Maps this PagedFlux instance of T to a PagedFlux instance of type S as per the provided mapper function.

Parameters:

mapper - The mapper function to convert from type T to type S.

Returns:

A PagedFlux of type S.
