pozyskiwanie danych przy użyciu biblioteki języka Python w usłudze Azure Data Explorer
W tym artykule pozyskasz dane przy użyciu biblioteki azure Data Explorer Python. Azure Data Explorer to szybka i wysoce skalowalna usługa eksploracji danych na potrzeby danych dziennika i telemetrycznych. Usługa Azure Data Explorer udostępnia dwie biblioteki klienckie dla języka Python: bibliotekę pozyskiwania i bibliotekę danych. Te biblioteki umożliwiają pozyskiwanie lub ładowanie danych do klastra i wykonywanie zapytań o dane z kodu.
Najpierw utwórz tabelę i mapowanie danych w klastrze. Następnie umieścisz pozyskiwanie w kolejce do klastra i sprawdzisz poprawność wyników.
Wymagania wstępne
- Konto Microsoft lub tożsamość użytkownika Microsoft Entra. Subskrypcja platformy Azure nie jest wymagana.
- Baza danych i klaster usługi Azure Data Explorer. Utwórz klaster i bazę danych.
- Python 3.4+.
Instalowanie biblioteki danych i biblioteki pozyskiwania
Zainstaluj biblioteki azure-kusto-data i azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Dodawanie instrukcji importu i stałych
Zaimportuj klasy z biblioteki azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Aby uwierzytelnić aplikację, usługa Azure Data Explorer używa identyfikatora dzierżawy Microsoft Entra. Aby znaleźć swój identyfikator dzierżawy, użyj następującego adresu URL, zastępując domenę domeną yourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Jeśli na przykład Twoja domena to contoso.com, adres URL to https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kliknij ten adres URL, aby wyświetlić wyniki. Pierwszy wiersz wygląda w następujący sposób.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
Identyfikator dzierżawy w tym przypadku to 6babcaad-604b-40ac-a9d7-9fd97c0b779f
. Przed uruchomieniem tego kodu ustaw wartości dla stałych AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI oraz KUSTO_DATABASE.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Teraz możesz utworzyć parametry połączenia. W poniższym przykładzie użyto uwierzytelniania urządzenia w celu uzyskania dostępu do klastra. Można również użyć uwierzytelniania tożsamości zarządzanej, certyfikatu aplikacji Microsoft Entra, klucza aplikacji Microsoft Entra i Microsoft Entra użytkownika i hasła.
W kolejnym kroku utworzysz tabelę docelową i mapowanie.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Ustawianie informacji o pliku źródłowym
Zaimportuj dodatkowe klasy i ustaw stałe dla pliku źródła danych. W tym przykładzie używany jest przykładowy plik hostowany w usłudze Azure Blob Storage. Przykładowy zestaw danych StormEvents zawiera dane związane z pogodą z National Centers for Environmental Information.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Tworzenie tabeli w klastrze
Utwórz tabelę, która będzie zgodna ze schematem danych w pliku StormEvents.csv. Po uruchomieniu tego kodu zwraca komunikat podobny do następującego: Aby się zalogować, użyj przeglądarki internetowej, aby otworzyć stronę https://microsoft.com/devicelogin i wprowadzić kod F3W4VWZDM do uwierzytelnienia. Postępuj zgodnie z instrukcjami, aby się zalogować, a następnie wróć w celu uruchomienia kolejnego bloku kodu. Kolejne bloki kodu umożliwiające nawiązanie połączenia wymagają ponownego zalogowania.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Definiowanie mapowania pozyskiwania
Zmapuj przychodzące dane CSV na nazwy kolumn i typy danych używane podczas tworzenia tabeli. Spowoduje to zamapowanie pól danych źródłowych na kolumny tabeli docelowej
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Wysyłanie komunikatu do kolejki w celu pozyskiwania
Wyślij komunikat do kolejki, aby ściągnąć dane z magazynu obiektów blob i pozyskać dane do usługi Azure Data Explorer.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Wykonywanie zapytań o dane pozyskane do tabeli
Poczekaj od pięciu do 10 minut na zaplanowanie pozyskiwania w kolejce i załadowanie danych do usługi Azure Data Explorer. Następnie uruchom następujący kod, aby uzyskać liczbę rekordów w tabeli StormEvents.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Uruchamianie zapytań dotyczących rozwiązywania problemów
Zaloguj się do portalu https://dataexplorer.azure.com i nawiąż połączenie z klastrem. Uruchom następujące polecenie w bazie danych, aby sprawdzić, czy wystąpiły jakieś niepowodzenia pozyskiwania w ciągu ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Uruchom następujące polecenie, aby wyświetlić stan wszystkich operacji pozyskiwania z ostatnich czterech godzin. Przed uruchomieniem zastąp nazwę bazy danych.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Czyszczenie zasobów
Jeśli planujesz postępować zgodnie z innymi artykułami, zachowaj utworzone zasoby. W przeciwnym razie uruchom następujące polecenie w bazie danych, aby wyczyścić tabelę StormEvents.
.drop table StormEvents