Compartir a través de


Ingesta de datos mediante la biblioteca de Python de Azure Data Explorer

En este artículo, ingerirá datos mediante la biblioteca de Python de Azure Data Explorer. El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. Azure Data Explorer proporciona dos bibliotecas cliente para Python: una biblioteca de ingesta y una biblioteca de datos. Estas bibliotecas permiten ingerir o cargar datos en un clúster y consultar datos desde el código.

En primer lugar, cree una tabla y un mapeo de datos en un clúster. Usted luego pone en cola la ingestión al clúster y valida los resultados.

Prerrequisitos

Instalación de los datos y las bibliotecas de ingesta

Instale azure-kusto-data y azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Agregar instrucciones y constantes de importación

Importe clases de azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Para autenticar una aplicación, Azure Data Explorer usa el identificador de inquilino de Microsoft Entra. Para encontrar su identificación de arrendatario, use la siguiente URL, reemplazando su dominio por YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por ejemplo, si el dominio es contoso.com, la dirección URL es: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Haga clic en esta dirección URL para ver los resultados; la primera línea es la siguiente.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

El identificador de inquilino en este caso es aaaabbbb-0000-cccc-1111-dddd2222eeee. Establezca los valores de AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI y KUSTO_DATABASE antes de ejecutar este código.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Ahora construya la cadena de conexión. En el ejemplo siguiente se usa la autenticación de dispositivos para acceder al clúster. También puede usar la autenticación de identidad administrada , el certificado de aplicación de Microsoft Entra, la clave de aplicación de Microsoft Entray el usuario y la contraseña de Microsoft Entra.

Cree la tabla de destino y la asignación en un paso posterior.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Establecimiento de la información del archivo de origen

Importe clases adicionales y establezca constantes para el archivo de origen de datos. En este ejemplo se usa un archivo de ejemplo hospedado en Azure Blob Storage. El conjunto de datos de ejemplo StormEvents contiene datos relacionados con el tiempo de los Centros Nacionales de Información Ambiental.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Creación de una tabla en el clúster

Cree una tabla que coincida con el esquema de los datos del archivo StormEvents.csv. Cuando se ejecuta este código, devuelve un mensaje similar al siguiente: Para iniciar sesión, use un explorador web para abrir la página https://microsoft.com/devicelogin y escriba el código F3W4VWZDM para autenticarse. Siga los pasos para iniciar sesión y vuelva a ejecutar el siguiente bloque de código. Los bloques de código posteriores que establecen una conexión requieren que vuelva a iniciar sesión.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Definir el mapeo de ingestión

Asigne los datos CSV entrantes a los nombres de columna y los tipos de datos usados al crear la tabla. Esto asigna campos de datos de origen a columnas de tabla de destino

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Poner en cola un mensaje para la ingestión

Poner en cola un mensaje para extraer datos de Blob Storage e ingerir esos datos en Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Consulta datos que se han introducido en la tabla

Espere de cinco a diez minutos para que la ingesta en cola pueda ser programada y los datos sean cargados en Azure Data Explorer. A continuación, ejecute el código siguiente para obtener el recuento de registros en la tabla StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Ejecutar consultas para solucionar problemas

Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster. Ejecute el siguiente comando en la base de datos para ver si hubo errores de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarse.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Ejecute el comando siguiente para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarse.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpieza de recursos

Si tiene previsto seguir nuestros otros artículos, mantenga los recursos que ha creado. Si no es así, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents.

.drop table StormEvents

Paso siguiente