Compartilhar via


PagedFluxBase<T,P> Class

Type Parameters

T

The type of items in P.

P

The PagedResponse holding items of type T.

@Deprecated
public class PagedFluxBase<T,P>
extends ContinuablePagedFluxCore<String,T,P>

Note

This class has been deprecated. use ContinuablePagedFluxCore<C,T,P>.

This class is a flux that can operate on any type that extends PagedResponse<T> and also provides the ability to operate on individual items. When processing the response by page, each response will contain the items in the page as well as the request details like status code and headers.

Process each item in Flux

To process one item at a time, simply subscribe to this Flux.

pagedFluxBase
     .log()
     .subscribe(item -> System.out.println("Processing item with value: " + item),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

Process one page at a time

To process one page at a time, starting from the beginning, use byPage() method.

pagedFluxBase
     .byPage()
     .log()
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

Process items starting from a continuation token

To process items one page at a time starting from any page associated with a continuation token, use byPage(String continuationToken).

String continuationToken = getContinuationToken();
 pagedFluxBase
     .byPage(continuationToken)
     .log()
     .doOnSubscribe(ignored -> System.out.println(
         "Subscribed to paged flux processing pages starting from: " + continuationToken))
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

Constructor Summary

Constructor Description
PagedFluxBase(Supplier<Mono<P>> firstPageRetriever)

Creates an instance of PagedFluxBase<T,P> that consists of only a single page.

PagedFluxBase(Supplier<Mono<P>> firstPageRetriever, Function<String,Mono<P>> nextPageRetriever)

Creates an instance of PagedFluxBase<T,P>.

Method Summary

Modifier and Type Method and Description
Flux<P> byPage()

Creates a Flux of PagedResponse<T> starting from the first page.

Flux<P> byPage(String continuationToken)

Creates a Flux of PagedResponse<T> starting from the next page associated with the given continuation token.

void subscribe(CoreSubscriber<? super T> coreSubscriber)

Subscribe to consume all items of type T in the sequence respectively.

Methods inherited from ContinuablePagedFlux

Methods inherited from ContinuablePagedFluxCore

byPage byPage byPage byPage getPageSize com.azure.core.util.paging.ContinuablePagedFluxCore.subscribe(reactor.core.CoreSubscriber<

Methods inherited from java.lang.Object

Methods inherited from reactor.core.publisher.Flux

reduce reduceWith scan scanWith buffer buffer buffer bufferTimeout bufferTimeout cast collect doOnError onErrorContinue onErrorContinue onErrorMap onErrorResume onErrorReturn subscribeWith zip zip first first firstWithSignal firstWithSignal firstWithValue firstWithValue index merge merge merge mergeComparing mergeDelayError mergeOrdered mergePriority mergeSequential mergeSequential mergeSequential mergeSequential mergeSequentialDelayError mergeSequentialDelayError collectMap collectMap collectMultimap collectMultimap groupBy groupBy collectMap collectMultimap groupBy groupBy error zip zip as collect concatMapIterable concatMapIterable doOnDiscard flatMap flatMap flatMapIterable flatMapIterable flatMapSequential flatMapSequential flatMapSequential flatMapSequentialDelayError handle publish publish using using usingWhen usingWhen generate generate combineLatest combineLatest combineLatest combineLatest zip zip zip combineLatest zip combineLatest zip combineLatest zip combineLatest zip combineLatest zip zipWith zipWith zipWithIterable zipWith zipWith zipWithIterable concat concat concat concat concatDelayError concatDelayError concatDelayError concatDelayError create create defer deferContextual deferWithContext empty error error from fromArray fromIterable fromStream fromStream generate just just merge merge merge mergeComparing mergeComparing mergeComparingDelayError mergeOrdered mergeOrdered mergePriority mergePriority mergePriorityDelayError mergeSequential mergeSequential mergeSequentialDelayError never onAssembly onAssembly push push switchOnNext switchOnNext groupJoin join zip withLatestFrom bufferWhen bufferWhen timeout timeout windowWhen delaySubscription ofType sample sampleFirst sampleTimeout sampleTimeout timeout distinct distinct bufferUntilChanged bufferUntilChanged bufferUntilChanged concatMap concatMap concatMapDelayError concatMapDelayError concatMapDelayError distinct distinctUntilChanged distinctUntilChanged flatMap flatMap flatMapDelayError map mapNotNull switchMap switchMap switchOnFirst switchOnFirst then thenMany transform transformDeferred transformDeferredContextual windowUntilChanged windowUntilChanged windowUntilChanged dematerialize all any blockFirst blockFirst blockLast blockLast buffer buffer buffer buffer buffer buffer buffer buffer bufferTimeout bufferTimeout bufferUntil bufferUntil bufferWhile cache cache cache cache cache cache cancelOn checkpoint checkpoint checkpoint collectList collectSortedList collectSortedList concatWith concatWithValues contextWrite contextWrite count defaultIfEmpty delayElements delayElements delaySequence delaySequence delaySubscription delaySubscription delayUntil distinct distinctUntilChanged doAfterTerminate doFinally doFirst doOnCancel doOnComplete doOnEach doOnError doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate elapsed elapsed elementAt elementAt expand expand expandDeep expandDeep filter filterWhen filterWhen getPrefetch hasElement hasElements hide ignoreElements index interval interval interval interval last last limitRate limitRate limitRequest log log log log log log materialize mergeComparingWith mergeOrderedWith mergeWith metrics name next onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureDrop onBackpressureDrop onBackpressureError onBackpressureLatest onErrorComplete onErrorComplete onErrorComplete onErrorContinue onErrorMap onErrorMap onErrorResume onErrorResume onErrorReturn onErrorReturn onErrorStop onTerminateDetach or parallel parallel parallel publish publish publishNext publishOn publishOn publishOn range reduce repeat repeat repeat repeat repeatWhen replay replay replay replay replay replay retry retry retryWhen sample sampleFirst scan share shareNext single single singleOrEmpty skip skip skip skipLast skipUntil skipUntilOther skipWhile sort sort startWith startWith startWith subscribe subscribe subscribe subscribe subscribe subscribe subscribe subscribeOn subscribeOn subscriberContext subscriberContext switchIfEmpty tag take take take take takeLast takeUntil takeUntilOther takeWhile then thenEmpty timed timed timeout timeout timeout timeout timestamp timestamp toIterable toIterable toIterable toStream toStream toString window window window window window window window windowTimeout windowTimeout windowTimeout windowTimeout windowUntil windowUntil windowUntil windowWhile windowWhile

Constructor Details

PagedFluxBase

public PagedFluxBase(Supplier> firstPageRetriever)

Creates an instance of PagedFluxBase<T,P> that consists of only a single page. This constructor takes a Supplier that return the single page of T.

Code sample

// A supplier that fetches the first page of data from source/service
 Supplier<Mono<PagedResponse<Integer>>> firstPageRetrieverFunction = () -> getFirstPage();

 PagedFluxBase<Integer, PagedResponse<Integer>> pagedFluxBaseInstance =
     new PagedFluxBase<>(firstPageRetrieverFunction,
         nextPageRetriever);

Parameters:

firstPageRetriever - Supplier that retrieves the first page.

PagedFluxBase

public PagedFluxBase(Supplier> firstPageRetriever, Function> nextPageRetriever)

Creates an instance of PagedFluxBase<T,P>. The constructor takes a Supplier and Function. The Supplier returns the first page of T, the Function retrieves subsequent pages of T.

Code sample

// A supplier that fetches the first page of data from source/service
 Supplier<Mono<PagedResponse<Integer>>> firstPageRetriever = () -> getFirstPage();

 // A function that fetches subsequent pages of data from source/service given a continuation token
 Function<String, Mono<PagedResponse<Integer>>> nextPageRetriever =
     continuationToken -> getNextPage(continuationToken);

 PagedFluxBase<Integer, PagedResponse<Integer>> pagedFluxBase = new PagedFluxBase<>(firstPageRetriever,
     nextPageRetriever);

Parameters:

firstPageRetriever - Supplier that retrieves the first page
nextPageRetriever - Function that retrieves the next page given a continuation token

Method Details

byPage

public Flux

byPage()

Creates a Flux of PagedResponse<T> starting from the first page.

Code sample

// Start processing the results from first page
 pagedFluxBase.byPage()
     .log()
     .doOnSubscribe(ignoredVal -> System.out.println(
         "Subscribed to paged flux processing pages starting from first page"))
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

Overrides:

PagedFluxBase<T,P>.byPage()

Returns:

A PagedFluxBase<T,P> starting from the first page

byPage

public Flux

byPage(String continuationToken)

Creates a Flux of PagedResponse<T> starting from the next page associated with the given continuation token. To start from first page, use byPage() instead.

Code sample

// Start processing the results from a page associated with the continuation token
 String continuationToken = getContinuationToken();
 pagedFluxBase.byPage(continuationToken)
     .log()
     .doOnSubscribe(ignoredVal -> System.out.println(
         "Subscribed to paged flux processing page starting from " + continuationToken))
     .subscribe(page -> System.out.printf("Processing page containing item values: %s%n",
         page.getElements().stream().map(String::valueOf).collect(Collectors.joining(", "))),
         error -> System.err.println("An error occurred: " + error),
         () -> System.out.println("Processing complete."));

Overrides:

PagedFluxBase<T,P>.byPage(String continuationToken)

Parameters:

continuationToken - The continuation token used to fetch the next page

Returns:

A PagedFluxBase<T,P> starting from the page associated with the continuation token

subscribe

public void subscribe(CoreSubscriber coreSubscriber)

Subscribe to consume all items of type T in the sequence respectively. This is recommended for most common scenarios. This will seamlessly fetch next page when required and provide with a Flux of items.

Code sample

pagedFluxBase.subscribe(new BaseSubscriber<Integer>() {
     @Override
     protected void hookOnSubscribe(Subscription subscription) {
         System.out.println("Subscribed to paged flux processing items");
         super.hookOnSubscribe(subscription);
     }

     @Override
     protected void hookOnNext(Integer value) {
         System.out.println("Processing item with value: " + value);
     }

     @Override
     protected void hookOnComplete() {
         System.out.println("Processing complete.");
     }
 });

Overrides:

PagedFluxBase<T,P>.subscribe(CoreSubscriber<? super T> coreSubscriber)

Parameters:

coreSubscriber - The subscriber for this PagedFluxBase<T,P>

Applies to