FluxUtil Class

  • java.lang.Object
    • com.azure.core.util.FluxUtil

public final class FluxUtil

Utility type exposing methods to deal with Flux.

Method Summary

Modifier and Type Method and Description
static Flux<ByteBuffer> addProgressReporting(Flux<ByteBuffer> flux, ProgressReporter progressReporter)

Adds progress reporting to the provided Flux of ByteBuffer.

static byte[] byteBufferToArray(ByteBuffer byteBuffer)

Gets the content of the provided ByteBuffer as a byte array.

static Mono<byte[]> collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers)

Collects ByteBuffers returned in a network response into a byte array.

static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream)

Collects ByteBuffers emitted by a Flux into a byte array.

static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint)

Collects ByteBuffers emitted by a Flux into a byte array.

static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, RetryOptions retryOptions, long position)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

static Flux<T> fluxContext(Function<Context,Flux<T>> serviceCall)

This method converts the incoming deferContextual from Reactor Context to Azure Context and calls the given lambda function with this context and returns a collection of type T

static Flux<T> fluxError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of Flux.

static boolean isFluxByteBuffer(Type entityType)

Checks if a type is Flux<ByteBuffer>.

static Mono<T> monoError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of Mono.

static Mono<T> monoError(LoggingEventBuilder logBuilder, RuntimeException ex)

Propagates a RuntimeException through the error channel of Mono.

static PagedFlux<T> pagedFluxError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of PagedFlux<T>.

static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel)

Creates a Flux from an AsynchronousFileChannel which reads the entire file.

static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)

Creates a Flux from an AsynchronousFileChannel which reads part of a file into chunks of the given size.

static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, long offset, long length)

Creates a Flux from an AsynchronousFileChannel which reads part of a file.

static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream)

Converts an InputStream into a Flux of ByteBuffer using a chunk size of 4096.

static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, int chunkSize)

Converts an InputStream into a Flux of ByteBuffer.

static Mono<T> toMono(Response<T> response)

Converts the incoming content to Mono.

static Context toReactorContext(Context context)

Converts an Azure context to Reactor context.

static Mono<T> withContext(Function<Context,Mono<T>> serviceCall)

This method converts the incoming deferContextual from Reactor Context to Azure Context and calls the given lambda function with this context and returns a single entity of type T

static Mono<T> withContext(Function<Context,Mono<T>> serviceCall, Map<String,String> contextAttributes)

This method converts the incoming deferContextual from Reactor Context to Azure Context, adds the specified context attributes and calls the given lambda function with this context and returns a single entity of type T

static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel.

static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel starting at the given position in the file.

static Mono<Void> writeToAsynchronousByteChannel(Flux<ByteBuffer> content, AsynchronousByteChannel channel)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousByteChannel.

static Mono<Void> writeToOutputStream(Flux<ByteBuffer> content, OutputStream stream)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an OutputStream.

static Mono<Void> writeToWritableByteChannel(Flux<ByteBuffer> content, WritableByteChannel channel)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an WritableByteChannel.

Methods inherited from java.lang.Object

Method Details

addProgressReporting

public static Flux addProgressReporting(Flux flux, ProgressReporter progressReporter)

Adds progress reporting to the provided Flux of ByteBuffer.

Each ByteBuffer that's emitted from the Flux will report ByteBuffer#remaining().

When Flux is resubscribed the progress is reset. If the flux is not replayable, resubscribing can result in empty or partial data then progress reporting might not be accurate.

If ProgressReporter is not provided, i.e. is null, then this method returns unmodified Flux.

Parameters:

flux - A Flux to report progress on.
progressReporter - Optional ProgressReporter.

Returns:

A Flux that reports progress, or original Flux if ProgressReporter is not provided.

byteBufferToArray

public static byte[] byteBufferToArray(ByteBuffer byteBuffer)

Gets the content of the provided ByteBuffer as a byte array. This method will create a new byte array even if the ByteBuffer can have optionally backing array.

Parameters:

byteBuffer - the byte buffer

Returns:

the byte array

collectBytesFromNetworkResponse

public static Mono collectBytesFromNetworkResponse(Flux stream, HttpHeaders headers)

Collects ByteBuffers returned in a network response into a byte array.

The headers are inspected for containing an Content-Length which determines if a size hinted collection, collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint), or default collection, collectBytesInByteBufferStream(Flux<ByteBuffer> stream), will be used.

Parameters:

stream - A network response ByteBuffer stream.
headers - The HTTP headers of the response.

Returns:

A Mono which emits the collected network response ByteBuffers.

collectBytesInByteBufferStream

