プル モデルを使用して変更フィードを処理するために、FeedIterator
のインスタンスを作成します。 最初に FeedIterator
を作成するときには、必要な ChangeFeedStartFrom
値を指定する必要があります。この値は、変更の読み取りの開始位置と FeedRange
に使用する値の両方で構成されます。 FeedRange
はパーティション キー値の範囲であり、該当する特定の FeedIterator
を使用して変更フィードから読み取れる項目を指定します。 また、変更を処理するモード (最新バージョンまたはすべてのバージョンと削除) に必要な ChangeFeedMode
値を指定する必要があります。 ChangeFeedMode.LatestVersion
または ChangeFeedMode.AllVersionsAndDeletes
を使用して、変更フィードの読み取りに使用するモードを指定します。 すべてのバージョンと削除モードを使用する場合は、Now()
または特定の継続トークンの値から変更フィードの開始を選択する必要があります。
必要に応じて ChangeFeedRequestOptions
を指定し、PageSizeHint
を設定できます。 設定すると、ページあたりの受信アイテムの最大数がこのプロパティによって設定されます。 監視対象のコレクションでストアド プロシージャによって操作が実行されると、変更フィードから項目が読み取られるとき、トランザクション スコープが保持されます。 その結果、受信した項目数が指定した値よりも多くなり、同じトランザクションで変更された項目が 1 つのアトミック バッチの一部として返される可能性があります。
次に、エンティティ オブジェクト (この例では User
オブジェクト) を返す FeedIterator
を最新バージョンモードで取得する方法の例を示します。
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
ヒント
バージョン 3.34.0
よりも前では、ChangeFeedMode.Incremental
を設定して最新バージョン モード使用できます。 Incremental
と LatestVersion
は両方とも、変更フィードの最新バージョン モードを参照します。また、どちらのモードを使用するアプリケーションも動作は同じになります。
すべてのバージョンと削除モードはプレビュー段階であり、プレビュー .NET SDK バージョン >= 3.32.0-preview
として使用できます。 すべてのバージョンと削除モードで FeedIterator
を取得し、User
オブジェクトを返す例を次に示します。
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
ストリーム経由で変更フィードを使用する
両方の変更フィード モードの FeedIterator
には、2 つのオプションがあります。 エンティティ オブジェクトを返す例に加えて、Stream
のサポートを使用して応答を取得することもできます。 ストリームを使用すると、最初にデータを逆シリアル化しなくてもデータを読み取れるため、クライアント リソースを節約できます。
Stream
を返す FeedIterator
を最新バージョンモードで取得する方法の例を次に示します。
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
コンテナー全体の変更を使用する
FeedIterator
に対する FeedRange
パラメーターを指定しない場合は、コンテナー全体の変更フィードを自分のペースで処理できます。 最新バージョンモードを使ってすべての変更の読み取りが現在の時刻から開始される例を次に示します。
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
変更フィードは事実上、今後のすべての書き込みと更新を含む無限の項目の一覧であるため、HasMoreResults
の値は常に true
です。 変更フィードを読み取ろうとして、新しい変更がない場合は、NotModified
ステータスの応答が返されます。 上の例では、これは、変更を再確認する前に 5 秒間待機することによって処理されます。
パーティション キーの変更を使用する
特定のパーティション キーの変更のみを処理することが必要になる場合があります。 特定のパーティション キーの FeedIterator
を取得し、その変更をコンテナー全体の場合と同じ方法で処理することができます。
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
並列処理のために FeedRange を使用する
変更フィード プロセッサでは、作業は複数のコンシューマーに自動的に分散されます。 変更フィード プル モデルでは、FeedRange
を使用して、変更フィードの処理を並列化できます。 FeedRange
は、パーティション キー値の範囲を表します。
コンテナーの範囲の一覧を取得する方法の例を次に示します。
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
コンテナーの FeedRange
の値の一覧を取得すると、物理パーティションごとに 1 つの FeedRange
が得られます。
FeedRange
を使用することで、FeedIterator
を作成して、複数のマシンやスレッドの変更フィードの処理を並列化できます。 前の例では、コンテナー全体または単一のパーティション キーに対応する 1 つの FeedIterator
を取得する方法を示しましたが、FeedRanges を使用して、変更フィードを並列処理できる複数の FeedIterator を取得することも可能です。
FeedRange を使用する場合は、FeedRange を取得して対象のマシンに配布するオーケストレーター プロセスが必要になります。 この配布は次のように行います。
FeedRange.ToJsonString
を使用して、この文字列値を配布する。 コンシューマーは、この値を FeedRange.FromJsonString
で使用できます。
- 配布がインプロセスの場合は、
FeedRange
オブジェクト参照を渡す。
並列で読み取りを行う 2 台の別個の仮定のマシンを使用して、コンテナーの変更フィードを最初から読み取る方法を示すサンプルを次に示します。
マシン 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
マシン 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
継続トークンを保存する
継続トークンを取得すれば、FeedIterator
の位置を保存できます。 継続トークンとは、FeedIterator が最後に処理した変更を追跡し、FeedIterator
が将来この位置から再開できるようにする文字列値のことです。 後続トークンを指定している場合、開始時間と初めから開始の値よりも優先されます。 次のコードでは、コンテナーの作成以降の変更フィードを読み取ります。 それ以上読み取るべき変更がなくなると、変更フィードの使用を後で再開できるように継続トークンが保持されます。
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
最新バージョン モードを使用している場合は、 Azure Cosmos DB コンテナーが存在する限り、FeedIterator
の継続トークンが期限切れになることはありません。 すべてのバージョンと削除モードを使用している場合は、継続的バックアップの保持期間内に変更が行われる限り、FeedIterator
の継続トークンは有効です。
プル モデルを使用して変更フィードを処理するために、Iterator<FeedResponse<JsonNode>> responseIterator
のインスタンスを作成します。 CosmosChangeFeedRequestOptions
を作成するときは、変更フィードの読み取りを開始する場所を指定し、使用する FeedRange
パラメーターを渡す必要があります。 FeedRange
はパーティション キー値の範囲であり、変更フィードから読み取る項目を指定します。
すべてのバージョンと削除モードで変更フィードを読み取る場合は、CosmosChangeFeedRequestOptions
を作成するときに allVersionsAndDeletes()
も指定する必要があります。 すべてのバージョンと削除モードでは、変更フィードの最初または時点からの処理はサポートされていません。 変更を今から処理するか、後続トークンから処理する必要があります。 すべてのバージョンと削除モードはプレビュー段階であり、Java SDK バージョン >= 4.42.0
で使用できます。
コンテナー全体の変更を使用する
FeedRange.forFullRange()
を指定すると、コンテナー全体の変更フィードを自分のペースで処理できます。 必要に応じて、byPage()
で値を指定できます。 設定すると、ページあたりの受信アイテムの最大数がこのプロパティによって設定されます。
最新バージョン モードで responseIterator
の値を取得する方法の例を次に示します。
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
すべてのバージョンと削除モードで responseIterator
を取得する方法の例を次に示します。
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
これで、処理を繰り返して結果を得ることができます。 変更フィードは事実上、今後のすべての書き込みと更新を含む無限の項目の一覧であるため、responseIterator.hasNext()
の値は常に true
です。 最初からすべての変更が読み取られる、最新バージョン モードの例を次に示します。 各繰り返しでは、すべてのイベントの処理後に継続トークンが保持されます。 変更フィードの最後に処理された位置から取得され、createForProcessingFromContinuation
を使用して処理されます。
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
パーティション キーの変更を使用する
特定のパーティション キーの変更のみを処理することが必要になる場合があります。 特定のパーティション キーの変更をコンテナー全体の場合と同じ方法で処理することができます。 最新バージョン モードを使用する例を次に示します。
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
並列処理のために FeedRange を使用する
変更フィード プロセッサでは、作業は複数のコンシューマーに自動的に分散されます。 変更フィード プル モデルでは、FeedRange
を使用して、変更フィードの処理を並列化できます。 FeedRange
は、パーティション キー値の範囲を表します。
最新バージョンモードを使用してコンテナーの範囲の一覧を取得する方法の例を次に示します。
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
コンテナーの FeedRange の一覧を取得すると、物理パーティションごとに 1 つの FeedRange
が得られます。
FeedRange
を使用して、複数のマシンやスレッドにわたって変更フィードの処理を並列化できます。 前の例では、コンテナー全体または単一のパーティション キーに対する変更を処理する方法を示しましたが、FeedRanges を使用して、変更フィードを並列処理することも可能です。
FeedRange を使用する場合は、FeedRange を取得して対象のマシンに配布するオーケストレーター プロセスが必要になります。 この配布は次のように行います。
FeedRange.toString()
を使用して、この文字列値を配布する。
- 配布がインプロセスの場合は、
FeedRange
オブジェクト参照を渡す。
最新バージョン モードを使用する例を次に示します。 ここでは、並列で読み取りを行う 2 台の別個の仮定のマシンを使用して、コンテナーの変更フィードを最初から読み取る方法を示します。
マシン 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
マシン 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
プル モデルを使用して変更フィードを処理するために、ChangeFeedPullModelIterator
のインスタンスを作成します。 最初に ChangeFeedPullModelIterator
を作成するときは、ChangeFeedIteratorOptions
内で必須の changeFeedStartFrom
値を指定する必要があります。これは、変更を読み取る開始位置と、変更をフェッチするリソース (パーティション キーまたは FeedRange) の両方で構成されます。
Note
changeFeedStartFrom
値が指定されていない場合、コンテナー全体の changefeed が Now() からフェッチされます。
現在、JS SDK でサポートされ、既定で選択されるのは最新バージョンのみです。
必要に応じて ChangeFeedIteratorOptions
で maxItemCount
を使うと、受信する項目数のページあたりの上限を設定できます。
次に、エンティティ オブジェクトを返す反復子を最新バージョン モードで取得する方法の例を示します。
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
コンテナー全体の変更を使用する
ChangeFeedStartFrom
内の FeedRange
または PartitionKey
パラメーターを指定しない場合は、コンテナー全体の変更フィードを自分のペースで処理できます。 すべての変更の読み取りが現在の時刻から開始される例を、次に示します。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
変更フィードは事実上、今後のすべての書き込みと更新を含む無限の項目の一覧であるため、hasMoreResults
の値は常に true
です。 変更フィードを読み取ろうとして、新しい変更がない場合は、NotModified
ステータスの応答が返されます。 上の例では、これは、変更を再確認する前に 5 秒間待機することによって処理されます。
パーティション キーの変更を使用する
特定のパーティション キーの変更のみを処理することが必要になる場合があります。 特定のパーティション キーの反復子を取得し、その変更をコンテナー全体の場合と同じ方法で処理することができます。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
並列処理のために FeedRange を使用する
変更フィード プル モデルでは、FeedRange
を使用して、変更フィードの処理を並列化できます。 FeedRange
は、パーティション キー値の範囲を表します。
コンテナーの範囲の一覧を取得する方法の例を次に示します。
const ranges = await container.getFeedRanges();
コンテナーの FeedRange
の値の一覧を取得すると、物理パーティションごとに 1 つの FeedRange
が得られます。
FeedRange
を使うことで、反復子を作成し、複数のマシンやスレッドの変更フィードの処理を並列化できます。 前の例では、コンテナー全体または単一のパーティション キーに対応する 1 つの changefeed 反復子を取得する方法を示しましたが、FeedRanges を使って、変更フィードを並列処理できる複数の反復子を取得することもできます。
並列で読み取りを行う 2 台の別個の仮定のマシンを使用して、コンテナーの変更フィードを最初から読み取る方法を示すサンプルを次に示します。
マシン 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
マシン 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
継続トークンを保存する
継続トークンを取得すれば、反復子の位置を保存できます。 継続トークンとは、changefeed 反復子が最後に処理した変更を追跡し、反復子が将来この位置から再開できるようにする文字列値のことです。 後続トークンを指定している場合、開始時間と初めから開始の値よりも優先されます。 次のコードでは、コンテナーの作成以降の変更フィードを読み取ります。 それ以上読み取るべき変更がなくなると、変更フィードの使用を後で再開できるように継続トークンが保持されます。
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Azure Cosmos DB コンテナーが存在する限り、継続トークンは期限切れになりません。
AsyncIterator を使う
JavaScript 非同期反復子を使って changefeed をフェッチできます。 非同期反復子の使用例を次に示します。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}