次の方法で共有


Python を使用して Azure Synapse Data Explorer 用にイベント ハブ データ接続を作成する (プレビュー)

Azure Synapse Data Explorer は、ログと利用統計情報のための高速で拡張性に優れたデータ探索サービスです。 Azure Synapse Data Explorer では、BLOB コンテナーに書き込まれた Event Hubs、IoT Hub、BLOB からのインジェスト (データの読み込み) を提供します。

この記事では、Python を使用して Azure Synapse Data Explorer 用にイベント ハブ データ接続を作成します。

前提条件

  • Azure サブスクリプション。 無料の Azure アカウントを作成します。

  • Synapse Studio または Azure portal を使用して Data Explorer プールを作成します

  • Data Explorer データベースを作成します。

    1. Synapse Studio の左側のペインで、 [データ] を選択します。

    2. + (新しいリソースの追加) >[Data Explorer プール] を選択し、次の情報を使用します。

      設定 提案された値 説明
      プール名 contosodataexplorer 使用する Data Explorer プールの名前
      Name TestDatabase データベース名はクラスター内で一意である必要があります。
      既定のリテンション期間 365 クエリにデータを使用できることが保証される期間 (日数) です。 期間は、データが取り込まれた時点から測定されます。
      既定のキャッシュ期間 31 頻繁にクエリされるデータが、長期ストレージではなく SSD ストレージまたは RAM で利用できるように保持される期間 (日数) です。
    3. [作成] を選択してデータベースを作成します。 通常、作成にかかる時間は 1 分未満です。

テスト クラスターにテーブルを作成する

StormEvents.csv ファイル内のデータのスキーマと一致する、StormEvents という名前のテーブルを作成します。

ヒント

次のコード スニペットは、ほぼすべての呼び出しに対してクライアントのインスタンスを作成します。 これは、各スニペットを個別に実行できるようにするためです。 実稼働環境では、クライアント インスタンスは再入可能であり、必要な限り保持する必要があります。 複数のデータベースを使用する場合でも、URI ごとに 1 つのクライアントインスタンスで十分です (データベースはコマンド レベルで指定できます)。

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

インジェストのマッピングを定義する

受信した CSV データを、テーブル作成時に使用される列名にマップします。 CSV 列マッピング オブジェクトをそのテーブルにプロビジョニングします。

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Python パッケージのインストール

Azure Synapse Data Explorer 向けの Python パッケージをインストールするには、Python をパスに設定した状態でコマンド プロンプトを開きます。 次のコマンドを実行します。

pip install azure-common
pip install azure-mgmt-kusto

認証

次の例を実行するには、リソースにアクセスできる Microsoft Entra アプリケーションとサービス プリンシパルが必要です。 無料の Microsoft Entra アプリケーションを作成し、サブスクリプション レベルでロールの割り当てを追加するには、Microsoft Entra アプリケーションの作成に関する記事を参照してください。 また、ディレクトリ (テナント) ID、アプリケーション ID、およびクライアント シークレットも必要です。

イベント ハブ データ接続を追加する

次の例は、イベント ハブ データ接続をプログラムを使用して追加する方法を示しています。 Azure portal を使用してイベント ハブ データ接続を追加する方法については、「イベント ハブへの接続」を参照してください。

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
設定 推奨値 フィールドの説明
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx テナント ID。 ディレクトリ ID とも呼ばれます。
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx リソースの作成に使用するサブスクリプション ID。
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント ID。
client_secret xxxxxxxxxxxxxx ご利用のテナント内のリソースにアクセスできるアプリケーションのクライアント シークレット。
resource_group_name testrg ご利用のクラスターを含むリソース グループの名前。
cluster_name mykustocluster ご利用のクラスターの名前。
database_name mykustodatabase ご利用のクラスター内のターゲット データベースの名前。
data_connection_name myeventhubconnect データ接続の任意の名前。
table_name StormEvents ターゲット データベース内のターゲット テーブルの名前。
mapping_rule_name StormEvents_CSV_Mapping ターゲット テーブルに関連付けられている列マッピングの名前。
data_format csv メッセージのデータ形式。
event_hub_resource_id リソース ID インジェスト用のデータを保持しているイベント ハブのリソース ID。
consumer_group $Default ご利用のイベント ハブのコンシューマー グループ。
location 米国中部 データ接続リソースの場所。

リソースをクリーンアップする

データ接続を削除するには、次のコマンドを使用します。

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

次のステップ