ContinuablePagedFluxCore<C,T,P> Class

Type Parameters

C

the type of the continuation token

T

The type of elements in a ContinuablePage

P

The ContinuablePage holding items of type T.

public abstract class ContinuablePagedFluxCore<C,T,P>
extends ContinuablePagedFlux<C,T,P>

The default implementation of ContinuablePagedFlux<C,T,P>.

This type is a Flux that provides the ability to operate on pages of type ContinuablePage<C,T> and individual items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a previously-retrieved continuation token.

The type is backed by the Page Retriever provider supplied to its constructor. The provider is expected to return a PageRetriever<C,P> when called, and it is invoked once for each Subscription to this Flux. Because the provider is called per Subscription, its implementation can create one or more objects to hold state, and the Page Retriever can capture and use those objects; this indirectly associates the state objects with the Subscription. The Page Retriever can be called multiple times serially, each call made only after the Flux returned by the previous call completes. The final completion signal is sent to the Subscriber when the last page emitted by the Flux returned by the Page Retriever has a null continuation token.
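The retrieval cycle described above can be modeled in plain Java. The following is only a simplified, synchronous sketch: PagingLoopSketch, Page, drain, and threePageProvider are hypothetical names, and a BiFunction stands in for PageRetriever<C,P>. It does not use the real Reactor or azure-core types; it only illustrates that the provider is asked for a retriever once per subscription and that the retriever is then called serially until a page carries a null continuation token.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical, simplified model; these are NOT the azure-core types.
final class PagingLoopSketch {
    /** A page of items plus the token for the next page (null on the last page). */
    static final class Page<C, T> {
        final List<T> items;
        final C continuationToken;

        Page(List<T> items, C continuationToken) {
            this.items = items;
            this.continuationToken = continuationToken;
        }
    }

    /**
     * Models one subscription: obtains a fresh retriever from the provider, then calls it
     * serially, passing each page's token to the next call, until a page has a null token.
     */
    static <C, T> List<T> drain(Supplier<BiFunction<C, Integer, Page<C, T>>> provider,
                                C startToken, Integer pageSize) {
        BiFunction<C, Integer, Page<C, T>> retriever = provider.get(); // once per subscription
        List<T> all = new ArrayList<>();
        C token = startToken;
        do {
            Page<C, T> page = retriever.apply(token, pageSize);
            all.addAll(page.items);
            token = page.continuationToken;
        } while (token != null);
        return all;
    }

    /** Provider whose retriever captures per-subscription state (a call counter). */
    static Supplier<BiFunction<Integer, Integer, Page<Integer, String>>> threePageProvider() {
        return () -> {
            int[] calls = {0}; // state object created anew for every provider invocation
            return (token, pageSize) -> {
                calls[0]++;
                int pageNo = token == null ? 0 : token;
                Integer next = pageNo < 2 ? pageNo + 1 : null; // third page is the last one
                return new Page<>(
                    Collections.singletonList("item-" + pageNo + "-call-" + calls[0]), next);
            };
        };
    }
}
```

Calling drain twice with the same provider starts the counter from 1 each time, because the state is created inside the provider rather than shared across subscriptions.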

Extending ContinuablePagedFluxCore for custom continuation token support

class ContinuationState<C> {
     private C lastContinuationToken;
     private boolean isDone;

     ContinuationState(C token) {
         this.lastContinuationToken = token;
     }

     void setLastContinuationToken(C token) {
         this.isDone = token == null;
         this.lastContinuationToken = token;
     }

     C getLastContinuationToken() {
         return this.lastContinuationToken;
     }

     boolean isDone() {
         return this.isDone;
     }
 }

 class FileContinuationToken {
     private final int nextLinkId;

     FileContinuationToken(int nextLinkId) {
         this.nextLinkId = nextLinkId;
     }

     public int getNextLinkId() {
         return nextLinkId;
     }
 }

 class File {
     private final String guid;

     File(String guid) {
         this.guid = guid;
     }

     public String getGuid() {
         return guid;
     }
 }

 class FilePage implements ContinuablePage<FileContinuationToken, File> {
     private final IterableStream<File> elements;
     private final FileContinuationToken fileContinuationToken;

     FilePage(List<File> elements, FileContinuationToken fileContinuationToken) {
         this.elements = IterableStream.of(elements);
         this.fileContinuationToken = fileContinuationToken;
     }

     @Override
     public IterableStream<File> getElements() {
         return elements;
     }

     @Override
     public FileContinuationToken getContinuationToken() {
         return fileContinuationToken;
     }
 }

 class FileShareServiceClient {
     Flux<FilePage> getFilePages(FileContinuationToken token) {
         List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString()));
         // The token is null when the first page is requested, so guard before reading it.
         if (token != null && token.getNextLinkId() < 10) {
             // Last page: a null continuation token ends the paging.
             return Flux.just(new FilePage(files, null));
         } else {
             return Flux.just(new FilePage(files,
                 new FileContinuationToken((int) Math.floor(Math.random() * 20))));
         }
     }
 }

 FileShareServiceClient client = new FileShareServiceClient();

 Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () ->
     (continuationToken, pageSize) -> client.getFilePages(continuationToken);

 class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> {
     FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>>
         pageRetrieverProvider) {
         super(pageRetrieverProvider);
     }
 }

 FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);

Constructor Summary

Modifier Constructor Description
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

Method Summary

Modifier and Type Method and Description
Flux<P> byPage()

Gets a Flux of ContinuablePage<C,T> starting at the first page.

Flux<P> byPage(C continuationToken)

Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token.

Flux<P> byPage(C continuationToken, int preferredPageSize)

Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.

Flux<P> byPage(int preferredPageSize)

Gets a Flux of ContinuablePage<C,T> starting at the first page requesting each page to contain a number of elements equal to the preferred page size.

Integer getPageSize()

Gets the page size configured for this ContinuablePagedFluxCore<C,T,P>.

void subscribe(CoreSubscriber<? super T> coreSubscriber)

Subscribes to consume all items of type T in the sequence, one by one.

Methods inherited from ContinuablePagedFlux

Methods inherited from java.lang.Object

Methods inherited from reactor.core.publisher.Flux

reduce reduceWith scan scanWith buffer buffer buffer bufferTimeout bufferTimeout cast collect doOnError onErrorContinue onErrorContinue onErrorMap onErrorResume onErrorReturn subscribeWith zip zip first first firstWithSignal firstWithSignal firstWithValue firstWithValue index merge merge merge mergeComparing mergeDelayError mergeOrdered mergePriority mergeSequential mergeSequential mergeSequential mergeSequential mergeSequentialDelayError mergeSequentialDelayError collectMap collectMap collectMultimap collectMultimap groupBy groupBy collectMap collectMultimap groupBy groupBy error zip zip as collect concatMapIterable concatMapIterable doOnDiscard flatMap flatMap flatMapIterable flatMapIterable flatMapSequential flatMapSequential flatMapSequential flatMapSequentialDelayError handle publish publish using using usingWhen usingWhen generate generate combineLatest combineLatest combineLatest combineLatest zip zip zip combineLatest zip combineLatest zip combineLatest zip combineLatest zip combineLatest zip zipWith zipWith zipWithIterable zipWith zipWith zipWithIterable concat concat concat concat concatDelayError concatDelayError concatDelayError concatDelayError create create defer deferContextual deferWithContext empty error error from fromArray fromIterable fromStream fromStream generate just just merge merge merge mergeComparing mergeComparing mergeComparingDelayError mergeOrdered mergeOrdered mergePriority mergePriority mergePriorityDelayError mergeSequential mergeSequential mergeSequentialDelayError never onAssembly onAssembly push push switchOnNext switchOnNext groupJoin join zip withLatestFrom bufferWhen bufferWhen timeout timeout windowWhen delaySubscription ofType sample sampleFirst sampleTimeout sampleTimeout timeout distinct distinct bufferUntilChanged bufferUntilChanged bufferUntilChanged concatMap concatMap concatMapDelayError concatMapDelayError concatMapDelayError distinct distinctUntilChanged distinctUntilChanged flatMap flatMap flatMapDelayError map 
mapNotNull switchMap switchMap switchOnFirst switchOnFirst then thenMany transform transformDeferred transformDeferredContextual windowUntilChanged windowUntilChanged windowUntilChanged dematerialize all any blockFirst blockFirst blockLast blockLast buffer buffer buffer buffer buffer buffer buffer buffer bufferTimeout bufferTimeout bufferUntil bufferUntil bufferWhile cache cache cache cache cache cache cancelOn checkpoint checkpoint checkpoint collectList collectSortedList collectSortedList concatWith concatWithValues contextWrite contextWrite count defaultIfEmpty delayElements delayElements delaySequence delaySequence delaySubscription delaySubscription delayUntil distinct distinctUntilChanged doAfterTerminate doFinally doFirst doOnCancel doOnComplete doOnEach doOnError doOnError doOnNext doOnRequest doOnSubscribe doOnTerminate elapsed elapsed elementAt elementAt expand expand expandDeep expandDeep filter filterWhen filterWhen getPrefetch hasElement hasElements hide ignoreElements index interval interval interval interval last last limitRate limitRate limitRequest log log log log log log materialize mergeComparingWith mergeOrderedWith mergeWith metrics name next onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureBuffer onBackpressureDrop onBackpressureDrop onBackpressureError onBackpressureLatest onErrorComplete onErrorComplete onErrorComplete onErrorContinue onErrorMap onErrorMap onErrorResume onErrorResume onErrorReturn onErrorReturn onErrorStop onTerminateDetach or parallel parallel parallel publish publish publishNext publishOn publishOn publishOn range reduce repeat repeat repeat repeat repeatWhen replay replay replay replay replay replay retry retry retryWhen sample sampleFirst scan share shareNext single single singleOrEmpty skip skip skip skipLast skipUntil skipUntilOther skipWhile sort sort startWith startWith startWith subscribe subscribe subscribe subscribe 
subscribe subscribe subscribe subscribe subscribeOn subscribeOn subscriberContext subscriberContext switchIfEmpty tag take take take take takeLast takeUntil takeUntilOther takeWhile then thenEmpty timed timed timeout timeout timeout timeout timestamp timestamp toIterable toIterable toIterable toStream toStream toString window window window window window window window windowTimeout windowTimeout windowTimeout windowTimeout windowUntil windowUntil windowUntil windowWhile windowWhile

Constructor Details

ContinuablePagedFluxCore

protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

Parameters:

pageRetrieverProvider - a provider that returns PageRetriever<C,P>.

ContinuablePagedFluxCore

protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

Parameters:

pageRetrieverProvider - a provider that returns PageRetriever<C,P>.
pageSize - the preferred page size

ContinuablePagedFluxCore

protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)

Creates an instance of ContinuablePagedFluxCore<C,T,P>.

Parameters:

pageRetrieverProvider - A provider that returns PageRetriever<C,P>.
pageSize - The preferred page size.
continuationPredicate - A predicate which determines if paging should continue.
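As an illustration of what a continuation predicate decides, here is a minimal plain-Java sketch. It assumes a hypothetical service that marks the end of paging with an empty string instead of null; ContinuationPredicateSketch, nextToken, and countPages are made-up names, and the loop is a simplified stand-in for the real paging logic rather than the library's implementation.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of predicate-driven paging; not the azure-core implementation.
final class ContinuationPredicateSketch {
    /** Simulated service: returns the token of the next page, or "" after the last page. */
    static String nextToken(String token) {
        if (token == null) {
            return "a"; // first page requested without a token
        }
        if (token.equals("a")) {
            return "b";
        }
        return ""; // this service signals the end with an empty token, not null
    }

    /** Fetches pages until the predicate rejects the token of the page just fetched. */
    static <C> int countPages(Function<C, C> fetchPageAndReturnNextToken,
                              Predicate<C> continuationPredicate) {
        int pages = 0;
        C token = null;
        do {
            token = fetchPageAndReturnNextToken.apply(token);
            pages++;
        } while (continuationPredicate.test(token));
        return pages;
    }
}
```

With the predicate `t -> t != null && !t.isEmpty()`, the loop stops after the third page. A predicate that only checked for null would never stop against this particular service, which is the kind of case a custom predicate is meant to cover.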

Method Details

byPage

public Flux<P> byPage()

Gets a Flux of ContinuablePage<C,T> starting at the first page.

Overrides:

ContinuablePagedFlux<C,T,P>.byPage()

byPage

public Flux<P> byPage(C continuationToken)

Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token.

Overrides:

ContinuablePagedFlux<C,T,P>.byPage(C continuationToken)

Parameters:

continuationToken - The continuation token identifying the page to start from.
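Restarting from a previously retrieved continuation token can be pictured with the following plain-Java sketch. Everything in it is hypothetical (ResumeSketch, the five-item data set, the fixed page size of two, and the use of an Integer index as the token); it models the idea of resuming rather than the actual byPage(C) implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical service model: the token is the index of the first item of the next page.
final class ResumeSketch {
    static final List<String> DATA = Arrays.asList("a", "b", "c", "d", "e");
    static final int PAGE_SIZE = 2;

    /** Items of the page identified by the token (null means the first page). */
    static List<String> pageItems(Integer token) {
        int start = token == null ? 0 : token;
        return DATA.subList(start, Math.min(start + PAGE_SIZE, DATA.size()));
    }

    /** Token of the page after the given one, or null when there is none. */
    static Integer nextToken(Integer token) {
        int next = (token == null ? 0 : token) + PAGE_SIZE;
        return next < DATA.size() ? next : null;
    }

    /** Reads every item from the page identified by the token through the last page. */
    static List<String> readFrom(Integer token) {
        List<String> out = new ArrayList<>();
        Integer current = token;
        do {
            out.addAll(pageItems(current));
            current = nextToken(current);
        } while (current != null);
        return out;
    }
}
```

A caller that saved the token returned with the first page (here `nextToken(null)`, which is 2) can later call `readFrom(2)` and receive only the items it has not yet seen.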

byPage

public Flux<P> byPage(C continuationToken, int preferredPageSize)

Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.

The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.

Overrides:

ContinuablePagedFlux<C,T,P>.byPage(C continuationToken, int preferredPageSize)

Parameters:

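continuationToken - The continuation token identifying the page to start from.
preferredPageSize - The preferred number of elements in each page.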
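Because the preferred page size is only a hint, consumer code should never assume a page holds exactly that many elements. The sketch below assumes a hypothetical service that caps pages at three items; PreferredPageSizeSketch, SERVICE_MAX_PAGE_SIZE, fetchAll, and countItems are illustrative names and not part of the library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical service that silently caps the page size it is willing to return.
final class PreferredPageSizeSketch {
    static final int SERVICE_MAX_PAGE_SIZE = 3; // assumed cap, for illustration only

    /** Returns all pages for totalItems items; preferredPageSize must be positive. */
    static List<List<Integer>> fetchAll(int totalItems, int preferredPageSize) {
        int effective = Math.min(preferredPageSize, SERVICE_MAX_PAGE_SIZE);
        List<List<Integer>> pages = new ArrayList<>();
        for (int start = 0; start < totalItems; start += effective) {
            List<Integer> page = new ArrayList<>();
            for (int i = start; i < Math.min(start + effective, totalItems); i++) {
                page.add(i);
            }
            pages.add(page);
        }
        return pages;
    }

    /** Consumer side: iterates over whatever each page contains. */
    static int countItems(List<List<Integer>> pages) {
        int count = 0;
        for (List<Integer> page : pages) {
            count += page.size(); // never assume page.size() equals the preferred size
        }
        return count;
    }
}
```

Requesting 7 items with a preferred page size of 5 yields pages of 3, 3, and 1 items in this model, so code that indexed into each page up to position 4 would fail.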

byPage

public Flux<P> byPage(int preferredPageSize)

Gets a Flux of ContinuablePage<C,T> starting at the first page requesting each page to contain a number of elements equal to the preferred page size.

The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.

Overrides:

ContinuablePagedFlux<C,T,P>.byPage(int preferredPageSize)

Parameters:

preferredPageSize - The preferred number of elements in each page.

getPageSize

public Integer getPageSize()

Gets the page size configured for this ContinuablePagedFluxCore<C,T,P>.

Returns:

the page size configured, null if unspecified.

subscribe

public void subscribe(CoreSubscriber<? super T> coreSubscriber)

Subscribes to consume all items of type T in the sequence, one by one. This is recommended for most common scenarios. The next page is fetched seamlessly when required, and the subscriber receives a Flux of items.

Overrides:

Flux<T>.subscribe(CoreSubscriber<? super T> coreSubscriber)

Parameters:

coreSubscriber - The subscriber for this ContinuablePagedFluxCore<C,T,P>