public static Mono collectBytesInByteBufferStream(Flux stream)

Collects ByteBuffers emitted by a Flux into a byte array.

Parameters:

stream - A stream which emits ByteBuffer instances.

Returns:

A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.

collectBytesInByteBufferStream

public static Mono collectBytesInByteBufferStream(Flux stream, int sizeHint)

Collects ByteBuffers emitted by a Flux into a byte array.

Unlike collectBytesInByteBufferStream(Flux<ByteBuffer> stream), this method accepts a second parameter sizeHint. This size hint allows for optimizations when creating the initial buffer to reduce the number of times it needs to be resized while concatenating emitted ByteBuffers.

Parameters:

stream - A stream which emits ByteBuffer instances.
sizeHint - A hint about the expected stream size.

Returns:

A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.

createRetriableDownloadFlux

public static Flux createRetriableDownloadFlux(Supplier> downloadSupplier, BiFunction> onDownloadErrorResume, int maxRetries)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

Parameters:

downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
maxRetries - The maximum number of times a download can be resumed when an error occurs.

Returns:

A Flux that downloads reliably.

createRetriableDownloadFlux

public static Flux createRetriableDownloadFlux(Supplier> downloadSupplier, BiFunction> onDownloadErrorResume, int maxRetries, long position)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

Parameters:

downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
maxRetries - The maximum number of times a download can be resumed when an error occurs.
position - The initial offset for the download.

Returns:

A Flux that downloads reliably.

createRetriableDownloadFlux

public static Flux createRetriableDownloadFlux(Supplier> downloadSupplier, BiFunction> onDownloadErrorResume, RetryOptions retryOptions, long position)

Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.

Parameters:

downloadSupplier - Supplier of the initial download.
onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
retryOptions - The options for retrying.
position - The initial offset for the download.

Returns:

A Flux that downloads reliably.

fluxContext

public static Flux fluxContext(Function> serviceCall)

This method converts the incoming deferContextual from Reactor Context to Azure Context and calls the given lambda function with this context and returns a collection of type T

If the reactor context is empty, NONE will be used to call the lambda function

Code samples

Java
String prefix = "Hello, ";
 Flux<String> response = FluxUtil
     .fluxContext(context -> serviceCallReturnsCollection(prefix, context));

Parameters:

serviceCall - The lambda function that makes the service call into which the context will be passed

Returns:

The response from service call

fluxError

public static Flux fluxError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of Flux.

Parameters:

logger - The ClientLogger to log the exception.
ex - The RuntimeException.

Returns:

A Flux that terminates with error wrapping the RuntimeException.

isFluxByteBuffer

public static boolean isFluxByteBuffer(Type entityType)

Checks if a type is Flux.

Parameters:

entityType - the type to check

Returns:

whether the type represents a Flux that emits ByteBuffer

monoError

public static Mono monoError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of Mono.

Parameters:

logger - The ClientLogger to log the exception.
ex - The RuntimeException.

Returns:

A Mono that terminates with error wrapping the RuntimeException.

monoError

public static Mono monoError(LoggingEventBuilder logBuilder, RuntimeException ex)

Propagates a RuntimeException through the error channel of Mono.

Parameters:

logBuilder - The LoggingEventBuilder with context to log the exception.
ex - The RuntimeException.

Returns:

A Mono that terminates with error wrapping the RuntimeException.

pagedFluxError

public static PagedFlux pagedFluxError(ClientLogger logger, RuntimeException ex)

Propagates a RuntimeException through the error channel of PagedFlux<T>.

Parameters:

logger - The ClientLogger to log the exception.
ex - The RuntimeException.

Returns:

A PagedFlux<T> that terminates with error wrapping the RuntimeException.

readFile

public static Flux readFile(AsynchronousFileChannel fileChannel)

Creates a Flux from an AsynchronousFileChannel which reads the entire file.

Parameters:

fileChannel - The file channel.

Returns:

The AsyncInputStream.

readFile

public static Flux readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)

Creates a Flux from an AsynchronousFileChannel which reads part of a file into chunks of the given size.

Parameters:

fileChannel - The file channel.
chunkSize - the size of file chunks to read.
offset - The offset in the file to begin reading.
length - The number of bytes to read from the file.

Returns:

the Flux.

readFile

public static Flux readFile(AsynchronousFileChannel fileChannel, long offset, long length)

Creates a Flux from an AsynchronousFileChannel which reads part of a file.

Parameters:

fileChannel - The file channel.
offset - The offset in the file to begin reading.
length - The number of bytes to read from the file.

Returns:

the Flux.

toFluxByteBuffer

public static Flux toFluxByteBuffer(InputStream inputStream)

