次の方法で共有


イベント レプリケーション タスクのパターン

フェデレーションの概要レプリケーター関数の概要では、レプリケーション タスクの根拠と基本的な要素について説明しています。この記事に進む前に、それらを理解しておくことをお勧めします。

この記事では、概要セクションで強調表示されているいくつかのパターンの実装ガイダンスについて詳しく説明します。

レプリケーション

レプリケーション パターンは、あるイベント ハブから次のイベント ハブに、または Event Hub から Service Bus キューなどの他の宛先にイベントをコピーします。 イベントは、イベント ペイロードに変更を加えずに転送されます。

このパターンの実装は、 Event Hubs と Service Bus サンプル間のイベント レプリケーションと Event Hubs 間のイベント レプリケーション と、Apache Kafka ブローカーから Event Hubs にデータをレプリケートする特定のケースに関する Apache Kafka MirrorMaker と Event Hubs の使用 に関するチュートリアルで説明されています。

ストリームと順序の保持

Azure Functions または Azure Stream Analytics を使用したレプリケーションでは、ソース イベント ハブの正確な 1 対 1 の複製をターゲット イベント ハブに確実に作成することを目的としていませんが、アプリケーションが必要とするイベントの相対的な順序を保持することに重点を置いています。 アプリケーションは、関連するイベントを同じパーティション キーでグループ化することでこれを通信し、 Event Hubs は同じパーティション キーを持つメッセージを同じパーティション内に順番に配置します

Von Bedeutung

"オフセット" 情報はイベント ハブごとに一意であり、同じイベントのオフセットは Event Hub インスタンス間で異なります。 コピーしたイベント ストリーム内の位置を見つけるには、時間ベースのオフセットを使用し、 伝達されたサービス割り当てメタデータを参照します。

時間ベースのオフセットは、特定の時点でレシーバーを開始します。

  • EventPosition.FromStart() - 保持されているすべてのデータを再度読み取ります。
  • EventPosition.FromEnd() - 接続時からすべての新しいデータを読み取る。
  • EventPosition.FromEnqueuedTime(dateTime) - 指定された日付と時刻から始まるすべてのデータ。

EventProcessor では、EventProcessorOptions の InitialOffsetProvider を使用して位置を設定します。 他の受信側 API では、位置はコンストラクターを介して渡されます。

Azure Functions ベースのガイダンスで使用される サンプルとして提供 される事前構築済みのレプリケーション関数ヘルパーを使用すると、ソース パーティションから取得したのと同じパーティション キーを持つイベント ストリームが、元のストリーム内のバッチとして、同じパーティション キーを持つターゲット Event Hub に送信されます。

ソースとターゲットの Event Hub のパーティション数が同じ場合、ターゲット内のすべてのストリームはソースと同じパーティションにマップされます。 パーティション数が異なる場合(以下で説明するパターンの一部で重要)、マッピングは異なりますが、ストリームは常に一緒に順番に保持されます。

異なるストリームに属するイベント、またはターゲット パーティションにパーティション キーがない独立したイベントの相対的な順序は、ソース パーティションとは常に異なる場合があります。

サービス割り当てメタデータ

