Gegevens opnemen met behulp van de Python-bibliotheek voor Azure Data Explorer

In dit artikel neemt u gegevens op met behulp van de Azure Data Explorer Python-bibliotheek. Azure Data Explorer is een snelle en zeer schaalbare service voor gegevensverkenning voor telemetrische gegevens en gegevens uit logboeken. Azure Data Explorer biedt twee clientbibliotheken voor Python: een ingest-bibliotheek en een data-bibliotheek. Met deze bibliotheken kunt u gegevens opnemen of laden in een cluster en query's uitvoeren op gegevens uit uw code.

Maak eerst een tabel en gegevenstoewijzing in een cluster. Vervolgens plaatst u op te nemen gegevens in de wachtrij en valideert u de resultaten.

Vereisten

  • Een Microsoft-account of een Microsoft Entra gebruikersidentiteit. Een Azure-abonnement is niet vereist.
  • Een Azure Data Explorer-cluster en -database. Maak een cluster en database.
  • Python 3.4+.

De bibliotheken data en ingest installeren

Installeer azure-kusto-data en azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Importinstructies en constanten toevoegen

Klassen importeren uit azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Azure Data Explorer gebruikt uw Microsoft Entra tenant-id om een toepassing te verifiëren. Gebruik de volgende URL om uw tenant-id te vinden, waarbij u uw domein vervangt door YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Dus als uw domein contoso.com is, wordt de URL bijvoorbeeld: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik op deze URL om de resultaten weer te geven. De eerste regel is als volgt.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

De tenant-id is in dit geval 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Stel de waarden voor AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI en KUSTO_DATABASE in voordat deze code wordt uitgevoerd.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Nu maakt u de verbindingsreeks. In het volgende voorbeeld wordt apparaatverificatie gebruikt voor toegang tot het cluster. U kunt ook verificatie van beheerde identiteit gebruiken, Microsoft Entra toepassingscertificaat, Microsoft Entra toepassingssleutel en Microsoft Entra gebruiker en wachtwoord.

U maakt de doeltabel en toewijzing in een latere stap.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Gegevens van bronbestand instellen

Importeer aanvullende klassen en stel constanten in voor het gegevensbronbestand. In dit voorbeeld wordt een voorbeeldbestand gebruikt dat wordt gehost in Azure Blob Storage. De StormEvents-voorbeeldgegevensset bevat weergerelateerde gegevens van de National Centers for Environmental Information.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Een tabel maken in het cluster

Maak een tabel die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv. Wanneer deze code wordt uitgevoerd, wordt een bericht geretourneerd dat lijkt op het volgende bericht: Als u zich wilt aanmelden, gebruikt u een webbrowser om de pagina https://microsoft.com/devicelogin te openen en voert u de code in F3W4VWZDM voor verificatie. Volg de stappen om u aan te melden en ga vervolgens terug om het volgende codeblok uit te voeren. Als volgende codeblokken verbinding moeten maken, moet u zich opnieuw aanmelden.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Toewijzing van opname definiëren

Wijs binnenkomende CSV-gegevens toe aan de kolomnamen en gegevenstypen die zijn gebruikt bij het maken van de tabel. Hiermee worden brongegevensvelden toegewezen aan tabelkolommen op de bestemming

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Bericht in wachtrij zetten voor opname

Zet een bericht in de wachtrij om gegevens op te halen uit blob-opslag en die gegevens op te nemen in Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Gegevens opvragen die zijn opgenomen in de tabel

Wacht vijf tot 10 minuten totdat de opname in de wachtrij de opname heeft gepland en de gegevens in Azure Data Explorer zijn geladen. Voer vervolgens de volgende code uit om het aantal records in de tabel StormEvents te bepalen.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Query's voor probleemoplossing uitvoeren

Meld u aan bij https://dataexplorer.azure.com en maak verbinding met uw cluster. Voer de volgende opdracht uit in uw database om te zien of er in de afgelopen vier uur fouten zijn opgetreden tijdens het opnemen van gegevens. Vervang de naam van de database voordat u de opdracht uitvoert.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Voer de volgende opdracht uit om de status op te vragen van alle bewerkingen voor het opnemen van gegevens van de afgelopen vier uur. Vervang de naam van de database voordat u de opdracht uitvoert.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Resources opschonen

Als u van plan bent om onze andere artikelen te volgen, behoudt u de resources die u hebt gemaakt. Voer anders de volgende opdracht uit in uw database om de tabel StormEvents op te schonen.

.drop table StormEvents

Volgende stap