Sdílet prostřednictvím


Vytvoření datového připojení centra událostí pro Azure Synapse Data Explorer pomocí Pythonu (Preview)

Azure Synapse Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Azure Synapse Data Explorer nabízí příjem dat (načítání dat) ze služby Event Hubs, IoT Hubs a objektů blob zapsaných do kontejnerů objektů blob.

V tomto článku vytvoříte datové připojení centra událostí pro Průzkumníka dat Azure Synapse pomocí Pythonu.

Předpoklady

  • Předplatné Azure. Vytvořte bezplatný účet Azure.

  • Vytvoření fondu Průzkumníka dat pomocí nástroje Synapse Studio nebo webu Azure Portal

  • Vytvořte databázi Průzkumníka dat.

    1. V nástroji Synapse Studio v levém podokně vyberte Data.

    2. Vyberte + fond Průzkumníka dat (přidat nový zdroj) >a použijte následující informace:

      Nastavení Navrhovaná hodnota Popis
      Název fondu contosodataexplorer Název fondu Průzkumníka dat, který se má použít
      Název TestDatabase Název databáze musí být v rámci clusteru jedinečný.
      Výchozí doba uchovávání 365 Časové období (ve dnech), pro které je zaručeno, že jsou data k dispozici pro dotazování. Časový rozsah se začíná měřit od okamžiku, kdy jsou data ingestována.
      Výchozí období mezipaměti 31 Časové období (ve dnech), pro které se mají uchovávat často dotazovaná data dostupná v úložišti SSD nebo paměti RAM, a ne v dlouhodobějším úložišti.
    3. Výběrem možnosti Vytvořit vytvořte databázi. Vytvoření obvykle trvá méně než minutu.

Vytvoření tabulky v testovacím clusteru

Vytvořte tabulku s názvem StormEvents , která odpovídá schématu StormEvents.csv dat v souboru.

Tip

Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definování mapování ingestace

Namapujte příchozí data CSV na názvy sloupců použitých při vytváření tabulky. Zřiďte objekt mapování sloupců CSV v této tabulce.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalace balíčku Pythonu

Pokud chcete nainstalovat balíček Pythonu pro Průzkumník dat Azure Synapse, otevřete příkazový řádek s Pythonem v jeho cestě. Spusťte následující příkaz:

pip install azure-common
pip install azure-mgmt-kusto

Authentication

Ke spuštění následujícího příkladu potřebujete aplikaci Microsoft Entra a instanční objekt, který má přístup k prostředkům. Pokud chcete vytvořit bezplatnou aplikaci Microsoft Entra a přidat přiřazení role na úrovni předplatného, přečtěte si téma Vytvoření aplikace Microsoft Entra. Potřebujete také ID adresáře (tenanta), ID aplikace a tajný klíč klienta.

Přidání datového připojení centra událostí

Následující příklad ukazuje, jak přidat datové připojení centra událostí programově. Viz připojení k centru událostí pro přidání datového připojení centra událostí pomocí webu Azure Portal.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Nastavení Navrhovaná hodnota Popis pole
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID vašeho tenanta Označuje se také jako ID adresáře.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID předplatného, které používáte k vytvoření prostředku.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID klienta aplikace, která má přístup k prostředkům ve vašem tenantovi.
tajný klíč klienta xxxxxxxxxxxxxxxx Tajný klíč klienta aplikace, která má přístup k prostředkům ve vašem tenantovi.
resource_group_name testrg Název skupiny prostředků obsahující váš cluster.
cluster_name mykustocluster Název clusteru.
Název_databáze mykustodatabase Název cílové databáze v clusteru
data_connection_name myeventhubconnect Požadovaný název datového připojení.
Table_name StormEvents Název cílové tabulky v cílové databázi.
mapping_rule_name StormEvents_CSV_Mapping Název mapování sloupců související s cílovou tabulkou.
data_format Csv Formát dat zprávy.
event_hub_resource_id ID prostředku ID prostředku vašeho centra událostí, které obsahuje data pro příjem dat.
consumer_group $Default Skupina příjemců vašeho centra událostí.
umístění USA – střed Umístění prostředku datového připojení

Vyčištění prostředků

Pokud chcete datové připojení odstranit, použijte následující příkaz:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Další kroky