ソース Event Hub から取得したイベントのサービス割り当てメタデータ、元のエンキュー時間、シーケンス番号、オフセットは、ターゲット イベント ハブ内の新しいサービス割り当て値に置き換えられますが、 ヘルパー関数、サンプルで提供されているレプリケーション タスクでは、元の値はユーザー プロパティ ( repl-enqueue-time (ISO8601文字列)、 repl-sequencerepl-offsetに保持されます。

これらのプロパティは string 型であり、それぞれの元のプロパティの文字列化された値が含まれています。 イベントが複数回転送される場合、即時ソースのサービス割り当てメタデータが、セミコロンで区切られた値を使用して、既存のプロパティに追加されます。

フェールオーバー

ディザスター リカバリーの目的でレプリケーションを使用している場合、Event Hubs サービス内のリージョンの可用性イベントから保護する場合、またはネットワークの中断から保護するために、このような障害シナリオでは、あるイベント ハブから次のイベント ハブへのフェールオーバーを実行する必要があり、プロデューサーやコンシューマーにセカンダリ エンドポイントの使用を指示する必要があります。

すべてのフェールオーバー シナリオでは、名前空間の必要な要素が構造的に同じであることを前提としています。つまり、Event Hubs とコンシューマー グループの名前は同じであり、共有アクセス署名規則やロールベースのアクセス制御規則も同じ方法で設定されます。 名前空間の移動とクリーンアップ手順の省略 に関するガイダンス に従って、セカンダリ名前空間を作成 (および更新) できます。

プロデューサーとコンシューマーに切り替えを強制するには、どの名前空間を使用するかに関する情報を、簡単にアクセスして更新できる場所の検索に使用できるようにする必要があります。 プロデューサーまたはコンシューマーで頻繁または永続的なエラーが発生した場合は、その場所を調べ、構成を調整する必要があります。 その構成を共有する方法は多数ありますが、DNS とファイル共有の 2 つを示します。

DNS ベースのフェールオーバー構成

1 つの候補となる方法は、制御する DNS 内の DNS SRV レコードに情報を保持し、それぞれのイベント ハブ エンドポイントを指し示す方法です。

Von Bedeutung

Event Hubs では、エンドポイントに CNAME レコードを直接エイリアス化することはできません。つまり、エンドポイント アドレスの回復性のある参照メカニズムとして DNS を使用し、IP アドレス情報を直接解決しないようにします。

ドメイン example.com を所有し、アプリケーションの場合はゾーン test.example.comを所有しているとします。 2 つの代替 Event Hub に対して、さらに 2 つの入れ子になったゾーンと、それぞれに SRV レコードを作成します。

SRV レコードは、次の一般的な規則に従い、 _azure_eventhubs._amqp でプレフィックス付きで、2 つのエンドポイント レコードを保持します。1 つはポート 5671 の AMQP-over-TLS 用、1 つはポート 443 の AMQP-over-WebSocket 用で、どちらもゾーンに対応する名前空間の Event Hubs エンドポイントを指しています。

ゾーン SRV レコード
eh1.test.example.com _azure_servicebus._amqp.eh1.test.example.com
1 1 5671 eh1-test-example-com.servicebus.windows.net
2 2 443 eh1-test-example-com.servicebus.windows.net
eh2.test.example.com _azure_servicebus._amqp.eh2.test.example.com
1 1 5671 eh2-test-example-com.servicebus.windows.net
2 2 443 eh2-test-example-com.servicebus.windows.net

アプリケーションのゾーンで、プライマリ イベント ハブに対応する下位ゾーンを指す CNAME エントリを作成します。

CNAME レコード 別名
eventhub.test.example.com eh1.test.example.com

CNAME レコードと SRV レコードを明示的に照会できる DNS クライアントを使用すると (Java と .NET の組み込みクライアントでは、IP アドレスに対する名前の単純な解決のみが許可されます)、目的のエンドポイントを解決できます。 たとえば、DnsClient.NET では、ルックアップ関数は次のようになります。

static string GetEventHubName(string aliasName)
{
    const string SrvRecordPrefix = "_azure_eventhub._amqp.";
    LookupClient lookup = new LookupClient();

    return (from CNameRecord alias in (lookup.Query(aliasName, QueryType.CNAME).Answers)
            from SrvRecord srv in lookup.Query(SrvRecordPrefix + alias.CanonicalName, QueryType.SRV).Answers
            where srv.Port == 5671
            select srv.Target).FirstOrDefault()?.Value.TrimEnd('.');
}

この関数は、上に示すように、現在 CNAME にエイリアスが設定されているゾーンのポート 5671 に登録されているターゲット ホスト名を返します。

フェールオーバーを実行するには、CNAME レコードを編集し、代替ゾーンをポイントする必要があります。

DNS (特に Azure DNS) を使用する利点は、Azure DNS 情報がグローバルにレプリケートされるため、単一リージョンの停止に対する回復性があるということです。

この手順は 、Event Hubs Geo-DR のしくみに似ていますが、完全に独自の制御下にあり、アクティブ/アクティブなシナリオでも機能します。

ファイル共有ベースのフェールオーバー構成

エンドポイント情報を共有するために DNS を使用する最も簡単な代替方法は、プライマリ エンドポイントの名前をプレーンテキスト ファイルに配置し、障害に対して堅牢で更新を許可するインフラストラクチャからファイルを提供することです。

グローバル可用性とコンテンツ レプリケーションを使用して高可用性 Web サイト インフラストラクチャを既に実行している場合は、そこにそのようなファイルを追加し、スイッチが必要な場合はファイルを再発行します。

注意事項

この方法でのみエンドポイント名を発行する必要があります。シークレットを含む完全な接続文字列は発行しないでください。

コンシューマーのフェールオーバーに関する追加の考慮事項

Event Hub コンシューマーの場合、フェールオーバー戦略に関するその他の考慮事項は、イベント プロセッサのニーズによって異なります。

バックアップ データからデータベースを含むシステムを再構築する必要がある障害が発生し、データベースが Event Hub で保持されているイベントから直接または中間処理を介して供給される場合は、バックアップを復元し、元のシステムが破棄された時点からではなく、データベース バックアップが作成された時点からシステムへのイベントの再生を開始します。

障害がシステムのスライスにのみ影響を与える場合、または実際に到達不能になった単一のイベント ハブのみである場合は、処理が中断されたのと同じ位置からイベントの処理を続行する必要があります。

いずれかのシナリオを実現し、それぞれの Azure SDK のイベント プロセッサを使用するには、 新しいチェックポイント ストアを作成 し、処理を再開する タイムスタンプ に基づいて初期パーティション位置を指定します。

切り替え先の Event Hub のチェックポイント ストアに引き続きアクセスできる場合、前述 の伝達されたメタデータ は、既に処理されたイベントをスキップし、最後に中断した場所から正確に再開するのに役立ちます。

統合

マージ パターンには、1 つのターゲットを指す 1 つ以上のレプリケーション タスクがあり、通常のプロデューサーも同じターゲットにイベントを送信する場合があります。

これらのパターンのバリエーションは次のとおりです。

  • 2 つ以上のレプリケーション関数が、個別のソースからイベントを同時に取得し、それらを同じターゲットに送信します。
  • ターゲットがプロデューサーによって直接使用されている間に、ソースからイベントを取得するもう 1 つのレプリケーション関数。
  • 前のパターンでは、2 つ以上の Event Hubs 間でミラー化されているため、イベントが生成される場所に関係なく、それらの Event Hubs には同じストリームが含まれます。

最初の 2 つのパターンのバリエーションは単純であり、単純なレプリケーション タスクと異なるわけではありません。

最後のシナリオでは、既にレプリケートされたイベントが再びレプリケートされないように除外する必要があります。 この手法を示し、 EventHubToEventHubMerge サンプルで説明します。

編集者

エディター パターンは レプリケーション パターンに基づいていますが、メッセージは転送される前に変更されます。

このような変更の例を次に示します。

  • "コード変換" - ソースから到着したイベント コンテンツ ("本文" または "ペイロード" ともいいます) は Apache Avro 形式または何らかの独自のシリアル化形式を使ってエンコードされているのに、ターゲットを所有しているシステムではコンテンツが JSON でエンコードされていることが想定されている場合、コード変換レプリケーション タスクでは、まずペイロードを Apache Avro からメモリ内オブジェクト グラフに逆シリアル化してから、そのグラフを転送されるイベント用に JSON 形式にシリアル化します。 コード変換には、 コンテンツの圧縮 と展開のタスクも含まれます。
  • 変換 - 構造化データを含むイベントでは、ダウンストリーム コンシューマーによる使用を容易にするために、そのデータの再整形が必要になる場合があります。 これには、入れ子になった構造体のフラット化、余分なデータ要素の排除、特定のスキーマに正確に適合するようにペイロードを再整形するなどの作業が含まれる場合があります。
  • バッチ処理 - イベントは、ソースからバッチ (1 回の転送で複数のイベント) で受信できますが、ターゲットに 1 回で転送するか、またはその逆に転送する必要があります。 したがって、1 つのタスクは、1 つの入力イベント転送に基づいて複数のイベントを転送したり、一緒に転送される一連のイベントを集計したりできます。
  • 検証 - 外部ソースからのイベント データは、多くの場合、転送される前に、一連のルールに準拠しているかどうかを確認する必要があります。 規則は、スキーマまたはコードを使用して表現できます。 準拠していないと検出されたイベントは、ログに記載されている問題で削除される場合や、特別なターゲット宛先に転送されてさらに処理される可能性があります。
  • エンリッチメント - 一部のソースから送信されるイベント データは、ターゲット システムで使用できるようにするために、追加のコンテキストを持つエンリッチメントが必要になる場合があります。 これには、参照データを検索し、そのデータをイベントに埋め込んだり、レプリケーション タスクに認識されているがイベントに含まれていないソースに関する情報を追加したりする必要があります。
  • フィルター処理 - ソースから到着する一部のイベントは、いくつかのルールに基づいてターゲットから保留する必要がある場合があります。 フィルターはルールに対してイベントをテストし、イベントがルールと一致しない場合はイベントを削除します。 特定の条件を観察して重複するイベントを除外し、同じ値を持つ後続のイベントを削除することは、フィルター処理の一種です。
  • 暗号化 - レプリケーション タスクでは、ソースから到着するコンテンツの暗号化を解除したり、ターゲットに転送されたコンテンツを暗号化したり、イベントで送信された署名に関連するコンテンツとメタデータの整合性を検証したり、そのような署名を添付したりする必要がある場合があります。
  • 構成証明 - レプリケーション タスクは、デジタル署名によって保護されている可能性があるメタデータを、イベントが特定のチャネルまたは特定の時刻に受信されたことを証明するイベントにアタッチできます。
  • チェーン - レプリケーション タスクは、ストリームの整合性が保護され、不足しているイベントを検出できるように、イベントのストリームに署名を適用できます。

一般に、変換、バッチ処理、エンリッチメント パターンは、 Azure Stream Analytics ジョブで最適に実装されます。

これらのすべてのパターンは、Azure Functions を使用して、イベントを取得するための Event Hubs トリガー と、イベントを配信するための Event Hub 出力バインド を使用して実装できます。

経路選択

ルーティング パターンは レプリケーション パターンに基づいていますが、1 つのソースと 1 つのターゲットを持つ代わりに、レプリケーション タスクには複数のターゲットがあります。C# で次に示します。

[FunctionName("EH2EH")]
public static async Task Run(
    [EventHubTrigger("source", Connection = "EventHubConnectionAppSetting")] EventData[] events,
    [EventHub("dest1", Connection = "EventHubConnectionAppSetting")] EventHubClient output1,
    [EventHub("dest2", Connection = "EventHubConnectionAppSetting")] EventHubClient output2,
    ILogger log)
{
    foreach (EventData eventData in events)
    {
        // send to output1 and/or output2 based on criteria
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output1, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2==0 ) ? inputEvent : null;
        });
        EventHubReplicationTasks.ConditionalForwardToEventHub(input, output2, log, (eventData) => {
            return ( inputEvent.SystemProperties.SequenceNumber%2!=0 ) ? inputEvent : null;
        });
    }
}

