ChangeFeedProcessorBuilder Class
- java.
lang. Object - com.
azure. cosmos. ChangeFeedProcessorBuilder
- com.
public class ChangeFeedProcessorBuilder
Helper class to build a ChangeFeedProcessor instance. Below is an example of building ChangeFeedProcessor for LatestVersion mode.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(docs -> {
for (JsonNode item : docs) {
// Implementation for handling and processing of each JsonNode item goes here
}
})
.buildChangeFeedProcessor();
Below is an example of building ChangeFeedProcessor with throughput control for handleChanges.
ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("cfp")
.targetThroughput(300)
.priorityLevel(PriorityLevel.LOW)
.build();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleChanges(docs -> {
for (JsonNode item : docs) {
// Implementation for handling and processing of each JsonNode item goes here
}
})
.options(
new ChangeFeedProcessorOptions()
.setFeedPollThroughputControlConfig(throughputControlGroupConfig)
)
.buildChangeFeedProcessor();
Below is an example of building ChangeFeedProcessor with throughput control for LatestVersion mode.
ThroughputControlGroupConfig throughputControlGroupConfig =
new ThroughputControlGroupConfigBuilder()
.groupName("cfp")
.targetThroughput(300)
.priorityLevel(PriorityLevel.LOW)
.build();
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleLatestVersionChanges(changeFeedProcessorItems -> {
for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
// Implementation for handling and processing of each change feed item goes here
}
})
.options(
new ChangeFeedProcessorOptions()
.setFeedPollThroughputControlConfig(throughputControlGroupConfig)
)
.buildChangeFeedProcessor();
Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleAllVersionsAndDeletesChanges(docs -> {
for (ChangeFeedProcessorItem item : docs) {
// Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
}
})
.buildChangeFeedProcessor();
Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode when also wishing to process a ChangeFeedProcessorContext.
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
.hostName(hostName)
.feedContainer(feedContainer)
.leaseContainer(leaseContainer)
.handleAllVersionsAndDeletesChanges((docs, context) -> {
for (ChangeFeedProcessorItem item : docs) {
// Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
}
String leaseToken = context.getLeaseToken();
// Handling of the lease token corresponding to a batch of change feed processor item goes here
})
.buildChangeFeedProcessor();
Constructor Summary
| Constructor | Description |
|---|---|
| ChangeFeedProcessorBuilder() |
Instantiates a new Cosmos a new Change |
Method Summary
Methods inherited from java.lang.Object
Constructor Details
ChangeFeedProcessorBuilder
public ChangeFeedProcessorBuilder()
Instantiates a new Cosmos a new ChangeFeedProcessor builder.
Method Details
buildChangeFeedProcessor
public ChangeFeedProcessor buildChangeFeedProcessor()
Builds a new instance of the ChangeFeedProcessor with the specified configuration.
Returns:
feedContainer
public ChangeFeedProcessorBuilder feedContainer(CosmosAsyncContainer feedContainer)
Sets and existing CosmosAsyncContainer to be used to read from the monitored container.
Parameters:
Returns:
handleAllVersionsAndDeletesChanges
public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(BiConsumer<List<ChangeFeedProcessorItem>,ChangeFeedProcessorContext> biConsumer)
Sets a BiConsumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.
.handleAllVersionsAndDeletesChanges((docs, context) -> {
for (ChangeFeedProcessorItem item : docs) {
// Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
}
String leaseToken = context.getLeaseToken();
// Handling of the lease token corresponding to a batch of change feed processor item goes here
})
Parameters:
Returns:
handleAllVersionsAndDeletesChanges
public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)
Sets a consumer function which will be called to process changes for AllVersionsAndDeletes change feed mode.
.handleAllVersionsAndDeletesChanges(docs -> {
for (ChangeFeedProcessorItem item : docs) {
// Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
}
})
Parameters:
Returns:
handleChanges
public ChangeFeedProcessorBuilder handleChanges(Consumer<List<JsonNode>> consumer)
Sets a consumer function which will be called to process changes for LatestVersion change feed mode. Attention! This API is not merge proof, please use #handleLatestVersionChanges(Consumer) instead.
.handleChanges(docs -> {
for (JsonNode item : docs) {
// Implementation for handling and processing of each JsonNode item goes here
}
})
Parameters:
Returns:
handleLatestVersionChanges
public ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<ChangeFeedProcessorItem>> consumer)
Sets a consumer function which will be called to process changes for LatestVersion change feed mode.
.handleLatestVersionChanges(changeFeedProcessorItems -> {
for (ChangeFeedProcessorItem item : changeFeedProcessorItems) {
// Implementation for handling and processing of each change feed item goes here
}
})
Parameters:
Returns:
hostName
public ChangeFeedProcessorBuilder hostName(String hostName)
Sets the host name.
Parameters:
Returns:
leaseContainer
public ChangeFeedProcessorBuilder leaseContainer(CosmosAsyncContainer leaseContainer)
Sets an existing CosmosAsyncContainer to be used to read from the leases container.
Parameters:
Returns:
options
public ChangeFeedProcessorBuilder options(ChangeFeedProcessorOptions changeFeedProcessorOptions)
Sets the ChangeFeedProcessorOptions to be used. Unless specifically set the default values that will be used are:
- maximum items per page or FeedResponse: 100
- lease renew interval: 17 seconds
- lease acquire interval: 13 seconds
- lease expiration interval: 60 seconds
- feed poll delay: 5 seconds
- maximum scale count: unlimited
Parameters:
Returns: