ChangeFeedProcessorBuilder Class

  • java.lang.Object
    • com.azure.cosmos.ChangeFeedProcessorBuilder

public class ChangeFeedProcessorBuilder

Helper class to build a ChangeFeedProcessor instance. Below is an example of building ChangeFeedProcessor for LatestVersion mode.

ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleChanges(docs -> {
         for (JsonNode item : docs) {
             // Implementation for handling and processing of each JsonNode item goes here
         }
     })
     .buildChangeFeedProcessor();

Below is an example of building ChangeFeedProcessor with throughput control for handleChanges.

ThroughputControlGroupConfig throughputControlGroupConfig =
         new ThroughputControlGroupConfigBuilder()
                 .groupName("cfp")
                 .targetThroughput(300)
                 .priorityLevel(PriorityLevel.LOW)
                 .build();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
         .hostName(hostName)
         .feedContainer(feedContainer)
         .leaseContainer(leaseContainer)
         .handleChanges(docs -> {
             for (JsonNode item : docs) {
                 // Implementation for handling and processing of each JsonNode item goes here
             }
         })
         .options(
                 new ChangeFeedProcessorOptions()
                         .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
         )
         .buildChangeFeedProcessor();

Below is an example of building ChangeFeedProcessor with throughput control for LatestVersion mode.

ThroughputControlGroupConfig throughputControlGroupConfig =
         new ThroughputControlGroupConfigBuilder()
                 .groupName("cfp")
                 .targetThroughput(300)
                 .priorityLevel(PriorityLevel.LOW)
                 .build();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
         .hostName(hostName)
         .feedContainer(feedContainer)
         .leaseContainer(leaseContainer)
         .handleLatestVersionChanges(changeFeedProcessorItems -> {
             for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
                 // Implementation for handling and processing of each change feed item goes here
             }
         })
         .options(
                 new ChangeFeedProcessorOptions()
                         .setFeedPollThroughputControlConfig(throughputControlGroupConfig)
         )
         .buildChangeFeedProcessor();

Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode.

ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleAllVersionsAndDeletesChanges(docs -> {
         for (ChangeFeedProcessorItem item : docs) {
             // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
         }
     })
     .buildChangeFeedProcessor();

Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode with a handler that also receives a ChangeFeedProcessorContext.

ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
     .hostName(hostName)
     .feedContainer(feedContainer)
     .leaseContainer(leaseContainer)
     .handleAllVersionsAndDeletesChanges((docs, context) -> {
         for (ChangeFeedProcessorItem item : docs) {
             // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
         }
         String leaseToken = context.getLeaseToken();
         // Handling of the lease token corresponding to a batch of change feed processor items goes here
     })
     .buildChangeFeedProcessor();

Constructor Summary

Constructor Description
ChangeFeedProcessorBuilder()

Instantiates a new ChangeFeedProcessor builder.

Method Summary

Modifier and Type Method and Description
ChangeFeedProcessor buildChangeFeedProcessor()

Builds a new instance of the ChangeFeedProcessor with the specified configuration.

ChangeFeedProcessorBuilder feedContainer(CosmosAsyncContainer feedContainer)

Sets an existing CosmosAsyncContainer to be used to read from the monitored container.

ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(BiConsumer<List<ChangeFeedProcessorItem>,ChangeFeedProcessorContext> biConsumer)

Sets a BiConsumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.

ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)

Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.

ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer)

Sets a consumer function which will be called to process changes for LatestVersion change feed mode.

ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)

Sets a consumer function which will be called to process changes for LatestVersion change feed mode.

ChangeFeedProcessorBuilder hostName(String hostName)

Sets the host name.

ChangeFeedProcessorBuilder leaseContainer(CosmosAsyncContainer leaseContainer)

Sets an existing CosmosAsyncContainer to be used to read from the leases container.

ChangeFeedProcessorBuilder options(ChangeFeedProcessorOptions changeFeedProcessorOptions)

Sets the ChangeFeedProcessorOptions to be used.

Constructor Details

ChangeFeedProcessorBuilder

public ChangeFeedProcessorBuilder()

Instantiates a new ChangeFeedProcessor builder.

Method Details

buildChangeFeedProcessor

public ChangeFeedProcessor buildChangeFeedProcessor()

Builds a new instance of the ChangeFeedProcessor with the specified configuration.

Returns:

an instance of ChangeFeedProcessor.

feedContainer

public ChangeFeedProcessorBuilder feedContainer(CosmosAsyncContainer feedContainer)

Sets an existing CosmosAsyncContainer to be used to read from the monitored container.

Parameters:

feedContainer - the instance of CosmosAsyncContainer to be used.

Returns:

current Builder.

handleAllVersionsAndDeletesChanges

public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(BiConsumer<List<ChangeFeedProcessorItem>,ChangeFeedProcessorContext> biConsumer)

Sets a BiConsumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.

.handleAllVersionsAndDeletesChanges((docs, context) -> {
    for (ChangeFeedProcessorItem item : docs) {
        // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
    }
    String leaseToken = context.getLeaseToken();
    // Handling of the lease token corresponding to a batch of change feed processor items goes here
})

Parameters:

biConsumer - the BiConsumer to call for handling the feeds and the ChangeFeedProcessorContext instance.

Returns:

current Builder.
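
A handler of this two-argument shape can also be written as a named class, which makes it possible to exercise the logic in a unit test without a Cosmos account. The following is only a minimal sketch under stated assumptions: `Context` is a hypothetical stand-in for ChangeFeedProcessorContext, `String` stands in for ChangeFeedProcessorItem, and `CountingHandler` is an illustrative name, not part of the SDK. The methods are synchronized as a precaution, on the assumption that batches for different leases might arrive on different threads.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class LeaseAwareHandlerSketch {
    private LeaseAwareHandlerSketch() { }

    /** Hypothetical stand-in for ChangeFeedProcessorContext (not the SDK type). */
    interface Context {
        String getLeaseToken();
    }

    /** Counts how many items were handled per lease token. */
    static final class CountingHandler implements BiConsumer<List<String>, Context> {
        private final Map<String, Integer> countsByLease = new HashMap<>();

        @Override
        public synchronized void accept(List<String> items, Context context) {
            // Add the size of this batch to the running total for its lease token.
            countsByLease.merge(context.getLeaseToken(), items.size(), Integer::sum);
        }

        /** Returns the number of items seen so far for the given lease token. */
        synchronized int countFor(String leaseToken) {
            return countsByLease.getOrDefault(leaseToken, 0);
        }
    }
}
```

With the real SDK types substituted for the stand-ins, an instance of such a class could be passed wherever the lambda appears in the example above.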

handleAllVersionsAndDeletesChanges

public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)

Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.

.handleAllVersionsAndDeletesChanges(docs -> {
    for (ChangeFeedProcessorItem item : docs) {
        // Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
    }
})

Parameters:

consumer - the Consumer to call for handling the feeds.

Returns:

current Builder.

handleChanges

public ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer)

Sets a consumer function which will be called to process changes for LatestVersion change feed mode. Note: this API is not merge-proof; use handleLatestVersionChanges(Consumer) instead.

.handleChanges(docs -> {
    for (JsonNode item : docs) {
        // Implementation for handling and processing of each JsonNode item goes here
    }
})

Parameters:

consumer - the Consumer to call for handling the feeds.

Returns:

current Builder.

handleLatestVersionChanges

public ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)

Sets a consumer function which will be called to process changes for LatestVersion change feed mode.

.handleLatestVersionChanges(changeFeedProcessorItems -> {
    for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
        // Implementation for handling and processing of each change feed item goes here
    }
})

Parameters:

consumer - the Consumer to call for handling the feeds.

Returns:

current Builder.

hostName

public ChangeFeedProcessorBuilder hostName(String hostName)

Sets the host name.

Parameters:

hostName - the name to be used for the host. When using multiple hosts, each host must have a unique name.

Returns:

current Builder.
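
Because each host needs its own name, one possible approach is to derive the name from a fixed prefix plus a random suffix. The sketch below uses only the Java standard library; the `HostNames` class, the `uniqueHostName` method, and the `prefix-suffix` naming scheme are illustrative assumptions, not part of the SDK.

```java
import java.util.UUID;

final class HostNames {
    private HostNames() { }

    /**
     * Returns "<prefix>-<random suffix>". The random UUID keeps two
     * processes that share the same prefix from ending up with the same name.
     */
    static String uniqueHostName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }
}
```

The resulting string could then be passed to hostName(String), for example hostName(HostNames.uniqueHostName("cfp-worker")).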

leaseContainer

public ChangeFeedProcessorBuilder leaseContainer(CosmosAsyncContainer leaseContainer)

Sets an existing CosmosAsyncContainer to be used to read from the leases container.

Parameters:

leaseContainer - the instance of CosmosAsyncContainer to use.

Returns:

current Builder.

options

public ChangeFeedProcessorBuilder options(ChangeFeedProcessorOptions changeFeedProcessorOptions)

Sets the ChangeFeedProcessorOptions to be used. Unless set explicitly, the following default values are used:

  • maximum items per page or FeedResponse: 100
  • lease renew interval: 17 seconds
  • lease acquire interval: 13 seconds
  • lease expiration interval: 60 seconds
  • feed poll delay: 5 seconds
  • maximum scale count: unlimited

Parameters:

changeFeedProcessorOptions - the change feed processor options to use.

Returns:

current Builder.
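
To make the relationship between the default intervals listed above easier to reason about, they can be written down as `java.time.Duration` values. This is only a sketch using the standard library: the `CfpDefaults` class and its helper methods are hypothetical names, and the constants simply mirror the documented defaults rather than reading them from the SDK.

```java
import java.time.Duration;

final class CfpDefaults {
    private CfpDefaults() { }

    // Values copied from the documented defaults, not read from the SDK.
    static final int MAX_ITEM_COUNT = 100;
    static final Duration LEASE_RENEW_INTERVAL = Duration.ofSeconds(17);
    static final Duration LEASE_ACQUIRE_INTERVAL = Duration.ofSeconds(13);
    static final Duration LEASE_EXPIRATION_INTERVAL = Duration.ofSeconds(60);
    static final Duration FEED_POLL_DELAY = Duration.ofSeconds(5);

    /** Number of whole renew intervals that fit into one expiration interval. */
    static long renewalsPerExpirationWindow(Duration renew, Duration expiration) {
        return expiration.toMillis() / renew.toMillis();
    }

    /** Sanity check for custom values: renewal should happen before expiration. */
    static boolean renewsBeforeExpiry(Duration renew, Duration expiration) {
        return renew.compareTo(expiration) < 0;
    }
}
```

With the defaults, three whole renew intervals (3 × 17 s = 51 s) fit inside one 60-second expiration interval. A similar check may be useful when overriding these values through ChangeFeedProcessorOptions.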