ルーティング関数は、メッセージ メタデータやメッセージ ペイロードを考慮し、送信先として使用可能な宛先の 1 つを選択します。

Azure Stream Analytics では、複数の出力を定義し、出力ごとにクエリを実行することで同じことができます。

select * into dest1Output from inputSource where Info = 1
select * into dest2Output from inputSource where Info = 2

ログのプロジェクション

ログ プロジェクション パターンでは、イベント ストリームがインデックス付きデータベースにフラット化され、イベントはデータベース内のレコードになります。 通常、イベントは同じコレクションまたはテーブルに追加され、Event Hub パーティション キーはレコードを一意にする主キーの一部になります。

ログ プロジェクションでは、イベント データの時系列ヒストリアンまたは最適化されたビューを生成できます。このビューでは、パーティション キーごとに最新のイベントのみが保持されます。 ターゲット データベースの形状は、最終的にはユーザーとアプリケーションのニーズに応じて行われます。 このパターンは、"イベント ソーシング" とも呼ばれます。

ヒント

Azure Stream Analytics で Azure SQL DatabaseAzure Cosmos DB にログ プロジェクションを簡単に作成できます。このオプションを使用することをお勧めします。

次の Azure 関数は、圧縮されたイベント ハブの内容を Azure Cosmos DB コレクションに投影します。

[FunctionName("Eh1ToCosmosDb1Json")]
[ExponentialBackoffRetry(-1, "00:00:05", "00:05:00")]
public static async Task Eh1ToCosmosDb1Json(
    [EventHubTrigger("eh1", ConsumerGroup = "Eh1ToCosmosDb1", Connection = "Eh1ToCosmosDb1-source-connection")] EventData[] input,
    [CosmosDB(databaseName: "SampleDb", collectionName: "foo", ConnectionStringSetting = "CosmosDBConnection")] IAsyncCollector<object> output,
    ILogger log)
{
    foreach (var ev in input)
    {
        if (!string.IsNullOrEmpty(ev.SystemProperties.PartitionKey))
        {
            var record = new
            {
                id = ev.SystemProperties.PartitionKey,
                data = JsonDocument.Parse(ev.Body),
                properties = ev.Properties
            };
            await output.AddAsync(record);
        }
    }
}

次のステップ