次の方法で共有


イベント ハブ データ接続 (プレビュー)

Azure Event Hubs は、ビッグ データのストリーミング プラットフォームとなるイベント インジェスト サービスです。 Azure Synapse Data Explorer は、お客様が管理する Event Hubs からの継続的なインジェストを提供します。

イベント ハブのインジェスト パイプラインは、いくつかのステップで、Azure Synapse Data Explorer にイベントを転送します。 最初に、Azure portal でイベント ハブを作成します。 次に、特定の形式のデータが、指定されたインジェスト プロパティを使用して取り込まれるターゲット テーブルを Azure Synapse Data Explorer に作成します。 イベント ハブ接続はイベント ルーティングを認識している必要があります。 データは、イベント システム プロパティのマッピングに従って、選択したプロパティを使用して埋め込まれます。 イベント ハブへの接続を作成して、イベント ハブを作成し、イベントを送信します。 このプロセスは、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

Azure Synapse Data Explorer でのデータ インジェストに関する一般的な情報については、「Azure Synapse Data Explorer のデータ インジェスト概要」を参照してください。

データ形式

  • データは EventData オブジェクトの形式でイベント ハブから読み取られます。

  • サポートされる形式を確認してください。

    注意

    イベント ハブでは、.raw 形式はサポートされていません。

  • GZip 圧縮アルゴリズムを使用してデータを圧縮できます。 インジェスト プロパティCompression を指定します。

    • 圧縮形式 (Avro、Parquet、ORC) については、データ圧縮はサポートされません。
    • カスタム エンコードおよび埋め込みシステム プロパティは、圧縮データではサポートされていません。

インジェストのプロパティ

インジェスト プロパティは、インジェスト プロセス、データのルーティング先と処理方法を指示します。 EventData.Properties を使用して、イベント インジェストのインジェスト プロパティを指定できます。 以下のプロパティを設定できます。

プロパティ 説明
テーブル 既存のターゲット テーブルの名前 (大文字と小文字の区別あり)。 [Data Connection] ペインで設定された Table をオーバーライドします。
形式 データ形式。 [Data Connection] ペインで設定された Data format をオーバーライドします。
IngestionMappingReference 使用する既存のインジェスト マッピングの名前。 [Data Connection] ペインで設定された Column mapping をオーバーライドします。
圧縮 データ圧縮、None (既定)、または GZip 圧縮。
エンコード データ エンコード (既定値は UTF8)。 .NET でサポートされているエンコードのいずれかを指定できます。
タグ JSON 配列文字列として書式設定された、取り込まれたデータに関連付けられるタグの一覧。 タグを使用すると、パフォーマンスに影響します。

注意

データ接続の作成後にエンキューされたイベントのみが取り込まれたます。

イベント ルーティング

Azure Synapse Data Explorer クラスターへのイベント ハブ接続を設定するときに、ターゲット テーブルのプロパティ (テーブル名、データ形式、圧縮、およびマッピング) を指定します。 データの既定のルーティングは、static routing と呼ばれることもあります。 イベント プロパティを使用して、各イベントのターゲット テーブルのプロパティを指定することもできます。 EventData.Properties の指定に従って、接続でデータが動的にルーティングされ、このイベントの静的プロパティがオーバーライドされます。

次の例では、Event Hub の詳細を設定し、気象メトリック データをテーブル WeatherMetrics に送信します。 データは json 形式です。 mapping1 はテーブル WeatherMetrics で事前定義されています。

var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;

// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }; 
var data = JsonConvert.SerializeObject(metric);

// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");

// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();

イベント システム プロパティのマッピング

イベントがエンキューされるときに、Event Hubs サービスによって設定されたプロパティがシステム プロパティに格納されます。 Azure Synapse Data Explorer Event Hub 接続により、選択したプロパティがテーブルのデータ ランディングに埋め込まれます。

注意

  • システム プロパティは、json および表形式 (csvtsv など) でサポートされており、圧縮データではサポートされていません。 サポートされていない形式を使用してもデータは取り込まれますが、プロパティは無視されます。
  • 表形式データの場合、システム プロパティは単一レコードのイベント メッセージでのみサポートされます。
  • JSON データの場合、システム プロパティは複数レコードのイベント メッセージでもサポートされます。 このような場合、システム プロパティは、イベント メッセージの最初のレコードにのみ追加されます。
  • csv マッピングの場合、csvの表に示された順序でプロパティがレコードの先頭に追加されます。
  • json マッピングの場合、システム プロパティの表のプロパティ名に従ってプロパティが追加されます。

システム プロパティ

イベント ハブでは、次のシステム プロパティが公開されます。

プロパティ データ型 説明
x-opt-enqueued-time DATETIME イベントがエンキューされた UTC 時刻
x-opt-sequence-number long イベント ハブのパーティション ストリーム内のイベントの論理シーケンス番号
x-opt-offset string イベント ハブのパーティション ストリームからのイベントのオフセット。 このオフセット識別子は、イベント ハブ ストリームのパーティション内で一意です
x-opt-publisher string 発行元の名前 (発行元のエンドポイントにメッセージが送信された場合)
x-opt-partition-key string イベントが格納されている、対応するパーティションのパーティション キー

テーブルの [データ ソース] セクションで [イベント システムのプロパティ] を選択した場合は、テーブルのスキーマとマッピングにプロパティを含める必要があります。

スキーマ マッピングの例

テーブル スキーマ マッピングの例

データに 3 つの列 (TimespanMetric、および Value) が含まれており、含めるプロパティが x-opt-enqueued-time および x-opt-offset の場合は、次のコマンドを使用してテーブル スキーマを作成または変更します。

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV マッピングの例

次のコマンドを実行して、レコードの先頭にデータを追加します。 序数値に注意してください。

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON マッピングの例

データは、システム プロパティのマッピングを使用して追加されます。 これらのコマンドを実行します。

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

イベント ハブの接続

Note

最適なパフォーマンスを得るには、Azure Synapse Data Explorer クラスターと同じリージョンにすべてのリソースを作成します。

イベント ハブを作成する

まだ用意していない場合は、Event Hub を作成します。 イベント ハブへの接続は、Azure portalC# または Python によるプログラム、または Azure Resource Manager テンプレートを使用して管理できます。

注意

  • パーティション数は変更できないため、パーティション数を設定する際には、長期的な規模を考慮する必要があります。
  • コンシューマー グループは、コンシューマーごとに一意である "必要があります"。 Azure Synapse Data Explorer 接続専用のコンシューマー グループを作成します。

送信イベント

データを生成して Event Hub に送信するサンプル アプリをご覧ください。

サンプル データの生成方法の例については、「イベント ハブから Azure Synapse Data Explorer にデータを取り込む」を参照してください。

geo ディザスター リカバリー ソリューションの設定

イベント ハブには、geo ディザスター リカバリー ソリューションが用意されています。 Azure Synapse Data Explorer では Alias イベント ハブ名前空間がサポートされていません。 ソリューションに geo ディザスター リカバリーを実装するには、2 つのイベント ハブ データ接続を作成します。1 つはプライマリ名前空間用で、もう 1 つはセカンダリ名前空間用です。 Azure Synapse Data Explorer は、両方のイベント ハブ接続をリッスンします。

注意

プライマリ名前空間からセカンダリ名前空間へのフェールオーバーを実装するのは、ユーザーの責任です。

次の手順