PollerFlux<T,U> Class
- java.lang.Object
- reactor.core.publisher.Flux
- com.azure.core.util.polling.PollerFlux<T,U>
Type Parameters
- T
The type of poll response value.
- U
The type of the final result of long-running operation.
public final class PollerFlux<T,U>
extends Flux<AsyncPollResponse<T,U>>
A Flux that simplifies the task of executing long-running operations against an Azure service. A subscription to PollerFlux<T,U> initiates a long-running operation and polls the status until it completes.
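The poll-until-complete behavior described above can be pictured with a small, JDK-only sketch. It is a simplified blocking model, not the azure-core implementation: the `Status` enum, the `PollingLoopSketch` class, and `pollUntilComplete` are hypothetical names introduced for illustration, and the real PollerFlux emits responses asynchronously through Reactor instead of sleeping on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified, JDK-only model of a polling loop; not the azure-core implementation.
final class PollingLoopSketch {
    // Hypothetical status values standing in for LongRunningOperationStatus.
    enum Status { IN_PROGRESS, SUCCESSFULLY_COMPLETED, FAILED }

    // Calls pollOperation until it reports a status other than IN_PROGRESS,
    // waiting pollIntervalMillis between calls, and returns every status seen.
    static List<Status> pollUntilComplete(Supplier<Status> pollOperation, long pollIntervalMillis) {
        List<Status> seen = new ArrayList<>();
        while (true) {
            Status status = pollOperation.get();
            seen.add(status);
            if (status != Status.IN_PROGRESS) {
                return seen;
            }
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop polling.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
        }
    }

    public static void main(String[] args) {
        // Fake service: in progress for the first two polls, then completed.
        int[] calls = {0};
        List<Status> seen = pollUntilComplete(
            () -> ++calls[0] < 3 ? Status.IN_PROGRESS : Status.SUCCESSFULLY_COMPLETED, 10);
        System.out.println(seen); // [IN_PROGRESS, IN_PROGRESS, SUCCESSFULLY_COMPLETED]
    }
}
```

Each element of the returned list corresponds loosely to one AsyncPollResponse a subscriber would receive; the last element is the terminal status.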
Code samples
Instantiating and subscribing to PollerFlux
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMillis(800));

// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
    (context) -> Mono.empty(),
    // Define your custom poll operation
    (context) -> {
        if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
            System.out.println("Returning intermediate response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                "Operation in progress."));
        } else {
            System.out.println("Returning final response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                "Operation completed."));
        }
    },
    (activationResponse, context) -> Mono.error(new RuntimeException("Cancellation is not supported")),
    (context) -> Mono.just("Final Output"));

// Listen to poll responses
poller.subscribe(response -> {
    // Process poll response
    System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Asynchronously wait for polling to complete and then retrieve the final result
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));

// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
    (context) -> Mono.empty(),
    (context) -> {
        if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
            System.out.println("Returning intermediate response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                "Operation in progress."));
        } else {
            System.out.println("Returning final response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                "Operation completed."));
        }
    },
    (activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
    (context) -> Mono.just("FromServer:FinalOutput"));

// Wait up to 30 minutes for polling to finish, then inspect the last response
poller.take(Duration.ofMinutes(30))
    .last()
    .flatMap(asyncPollResponse -> {
        if (asyncPollResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
            // Operation completed successfully; retrieve the final result.
            return asyncPollResponse
                .getFinalResult();
        } else {
            return Mono.error(new RuntimeException("Polling completed unsuccessfully with status: "
                + asyncPollResponse.getStatus()));
        }
    }).block();
Block for polling to complete and then retrieve the final result
// pollerFlux is an existing PollerFlux<String, String> instance
AsyncPollResponse<String, String> terminalResponse = pollerFlux.blockLast();
System.out.printf("Polling complete. Final Status: %s%n", terminalResponse.getStatus());
if (terminalResponse.getStatus() == LongRunningOperationStatus.SUCCESSFULLY_COMPLETED) {
    String finalResult = terminalResponse.getFinalResult().block();
    System.out.printf("Polling complete. Final Result: %s%n", finalResult);
}
Asynchronously poll until poller receives matching status
final Predicate<AsyncPollResponse<String, String>> isComplete = response -> {
    return response.getStatus() != LongRunningOperationStatus.IN_PROGRESS
        && response.getStatus() != LongRunningOperationStatus.NOT_STARTED;
};

pollerFlux
    .takeUntil(isComplete)
    .subscribe(completed -> {
        System.out.println("Completed poll response, status: " + completed.getStatus());
    });
Asynchronously cancel the long running operation
LocalDateTime timeToReturnFinalResponse = LocalDateTime.now().plus(Duration.ofMinutes(5));

// Create poller instance
PollerFlux<String, String> poller = new PollerFlux<>(Duration.ofMillis(100),
    (context) -> Mono.empty(),
    (context) -> {
        if (LocalDateTime.now().isBefore(timeToReturnFinalResponse)) {
            System.out.println("Returning intermediate response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.IN_PROGRESS,
                "Operation in progress."));
        } else {
            System.out.println("Returning final response.");
            return Mono.just(new PollResponse<>(LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
                "Operation completed."));
        }
    },
    (activationResponse, context) -> Mono.just("FromServer:OperationIsCancelled"),
    (context) -> Mono.just("FromServer:FinalOutput"));

// Asynchronously wait up to 30 minutes for polling to complete. If it has not
// completed within that time, cancel the server operation.
poller.take(Duration.ofMinutes(30))
    .last()
    .flatMap(asyncPollResponse -> {
        if (!asyncPollResponse.getStatus().isComplete()) {
            return asyncPollResponse
                .cancelOperation()
                .then(Mono.error(new RuntimeException("Operation is cancelled!")));
        } else {
            return Mono.just(asyncPollResponse);
        }
    }).block();
Instantiating and subscribing to PollerFlux from a known polling strategy
// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
    Duration.ofMillis(100),
    // Pass in your custom activation operation
    () -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
        HttpMethod.POST,
        "http://httpbin.org"),
        202,
        new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
        null)),
    new OperationResourcePollingStrategy<>(new HttpPipelineBuilder().build()),
    TypeReference.createInstance(BinaryData.class),
    TypeReference.createInstance(String.class));

// Listen to poll responses
poller.subscribe(response -> {
    // Process poll response
    System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Instantiating and subscribing to PollerFlux from a custom polling strategy
// Create custom polling strategy based on OperationResourcePollingStrategy
PollingStrategy<BinaryData, String> strategy = new OperationResourcePollingStrategy<BinaryData, String>(
    new HttpPipelineBuilder().build()) {
    // Override any interface method to customize the polling behavior
    @Override
    public Mono<PollResponse<BinaryData>> poll(PollingContext<BinaryData> context,
        TypeReference<BinaryData> pollResponseType) {
        return Mono.just(new PollResponse<>(
            LongRunningOperationStatus.SUCCESSFULLY_COMPLETED,
            BinaryData.fromString("")));
    }
};

// Create poller instance
PollerFlux<BinaryData, String> poller = PollerFlux.create(
    Duration.ofMillis(100),
    // Pass in your custom activation operation
    () -> Mono.just(new SimpleResponse<Void>(new HttpRequest(
        HttpMethod.POST,
        "http://httpbin.org"),
        202,
        new HttpHeaders().set("Operation-Location", "http://httpbin.org"),
        null)),
    strategy,
    TypeReference.createInstance(BinaryData.class),
    TypeReference.createInstance(String.class));

// Listen to poll responses
poller.subscribe(response -> {
    // Process poll response
    System.out.printf("Got response. Status: %s, Value: %s%n", response.getStatus(), response.getValue());
});
// Do something else
Constructor Details
PollerFlux
public PollerFlux(Duration pollInterval, Function<>
Creates PollerFlux.
Parameters:
Method Details
<T,U>create
public static PollerFlux
Creates PollerFlux.
This method differs from the PollerFlux constructor in the type returned by the activationOperation. The constructor takes an activationOperation that returns a Mono emitting the result directly, whereas the create method takes an activationOperation that returns a Mono emitting a PollResponse<T>, which holds the result. If the PollResponse<T> from the activationOperation indicates that the long-running operation is already completed, the pollOperation will not be called.
Parameters:
Returns:
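The short-circuit rule described for this create overload can be illustrated with a JDK-only sketch. The `ActivationShortCircuitSketch` class, its `Status` enum, and `pollsAfterActivation` are hypothetical names used only for illustration; the sketch models the documented rule and does not reproduce the azure-core code.

```java
import java.util.function.Supplier;

// JDK-only illustration of the rule "a completed activation response skips polling".
// All names here are hypothetical and do not come from azure-core.
final class ActivationShortCircuitSketch {
    enum Status { IN_PROGRESS, SUCCESSFULLY_COMPLETED }

    // Counts how many times the poll operation runs after activation.
    static int pollsAfterActivation(Status activationStatus, Supplier<Status> pollOperation) {
        int polls = 0;
        Status current = activationStatus;
        // If activation already reports completion, the loop body never runs.
        while (current == Status.IN_PROGRESS) {
            current = pollOperation.get();
            polls++;
        }
        return polls;
    }

    public static void main(String[] args) {
        Supplier<Status> poll = () -> Status.SUCCESSFULLY_COMPLETED;
        System.out.println(pollsAfterActivation(Status.SUCCESSFULLY_COMPLETED, poll)); // 0
        System.out.println(pollsAfterActivation(Status.IN_PROGRESS, poll));            // 1
    }
}
```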
<T,U>create
public static PollerFlux
Creates PollerFlux.
This method uses a PollingStrategy<T,U> to poll the status of a long-running operation after the activation operation is invoked. See PollingStrategy<T,U> for more details of known polling strategies and how to create a custom strategy.
Parameters:
Returns:
<T,U>error
public static PollerFlux
Creates a PollerFlux instance that returns an error on subscription.
Parameters:
Returns:
getPollInterval
public Duration getPollInterval()
Returns the current polling duration for this PollerFlux<T,U> instance.
Returns:
getSyncPoller
public SyncPoller
Gets a synchronous blocking poller.
Returns:
setPollInterval
public PollerFlux
Sets the poll interval for this poller. The new interval will be used for all subsequent polling operations including the subscriptions that are already in progress.
Parameters:
Returns:
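One way to picture why a new interval also affects subscriptions already in progress is a loop that re-reads a shared interval before every delay. The sketch below is JDK-only and rests on that assumption; `SharedIntervalSketch` and `recordDelays` are hypothetical names, and the actual mechanism inside PollerFlux may differ.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model: the poll interval lives in a shared reference that
// every running poll loop reads again before each delay.
final class SharedIntervalSketch {
    private final AtomicReference<Duration> pollInterval;

    SharedIntervalSketch(Duration initial) {
        this.pollInterval = new AtomicReference<>(initial);
    }

    void setPollInterval(Duration newInterval) {
        pollInterval.set(newInterval);
    }

    // Records the delay that would be used before each poll. The callback runs
    // after the first poll to simulate a caller changing the interval mid-flight.
    List<Duration> recordDelays(int polls, Runnable afterFirstPoll) {
        List<Duration> delays = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            delays.add(pollInterval.get());
            if (i == 0) {
                afterFirstPoll.run();
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        SharedIntervalSketch sketch = new SharedIntervalSketch(Duration.ofMillis(100));
        List<Duration> delays =
            sketch.recordDelays(3, () -> sketch.setPollInterval(Duration.ofSeconds(1)));
        System.out.println(delays); // [PT0.1S, PT1S, PT1S]
    }
}
```

Only the first delay uses the original 100 ms interval; the later delays pick up the one-second value set while the loop was already running.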
subscribe
public void subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Overrides:
PollerFlux<T,U>.subscribe(CoreSubscriber<? super AsyncPollResponse<T,U>> actual)
Parameters:
Applies to
Azure SDK for Java