ContinuablePagedFluxCore<C,T,P> Class
- java.lang.Object
- reactor.core.publisher.Flux
- com.azure.core.util.paging.ContinuablePagedFlux<C,T,P>
- com.azure.core.util.paging.ContinuablePagedFluxCore<C,T,P>
Type Parameters
- C
The type of the continuation token.
- T
The type of elements in a ContinuablePage.
- P
The ContinuablePage holding items of type T.
public abstract class ContinuablePagedFluxCore<C,T,P>
extends ContinuablePagedFlux<C,T,P>
The default implementation of ContinuablePagedFlux<C,T,P>.
This type is a Flux that provides the ability to operate on pages of type ContinuablePage<C,T> and individual items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a previously-retrieved continuation token.
The type is backed by the Page Retriever provider supplied in its constructor. The provider is expected to return a PageRetriever<C,P> when called, and it is invoked once for each Subscription to this Flux. Because the provider is called per Subscription, its implementation can create one or more objects to hold state, and the Page Retriever can capture and use those objects. This indirectly associates the state objects with the Subscription. The Page Retriever can be called multiple times serially, each time after the Flux returned by the previous invocation completes. The final completion signal is sent to the Subscriber when the last page emitted by the Flux returned by the Page Retriever has a null continuation token.
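The retrieval contract described above can be modeled without the SDK. The sketch below is a plain-JDK illustration, not the library's implementation: `SimplePage`, `SimplePageRetriever`, and `PagingModel` are hypothetical names, and synchronous calls stand in for the Flux that a real PageRetriever returns. It shows the provider being invoked once per subscription, the retriever capturing per-subscription state, and retrieval stopping once a page carries a null continuation token.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for ContinuablePage and PageRetriever (not azure-core types).
final class SimplePage {
    final List<String> items;
    final Integer nextToken; // null marks the last page

    SimplePage(List<String> items, Integer nextToken) {
        this.items = items;
        this.nextToken = nextToken;
    }
}

interface SimplePageRetriever {
    // Synchronous in this model; a real PageRetriever returns a Flux of pages.
    SimplePage get(Integer continuationToken);
}

final class PagingModel {
    // Models one subscription: ask the provider for a fresh retriever, then call it
    // serially, feeding each page's token into the next call until the token is null.
    static List<String> drain(Supplier<SimplePageRetriever> provider) {
        SimplePageRetriever retriever = provider.get(); // invoked once per subscription
        List<String> all = new ArrayList<>();
        Integer token = null; // the first call carries no continuation token
        do {
            SimplePage page = retriever.get(token);
            all.addAll(page.items);
            token = page.nextToken;
        } while (token != null);
        return all;
    }

    // Provider whose retrievers capture their own call counter as per-subscription state.
    static Supplier<SimplePageRetriever> threePageProvider() {
        return () -> {
            int[] calls = {0}; // state object created on each provider invocation
            return token -> {
                calls[0]++;
                int pageIndex = token == null ? 0 : token;
                Integer next = pageIndex < 2 ? Integer.valueOf(pageIndex + 1) : null;
                String item = "item-" + pageIndex + "-call-" + calls[0];
                return new SimplePage(Collections.singletonList(item), next);
            };
        };
    }

    public static void main(String[] args) {
        Supplier<SimplePageRetriever> provider = threePageProvider();
        System.out.println(drain(provider)); // first subscription
        System.out.println(drain(provider)); // second subscription starts with fresh state
    }
}
```

Both calls to `drain` see the call counter start from zero, because each one obtains its own retriever and state object from the provider.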
Extending PagedFluxCore for Custom Continuation Token support
import com.azure.core.util.IterableStream;
import com.azure.core.util.paging.ContinuablePage;
import com.azure.core.util.paging.ContinuablePagedFluxCore;
import com.azure.core.util.paging.PageRetriever;
import reactor.core.publisher.Flux;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

// Example state holder that a Page Retriever could capture per Subscription.
class ContinuationState<C> {
    private C lastContinuationToken;
    private boolean isDone;

    ContinuationState(C token) {
        this.lastContinuationToken = token;
    }

    void setLastContinuationToken(C token) {
        this.isDone = token == null;
        this.lastContinuationToken = token;
    }

    C getLastContinuationToken() {
        return this.lastContinuationToken;
    }

    boolean isDone() {
        return this.isDone;
    }
}

// Custom continuation token type.
class FileContinuationToken {
    private final int nextLinkId;

    FileContinuationToken(int nextLinkId) {
        this.nextLinkId = nextLinkId;
    }

    public int getNextLinkId() {
        return nextLinkId;
    }
}

// Element type carried by each page.
class File {
    private final String guid;

    File(String guid) {
        this.guid = guid;
    }

    public String getGuid() {
        return guid;
    }
}

// Page type pairing the elements with the token for the next page.
class FilePage implements ContinuablePage<FileContinuationToken, File> {
    private final IterableStream<File> elements;
    private final FileContinuationToken fileContinuationToken;

    FilePage(List<File> elements, FileContinuationToken fileContinuationToken) {
        this.elements = IterableStream.of(elements);
        this.fileContinuationToken = fileContinuationToken;
    }

    @Override
    public IterableStream<File> getElements() {
        return elements;
    }

    @Override
    public FileContinuationToken getContinuationToken() {
        return fileContinuationToken;
    }
}
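
// Stand-in service client that returns one random file per page.
class FileShareServiceClient {
    Flux<FilePage> getFilePages(FileContinuationToken token) {
        List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString()));
        // The token is null when the first page is requested, so guard before dereferencing it.
        if (token != null && token.getNextLinkId() < 10) {
            // A null continuation token marks the last page.
            return Flux.just(new FilePage(files, null));
        } else {
            return Flux.just(new FilePage(files,
                new FileContinuationToken((int) Math.floor(Math.random() * 20))));
        }
    }
}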

FileShareServiceClient client = new FileShareServiceClient();

Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () ->
    (continuationToken, pageSize) -> client.getFilePages(continuationToken);

class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> {
    FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>>
        pageRetrieverProvider) {
        super(pageRetrieverProvider);
    }
}

FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);
Constructor Summary
Modifier | Constructor | Description |
---|---|---|
protected | ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider) | Creates an instance of ContinuablePagedFluxCore<C,T,P>. |
protected | ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize) | Creates an instance of ContinuablePagedFluxCore<C,T,P>. |
protected | ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate) | Creates an instance of ContinuablePagedFluxCore<C,T,P>. |
Method Summary
Modifier and Type | Method | Description |
---|---|---|
Flux<P> | byPage() | Gets a Flux of ContinuablePage<C,T> starting at the first page. |
Flux<P> | byPage(C continuationToken) | Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token. |
Flux<P> | byPage(C continuationToken, int preferredPageSize) | Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token, requesting that each page contain a number of elements equal to the preferred page size. |
Flux<P> | byPage(int preferredPageSize) | Gets a Flux of ContinuablePage<C,T> starting at the first page, requesting that each page contain a number of elements equal to the preferred page size. |
Integer | getPageSize() | Gets the page size configured for this ContinuablePagedFluxCore<C,T,P>. |
void | subscribe(CoreSubscriber<? super T> coreSubscriber) | Subscribes to consume all items of type T in the sequence. |
Methods inherited from ContinuablePagedFlux
Methods inherited from java.lang.Object
Methods inherited from reactor.core.publisher.Flux
Constructor Details
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider)
Creates an instance of ContinuablePagedFluxCore<C,T,P>.
Parameters:
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, int pageSize)
Creates an instance of ContinuablePagedFluxCore<C,T,P>.
Parameters:
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C,P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate)
Creates an instance of ContinuablePagedFluxCore<C,T,P>.
Parameters:
Method Details
byPage
public Flux<P> byPage()
Gets a Flux of ContinuablePage<C,T> starting at the first page.
Overrides:
ContinuablePagedFlux<C,T,P>.byPage()
byPage
public Flux<P> byPage(C continuationToken)
Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token.
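Resuming from a saved token can be pictured with a small plain-JDK model. This is a sketch under stated assumptions rather than SDK code: `InMemoryPagedSource` and its nested `Page` are hypothetical, the token is simply the index of the next page, and lists replace the Flux that byPage(C continuationToken) returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory stand-in for a paged service (not an azure-core type).
final class InMemoryPagedSource {
    static final class Page {
        final List<String> items;
        final Integer continuationToken; // null on the last page

        Page(List<String> items, Integer continuationToken) {
            this.items = items;
            this.continuationToken = continuationToken;
        }
    }

    private final List<List<String>> pages;

    InMemoryPagedSource(List<List<String>> pages) {
        this.pages = pages;
    }

    // Returns every page from the one identified by the token to the end;
    // a null token means "start at the first page".
    List<Page> byPage(Integer continuationToken) {
        int start = continuationToken == null ? 0 : continuationToken;
        List<Page> result = new ArrayList<>();
        for (int i = start; i < pages.size(); i++) {
            Integer next = i + 1 < pages.size() ? Integer.valueOf(i + 1) : null;
            result.add(new Page(pages.get(i), next));
        }
        return result;
    }

    public static void main(String[] args) {
        InMemoryPagedSource source = new InMemoryPagedSource(Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e")));

        // First run: consume one page, then keep its token and stop.
        Page first = source.byPage(null).get(0);
        Integer saved = first.continuationToken;

        // Later run: pass the saved token to continue where the first run stopped.
        for (Page page : source.byPage(saved)) {
            System.out.println(page.items);
        }
    }
}
```

In this model the second run yields only the pages after the first one, which is the behavior the continuation token overload is meant to provide.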
Overrides:
ContinuablePagedFlux<C,T,P>.byPage(C continuationToken)
Parameters:
byPage
public Flux<P> byPage(C continuationToken, int preferredPageSize)
Gets a Flux of ContinuablePage<C,T> beginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.
The service may or may not honor the preferred page size; therefore, the client MUST be prepared to handle pages of varying sizes.
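Because the preferred page size is only a hint, code that needs fixed-size batches should regroup items itself rather than assume each page holds that many. A minimal plain-JDK sketch, in which `PageRebatcher` and `rebatch` are hypothetical helpers and nested lists stand in for the pages emitted by byPage:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of azure-core): regroups items from pages of
// arbitrary size into batches of a fixed size.
final class PageRebatcher {
    static <T> List<List<T>> rebatch(List<List<T>> pages, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (List<T> page : pages) { // a page may be smaller or larger than requested
            for (T item : page) {
                current.add(item);
                if (current.size() == batchSize) {
                    batches.add(current);
                    current = new ArrayList<>(batchSize);
                }
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // trailing partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        // Two items per page were requested, but the pages hold 3, 1, and 2 items.
        List<List<Integer>> pages = Arrays.asList(
            Arrays.asList(1, 2, 3), Arrays.asList(4), Arrays.asList(5, 6));
        System.out.println(rebatch(pages, 2)); // [[1, 2], [3, 4], [5, 6]]
    }
}
```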
Overrides:
ContinuablePagedFlux<C,T,P>.byPage(C continuationToken, int preferredPageSize)
Parameters:
byPage
public Flux<P> byPage(int preferredPageSize)
Gets a Flux of ContinuablePage<C,T> starting at the first page requesting each page to contain a number of elements equal to the preferred page size.
The service may or may not honor the preferred page size; therefore, the client MUST be prepared to handle pages of varying sizes.
Overrides:
ContinuablePagedFlux<C,T,P>.byPage(int preferredPageSize)
Parameters:
getPageSize
public Integer getPageSize()
Gets the page size configured for this ContinuablePagedFluxCore<C,T,P>.
Returns:
the configured page size, or null if unspecified.
subscribe
public void subscribe(CoreSubscriber<? super T> coreSubscriber)
Subscribes to consume all items of type T in the sequence. This is recommended for most common scenarios. It seamlessly fetches the next page when required and provides a Flux of items.
Overrides:
Flux<T>.subscribe(CoreSubscriber<? super T> coreSubscriber)
Parameters: