Creare una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando Python (anteprima)
Esplora dati di Azure Synapse è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure Synapse offre funzionalità di inserimento dati (caricamento) da hub eventi, hub IoT e BLOB scritti nei contenitori BLOB.
In questo articolo viene creata una connessione dati dell'hub eventi per Esplora dati di Azure Synapse usando Python.
Prerequisiti
Una sottoscrizione di Azure. Creare un account Azure gratuito.
Creare un pool di Esplora dati usando Synapse Studio o il portale di Azure
Creare un database di Esplora dati.
Nel riquadro sinistro di Synapse Studio selezionare Dati.
Selezionare + (Aggiungi nuova risorsa) >Pool di Esplora dati e usare le informazioni seguenti:
Impostazione Valore suggerito Descrizione Nome pool contosodataexplorer Nome del pool di Esplora dati da usare Nome TestDatabase Il nome del database deve essere univoco all'interno del cluster. Periodo di conservazione predefinito 365 Intervallo di tempo (in giorni) per cui è garantito che i dati rimangano disponibili per le query. L'intervallo di tempo viene misurato dal momento in cui i dati vengono inseriti. Periodo cache predefinito 31 L'intervallo di tempo (in giorni) per cui mantenere i dati sottoposti frequentemente a query disponibili nell'archiviazione su unità SSD o nella RAM, invece che nell'archiviazione a lungo termine. Selezionare Crea per creare il database. Per la creazione è in genere necessario meno di un minuto.
Creare una tabella nel cluster di prova
Creare una tabella denominata StormEvents
che corrisponda allo schema dei dati nel file StormEvents.csv
.
Suggerimento
I frammenti di codice seguenti creano un'istanza di un client per quasi ogni chiamata. Questa operazione viene eseguita per rendere eseguibile singolarmente ogni frammento. Nell'ambiente di produzione, le istanze client sono di tipo rientrante e devono essere mantenute a condizione che sia necessario. Una singola istanza client per URI è sufficiente, anche quando si usano più database (il database può essere specificato a livello di comando).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Definire il mapping di inserimento
Eseguire il mapping dei dati CSV in ingresso sui nomi di colonna usati durante la creazione della tabella. Effettuare il provisioning di un oggetto di mapping di colonne CSV su tale tabella.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Installa pacchetto Python
Per installare il pacchetto di Python per Esplora dati di Azure Synapse, aprire un prompt dei comandi con un percorso contenente Python. Esegui questo comando:
pip install azure-common
pip install azure-mgmt-kusto
Autenticazione
Per eseguire l'esempio seguente, sono necessarie un'applicazione Microsoft Entra e un'entità servizio che possano accedere alle risorse. Per creare un'applicazione Microsoft Entra gratuita e aggiungere un'assegnazione di ruolo a livello di sottoscrizione, vedere Creare un'applicazione Microsoft Entra. Sono necessari anche l'ID directory (tenant), l'ID applicazione e il segreto client.
Aggiungere una connessione dati dell'hub eventi
L'esempio seguente spiega come aggiungere una connessione dati dell'hub eventi a livello programmatico. Per informazioni sull'aggiunta di una connessione dati all'hub eventi tramite il portale di Azure, vedere Connettersi all'hub eventi.
from azure.mgmt.kusto import KustoManagementClient
from azure.mgmt.kusto.models import EventHubDataConnection
from azure.common.credentials import ServicePrincipalCredentials
#Directory (tenant) ID
tenant_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Application ID
client_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
#Client Secret
client_secret = "xxxxxxxxxxxxxx"
subscription_id = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx"
credentials = ServicePrincipalCredentials(
client_id=client_id,
secret=client_secret,
tenant=tenant_id
)
kusto_management_client = KustoManagementClient(credentials, subscription_id)
resource_group_name = "testrg"
#The cluster and database that are created as part of the Prerequisites
cluster_name = "mykustocluster"
database_name = "mykustodatabase"
data_connection_name = "myeventhubconnect"
#The event hub that is created as part of the Prerequisites
event_hub_resource_id = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
consumer_group = "$Default"
location = "Central US"
#The table and column mapping that are created as part of the Prerequisites
table_name = "StormEvents"
mapping_rule_name = "StormEvents_CSV_Mapping"
data_format = "csv"
#Returns an instance of LROPoller, check https://learn.microsoft.com/python/api/msrest/msrest.polling.lropoller?view=azure-python
poller = kusto_management_client.data_connections.create_or_update(resource_group_name=resource_group_name, cluster_name=cluster_name, database_name=database_name, data_connection_name=data_connection_name,
parameters=EventHubDataConnection(event_hub_resource_id=event_hub_resource_id, consumer_group=consumer_group, location=location,
table_name=table_name, mapping_rule_name=mapping_rule_name, data_format=data_format))
Impostazione | Valore consigliato | Descrizione campo |
---|---|---|
tenant_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | ID tenant. Noto anche come ID directory. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | L'ID sottoscrizione usato per la creazione di risorse. |
client_id | xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx | L'ID client dell'applicazione che può accedere alle risorse nel tenant. |
client_secret | xxxxxxxxxxxxxx | Il segreto client dell'applicazione che può accedere alle risorse nel tenant. |
resource_group_name | testrg | Nome del gruppo di risorse contenente il cluster. |
cluster_name | mykustocluster | Nome del cluster. |
database_name | mykustodatabase | Nome del database di destinazione nel cluster. |
data_connection_name | myeventhubconnect | Nome desiderato della connessione dati. |
table_name | StormEvents | Nome della tabella di destinazione nel database di destinazione. |
mapping_rule_name | StormEvents_CSV_Mapping | Nome del mapping delle colonne relativo alla tabella di destinazione. |
data_format | csv | Formato dei dati del messaggio. |
event_hub_resource_id | ID risorsa | ID risorsa dell'hub eventi che contiene i dati per l'inserimento. |
consumer_group | $Default | Gruppo di consumer dell'hub eventi. |
posizione | Stati Uniti centrali | Posizione della risorsa di connessione dati. |
Pulire le risorse
Per eliminare la connessione dati, usare il comando seguente:
kusto_management_client.data_connections.delete(resource_group_name=resource_group_name, cluster_name=kusto_cluster_name, database_name=kusto_database_name, data_connection_name=kusto_data_connection_name)