Freigeben über


Aufnehmen von Daten mithilfe der Python-Bibliothek von Azure Data Explorer

In diesem Artikel nehmen Sie Daten mithilfe der Python-Bibliothek des Azure-Daten-Explorers ein. Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Azure Data Explorer stellt zwei Clientbibliotheken für Python bereit: eine Ingestionsbibliothek und eine Datenbibliothek. Mit diesen Bibliotheken können Sie Daten in einen Cluster aufnehmen oder laden und Daten aus Ihrem Code abfragen.

Erstellen Sie zunächst eine Tabelle und Datenzuordnung in einem Cluster. Anschließend wird die Datenaufnahme in die Warteschlange des Clusters gestellt und die Ergebnisse validiert.

Voraussetzungen

Installieren Sie die Daten- und Ingest-Bibliotheken

Installieren Sie azure-kusto-data und azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Importanweisungen und Konstanten hinzufügen

Importieren von Klassen aus azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Um eine Anwendung zu authentifizieren, verwendet Azure Data Explorer Ihre Microsoft Entra-Mandanten-ID. Um Ihre Mandanten-ID zu finden, verwenden Sie die folgende URL, und ersetzen Sie Ihre Domäne durch Ihre Domäne.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Wenn Ihre Domäne beispielsweise contoso.com ist, lautet die URL: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klicken Sie auf diese URL, um die Ergebnisse anzuzeigen; die erste Zeile lautet wie folgt.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Die Mandanten-ID in diesem Fall lautet aaaabbbb-0000-cccc-1111-dddd2222eeee. Legen Sie die Werte für AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI und KUSTO_DATABASE fest, bevor Sie diesen Code ausführen.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Erstellen Sie nun die Verbindungszeichenfolge. Im folgenden Beispiel wird die Geräteauthentifizierung verwendet, um auf den Cluster zuzugreifen. Sie können auch verwaltete Identitätsauthentifizierung , Microsoft Entra-Anwendungszertifikat, Microsoft Entra-Anwendungsschlüssel und Microsoft Entra-Benutzerund Kennwort verwenden.

Sie erstellen die Zieltabelle und die Abbildung in einem späteren Arbeitsschritt.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Festlegen von Quelldateiinformationen

Importieren Sie zusätzliche Klassen, und legen Sie Konstanten für die Datenquellendatei fest. In diesem Beispiel wird eine Beispieldatei verwendet, die in Azure Blob Storage gehostet wird. Das StormEvents-Beispiel-Dataset enthält Wetterdaten aus den Nationalen Zentren für Umweltinformationen.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Erstellen einer Tabelle in Ihrem Cluster

Erstellen Sie eine Tabelle, die dem Schema der Daten in der StormEvents.csv Datei entspricht. Wenn dieser Code ausgeführt wird, wird eine Meldung wie die folgende Meldung zurückgegeben: Um sich anzumelden, verwenden Sie einen Webbrowser, um die Seite https://microsoft.com/devicelogin zu öffnen, und geben Sie den Code F3W4VWZDM ein, um sich zu authentifizieren. Führen Sie die Schritte zum Anmelden aus, und kehren Sie dann zum Ausführen des nächsten Codeblocks zurück. Für nachfolgende Codeblöcke, die eine Verbindung herstellen, müssen Sie sich erneut anmelden.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definieren der Erfassungszuordnung

Ordnen Sie eingehende CSV-Daten den Spaltennamen und Datentypen zu, die beim Erstellen der Tabelle verwendet werden. Dadurch werden Quelldatenfelder zieltabellenspalten zugeordnet.

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Nachricht für die Verarbeitung in die Warteschlange einreihen

Eine Nachricht in die Warteschlange stellen, um Daten aus dem Blob-Speicher abzurufen und diese in Azure Data Explorer zu integrieren.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Abfrage von Daten, die in die Tabelle eingespeist wurden

Warten Sie fünf bis zehn Minuten, bis die in die Warteschlange eingereihte Aufnahme geplant wird und die Daten in Azure Data Explorer geladen werden. Führen Sie dann den folgenden Code aus, um die Anzahl der Datensätze in der Tabelle StormEvents abzurufen.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Ausführen von Problembehandlungsabfragen

Melden Sie sich bei https://dataexplorer.azure.com an und verbinden Sie sich mit Ihrem Cluster. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Fehler bei der Einspeisung aufgetreten sind. Ersetzen Sie den Datenbanknamen vor der Ausführung.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Führen Sie den folgenden Befehl aus, um den Status aller Ingestion-Vorgänge der letzten vier Stunden zur Anzeige zu bringen. Ersetzen Sie den Datenbanknamen vor der Ausführung.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Bereinigen von Ressourcen

Wenn Sie beabsichtigen, unseren anderen Artikeln zu folgen, behalten Sie die von Ihnen erstellten Ressourcen bei. Wenn nicht, führen Sie den folgenden Befehl in Ihrer Datenbank aus, um die Tabelle "StormEvents" zu bereinigen.

.drop table StormEvents

Nächster Schritt