Converts an InputStream into a Flux of ByteBuffer using a chunk size of 4096.

Given that InputStream is not guaranteed to be replayable the returned Flux should be considered non-replayable as well.

If the passed InputStream is null Flux#empty() will be returned.

Parameters:

inputStream - The InputStream to convert into a Flux.

Returns:

A Flux of ByteBuffers that contains the contents of the stream.

toFluxByteBuffer

public static Flux toFluxByteBuffer(InputStream inputStream, int chunkSize)

Converts an InputStream into a Flux of ByteBuffer.

Given that InputStream is not guaranteed to be replayable the returned Flux should be considered non-replayable as well.

If the passed InputStream is null Flux#empty() will be returned.

Parameters:

inputStream - The InputStream to convert into a Flux.
chunkSize - The requested size for each ByteBuffer.

Returns:

A Flux of ByteBuffers that contains the contents of the stream.

toMono

public static Mono toMono(Response response)

Converts the incoming content to Mono.

Parameters:

response - whose getValue() is to be converted

Returns:

The converted Mono

toReactorContext

public static Context toReactorContext(Context context)

Converts an Azure context to Reactor context. If the Azure context is null or empty, Context#empty() will be returned.

Parameters:

context - The Azure context.

Returns:

The Reactor context.

withContext

public static Mono withContext(Function> serviceCall)

This method converts the incoming deferContextual from Reactor Context to Azure Context and calls the given lambda function with this context and returns a single entity of type T

If the reactor context is empty, NONE will be used to call the lambda function

Code samples

Java
String prefix = "Hello, ";
 Mono<String> response = FluxUtil
     .withContext(context -> serviceCallReturnsSingle(prefix, context));

Parameters:

serviceCall - The lambda function that makes the service call into which azure context will be passed

Returns:

The response from service call

withContext

public static Mono withContext(Function> serviceCall, Map contextAttributes)

This method converts the incoming deferContextual from Reactor Context to Azure Context, adds the specified context attributes and calls the given lambda function with this context and returns a single entity of type T

If the reactor context is empty, NONE will be used to call the lambda function

Parameters:

serviceCall - serviceCall The lambda function that makes the service call into which azure context will be passed
contextAttributes - The map of attributes sent by the calling method to be set on Context.

Returns:

The response from service call

writeFile

public static Mono writeFile(Flux content, AsynchronousFileChannel outFile)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel.

The outFile is not closed by this call, closing of the outFile is managed by the caller.

The response Mono will emit an error if content or outFile are null. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption#WRITE.

Parameters:

content - The Flux of ByteBuffer content.
outFile - The AsynchronousFileChannel.

Returns:

A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.

writeFile

public static Mono writeFile(Flux content, AsynchronousFileChannel outFile, long position)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel starting at the given position in the file.

The outFile is not closed by this call, closing of the outFile is managed by the caller.

The response Mono will emit an error if content or outFile are null or position is less than 0. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption#WRITE.

Parameters:

content - The Flux of ByteBuffer content.
outFile - The AsynchronousFileChannel.
position - The position in the file to begin writing the content.

Returns:

A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.

writeToAsynchronousByteChannel

public static Mono writeToAsynchronousByteChannel(Flux content, AsynchronousByteChannel channel)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousByteChannel.

The channel is not closed by this call, closing of the channel is managed by the caller.

The response Mono will emit an error if content or channel are null.

Parameters:

content - The Flux of ByteBuffer content.
channel - The AsynchronousByteChannel.

Returns:

A Mono which emits a completion status once the Flux has been written to the AsynchronousByteChannel.

writeToOutputStream

public static Mono writeToOutputStream(Flux content, OutputStream stream)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an OutputStream.

The stream is not closed by this call, closing of the stream is managed by the caller.

The response Mono will emit an error if content or stream are null. Additionally, an error will be emitted if an exception occurs while writing the content to the stream.

Parameters:

content - The Flux of ByteBuffer content.
stream - The OutputStream being written into.

Returns:

A Mono which emits a completion status once the Flux has been written to the OutputStream, or an error status if writing fails.

writeToWritableByteChannel

public static Mono writeToWritableByteChannel(Flux content, WritableByteChannel channel)

Writes the ByteBuffers emitted by a Flux of ByteBuffer to an WritableByteChannel.

The channel is not closed by this call, closing of the channel is managed by the caller.

The response Mono will emit an error if content or channel are null.

Parameters:

content - The Flux of ByteBuffer content.
channel - The WritableByteChannel.

Returns:

A Mono which emits a completion status once the Flux has been written to the WritableByteChannel.

Applies to

Azure SDK for Java

Latest