Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel nehmen Sie Daten mithilfe der Python-Bibliothek des Azure-Daten-Explorers ein. Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Azure Data Explorer stellt zwei Clientbibliotheken für Python bereit: eine Ingestionsbibliothek und eine Datenbibliothek. Mit diesen Bibliotheken können Sie Daten in einen Cluster aufnehmen oder laden und Daten aus Ihrem Code abfragen.
Erstellen Sie zunächst eine Tabelle und Datenzuordnung in einem Cluster. Anschließend wird die Datenaufnahme in die Warteschlange des Clusters gestellt und die Ergebnisse validiert.
Voraussetzungen
- Ein Microsoft-Konto oder eine Microsoft Entra-Benutzeridentität. Ein Azure-Abonnement ist nicht erforderlich.
- Ein Azure Data Explorer-Cluster und eine Datenbank. Erstellen Sie einen Cluster und eine Datenbank.
- Python 3.4 oder höher
Installieren Sie die Daten- und Ingest-Bibliotheken
Installieren Sie azure-kusto-data und azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Importanweisungen und Konstanten hinzufügen
Importieren von Klassen aus azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Um eine Anwendung zu authentifizieren, verwendet Azure Data Explorer Ihre Microsoft Entra-Mandanten-ID. Um Ihre Mandanten-ID zu finden, verwenden Sie die folgende URL, und ersetzen Sie Ihre Domäne durch Ihre Domäne.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Wenn Ihre Domäne beispielsweise contoso.com ist, lautet die URL: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klicken Sie auf diese URL, um die Ergebnisse anzuzeigen; die erste Zeile lautet wie folgt.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
Die Mandanten-ID in diesem Fall lautet aaaabbbb-0000-cccc-1111-dddd2222eeee
. Legen Sie die Werte für AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI und KUSTO_DATABASE fest, bevor Sie diesen Code ausführen.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Erstellen Sie nun die Verbindungszeichenfolge. Im folgenden Beispiel wird die Geräteauthentifizierung verwendet, um auf den Cluster zuzugreifen. Sie können auch verwaltete Identitätsauthentifizierung , Microsoft Entra-Anwendungszertifikat, Microsoft Entra-Anwendungsschlüssel und Microsoft Entra-Benutzerund Kennwort verwenden.
Sie erstellen die Zieltabelle und die Abbildung in einem späteren Arbeitsschritt.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Festlegen von Quelldateiinformationen
Importieren Sie zusätzliche Klassen, und legen Sie Konstanten für die Datenquellendatei fest. In diesem Beispiel wird eine Beispieldatei verwendet, die in Azure Blob Storage gehostet wird. Das StormEvents-Beispiel-Dataset enthält Wetterdaten aus den Nationalen Zentren für Umweltinformationen.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Erstellen einer Tabelle in Ihrem Cluster
Erstellen Sie eine Tabelle, die dem Schema der Daten in der StormEvents.csv Datei entspricht. Wenn dieser Code ausgeführt wird, wird eine Meldung wie die folgende Meldung zurückgegeben: Um sich anzumelden, verwenden Sie einen Webbrowser, um die Seite https://microsoft.com/devicelogin zu öffnen, und geben Sie den Code F3W4VWZDM ein, um sich zu authentifizieren. Führen Sie die Schritte zum Anmelden aus, und kehren Sie dann zum Ausführen des nächsten Codeblocks zurück. Für nachfolgende Codeblöcke, die eine Verbindung herstellen, müssen Sie sich erneut anmelden.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Definieren der Erfassungszuordnung
Ordnen Sie eingehende CSV-Daten den Spaltennamen und Datentypen zu, die beim Erstellen der Tabelle verwendet werden. Dadurch werden Quelldatenfelder zieltabellenspalten zugeordnet.
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Nachricht für die Verarbeitung in die Warteschlange einreihen
Eine Nachricht in die Warteschlange stellen, um Daten aus dem Blob-Speicher abzurufen und diese in Azure Data Explorer zu integrieren.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Abfrage von Daten, die in die Tabelle eingespeist wurden
Warten Sie fünf bis zehn Minuten, bis die in die Warteschlange eingereihte Aufnahme geplant wird und die Daten in Azure Data Explorer geladen werden. Führen Sie dann den folgenden Code aus, um die Anzahl der Datensätze in der Tabelle StormEvents abzurufen.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ausführen von Problembehandlungsabfragen
Melden Sie sich bei https://dataexplorer.azure.com an und verbinden Sie sich mit Ihrem Cluster. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Fehler bei der Einspeisung aufgetreten sind. Ersetzen Sie den Datenbanknamen vor der Ausführung.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Führen Sie den folgenden Befehl aus, um den Status aller Ingestion-Vorgänge der letzten vier Stunden zur Anzeige zu bringen. Ersetzen Sie den Datenbanknamen vor der Ausführung.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Bereinigen von Ressourcen
Wenn Sie beabsichtigen, unseren anderen Artikeln zu folgen, behalten Sie die von Ihnen erstellten Ressourcen bei. Wenn nicht, führen Sie den folgenden Befehl in Ihrer Datenbank aus, um die Tabelle "StormEvents" zu bereinigen.
.drop table StormEvents