Condividi tramite


Creare una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando Python (anteprima)

Esplora dati di Azure Synapse è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure Synapse offre funzionalità di inserimento dati (caricamento) da hub eventi, hub IoT e BLOB scritti nei contenitori BLOB.

In questo articolo viene creata una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando Python.

Prerequisiti

  • Una sottoscrizione di Azure. Creare un account Azure gratuito.

  • Creare un pool di Esplora dati usando Synapse Studio o il portale di Azure

  • Creare un database di Esplora dati.

    1. Nel riquadro sinistro di Synapse Studio selezionare Dati.

    2. Selezionare + (Aggiungi nuova risorsa) >Pool di Esplora dati e usare le informazioni seguenti:

      Impostazione Valore suggerito Descrizione
      Nome pool contosodataexplorer Nome del pool di Esplora dati da usare
      Nome TestDatabase Il nome del database deve essere univoco all'interno del cluster.
      Periodo di conservazione predefinito 365 Intervallo di tempo (in giorni) per cui è garantito che i dati rimangano disponibili per le query. L'intervallo di tempo viene misurato dal momento in cui i dati vengono inseriti.
      Periodo cache predefinito 31 L'intervallo di tempo (in giorni) per cui mantenere i dati sottoposti frequentemente a query disponibili nell'archiviazione su unità SSD o nella RAM, invece che nell'archiviazione a lungo termine.
    3. Selezionare Crea per creare il database. Per la creazione è in genere necessario meno di un minuto.

Creare una tabella nel cluster di prova

Creare una tabella denominata StormEvents che corrisponda allo schema dei dati nel file StormEvents.csv.

Suggerimento

I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono di tipo rientrante e devono essere mantenute a condizione che sia necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definire il mapping di inserimento

Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella. Effettuare il provisioning di un oggetto di mapping di colonne CSV su tale tabella.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installa pacchetto Python

Per installare il pacchetto di Python per Esplora dati di Azure Synapse, aprire un prompt dei comandi con un percorso contenente Python. Esegui questo comando:

pip install azure-common
pip install azure-mgmt-kusto

Autenticazione

Per eseguire l'esempio seguente, sono necessarie un'applicazione Microsoft Entra e un'entità servizio che possano accedere alle risorse. Per creare un'applicazione Microsoft Entra gratuita e aggiungere un'assegnazione di ruolo a livello di sottoscrizione, vedere Creare un'applicazione Microsoft Entra. Sono necessari anche l'ID directory (tenant), l'ID applicazione e il segreto client.

Aggiungere una connessione dati dell'hub eventi

L'esempio seguente spiega come aggiungere una connessione dati dell'hub eventi a livello programmatico. Per informazioni sull'aggiunta di una connessione dati all'hub eventi tramite il portale di Azure, vedere Connettersi all'hub eventi.

from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials

#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
        client_id=client_id,
        secret=client_secret,
        tenant=tenant_id
    )
kusto_management_client = KustoManagementClient(credentials, subscription_id)

resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
                                        parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
                                                                            table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Impostazione Valore consigliato Descrizione campo
tenant_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID tenant. Noto anche come ID directory.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx L'ID sottoscrizione usato per la creazione di risorse.
client_id xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx L'ID client dell'applicazione che può accedere alle risorse nel tenant.
client_secret xxxxxxxxxxxxxx Il segreto client dell'applicazione che può accedere alle risorse nel tenant.
resource_group_name testrg Nome del gruppo di risorse contenente il cluster.
cluster_name mykustocluster Nome del cluster.
database_name mykustodatabase Nome del database di destinazione nel cluster.
data_connection_name myeventhubconnect Nome desiderato della connessione dati.
table_name StormEvents Nome della tabella di destinazione nel database di destinazione.
mapping_rule_name StormEvents_CSV_Mapping Nome del mapping delle colonne relativo alla tabella di destinazione.
data_format csv Formato dei dati del messaggio.
event_hub_resource_id ID risorsa ID risorsa dell'hub eventi che contiene i dati per l'inserimento.
consumer_group $Default Gruppo di consumer dell'hub eventi.
posizione Stati Uniti centrali Posizione della risorsa di connessione dati.

Pulire le risorse

Per eliminare la connessione dati, usare il comando seguente:

kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)

Passaggi successivi