Condividi tramite


Inserire dati usando la libreria Python di Esplora dati di Azure

In questo articolo vengono inseriti dati usando la libreria Python di Esplora dati di Azure. Esplora dati di Azure è un servizio di esplorazione dati rapido e a scalabilità elevata per dati di log e di telemetria. Esplora dati di Azure offre due librerie client per Python: una libreria di inserimenti e una libreria dati. Queste librerie consentono di inserire o caricare dati in un cluster ed eseguire query sui dati dal codice.

Creare prima di tutto una tabella e un mapping dei dati in un cluster. Quindi accodare l'inserimento nel cluster e convalidare i risultati.

Prerequisiti

Installare i dati e inserire le librerie

Installare azure-kusto-data e azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Aggiungere istruzioni di importazione e costanti

Importare classi da azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Per autenticare un'applicazione, Esplora dati di Azure usa l'ID tenant di Microsoft Entra. Per trovare l'ID tenant, usare l'URL seguente, sostituendo il dominio per YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Ad esempio, se il dominio è contoso.com, l'URL è: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Fare clic su questo URL per visualizzare i risultati; la prima riga è la seguente.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

L'ID tenant in questo caso è aaaabbbb-0000-cccc-1111-dddd2222eeee. Impostare i valori per AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE prima di eseguire questo codice.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Creare ora la stringa di connessione. L'esempio seguente usa l'autenticazione del dispositivo per accedere al cluster. È anche possibile usare l'autenticazione dell'identità gestita , il certificato dell'applicazione Microsoft Entra, la chiave dell'applicazione Microsoft Entra e la password.

Crea la tabella di destinazione e il mapping in un passaggio successivo.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Impostare le informazioni sul file di origine

Importare classi aggiuntive e impostare costanti per il file di origine dati. Questo esempio usa un file di esempio ospitato in Archiviazione BLOB di Azure. Il set di dati di esempio StormEvents contiene dati relativi al meteo dei National Center for Environmental Information.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Creare una tabella nel cluster

Creare una tabella che corrisponda allo schema dei dati nel file StormEvents.csv. Quando questo codice viene eseguito, restituisce un messaggio simile al seguente: Per accedere, usare un Web browser per aprire la pagina https://microsoft.com/devicelogin e immettere il codice F3W4VWZDM per l'autenticazione. Seguire la procedura per accedere, quindi tornare a eseguire il blocco di codice successivo. I blocchi di codice successivi che effettuano una connessione richiedono di eseguire di nuovo l'accesso.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definire il mapping di inserimento

Mappare i dati CSV in ingresso ai nomi di colonna e ai tipi di dati utilizzati per creare la tabella. Mappa i campi dati di origine alle colonne della tabella di destinazione

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Metti in coda un messaggio per l'inserimento

Mettere in coda un messaggio per recuperare i dati dall'archiviazione BLOB e ingestire tali dati in Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Eseguire query sui dati ingestiti nella tabella

Attendere da cinque a 10 minuti per l'inserimento in coda per pianificare l'inserimento e caricare i dati in Esplora dati di Azure. Eseguire quindi il codice seguente per ottenere il conteggio dei record nella tabella StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Esegui query di risoluzione dei problemi

Effettua l'accesso a https://dataexplorer.azure.com e connettiti al tuo cluster. Eseguire il comando seguente nel database per verificare se si sono verificati errori di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Eseguire il comando seguente per visualizzare lo stato di tutte le operazioni di inserimento nelle ultime quattro ore. Sostituire il nome del database prima dell'esecuzione.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Pulire le risorse

Se prevedi di seguire gli altri articoli, mantieni le risorse create. In caso contrario, eseguire il comando seguente nel database per pulire la tabella StormEvents.

.drop table StormEvents

Passo successivo