Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo, ingerirá datos mediante la biblioteca de Python de Azure Data Explorer. El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. Azure Data Explorer proporciona dos bibliotecas cliente para Python: una biblioteca de ingesta y una biblioteca de datos. Estas bibliotecas permiten ingerir o cargar datos en un clúster y consultar datos desde el código.
En primer lugar, cree una tabla y un mapeo de datos en un clúster. Usted luego pone en cola la ingestión al clúster y valida los resultados.
Prerrequisitos
- Una cuenta de Microsoft o una identidad de usuario de Microsoft Entra. No se requiere una suscripción de Azure.
- Un clúster y la base de datos de Azure Data Explorer. Cree un clúster y una base de datos.
- Python 3.4+.
Instalación de los datos y las bibliotecas de ingesta
Instale azure-kusto-data y azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Agregar instrucciones y constantes de importación
Importe clases de azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Para autenticar una aplicación, Azure Data Explorer usa el identificador de inquilino de Microsoft Entra. Para encontrar su identificación de arrendatario, use la siguiente URL, reemplazando su dominio por YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Por ejemplo, si el dominio es contoso.com, la dirección URL es: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Haga clic en esta dirección URL para ver los resultados; la primera línea es la siguiente.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
El identificador de inquilino en este caso es aaaabbbb-0000-cccc-1111-dddd2222eeee
. Establezca los valores de AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI y KUSTO_DATABASE antes de ejecutar este código.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Ahora construya la cadena de conexión. En el ejemplo siguiente se usa la autenticación de dispositivos para acceder al clúster. También puede usar la autenticación de identidad administrada , el certificado de aplicación de Microsoft Entra, la clave de aplicación de Microsoft Entray el usuario y la contraseña de Microsoft Entra.
Cree la tabla de destino y la asignación en un paso posterior.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Establecimiento de la información del archivo de origen
Importe clases adicionales y establezca constantes para el archivo de origen de datos. En este ejemplo se usa un archivo de ejemplo hospedado en Azure Blob Storage. El conjunto de datos de ejemplo StormEvents contiene datos relacionados con el tiempo de los Centros Nacionales de Información Ambiental.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Creación de una tabla en el clúster
Cree una tabla que coincida con el esquema de los datos del archivo StormEvents.csv. Cuando se ejecuta este código, devuelve un mensaje similar al siguiente: Para iniciar sesión, use un explorador web para abrir la página https://microsoft.com/devicelogin y escriba el código F3W4VWZDM para autenticarse. Siga los pasos para iniciar sesión y vuelva a ejecutar el siguiente bloque de código. Los bloques de código posteriores que establecen una conexión requieren que vuelva a iniciar sesión.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Definir el mapeo de ingestión
Asigne los datos CSV entrantes a los nombres de columna y los tipos de datos usados al crear la tabla. Esto asigna campos de datos de origen a columnas de tabla de destino
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Poner en cola un mensaje para la ingestión
Poner en cola un mensaje para extraer datos de Blob Storage e ingerir esos datos en Azure Data Explorer.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Consulta datos que se han introducido en la tabla
Espere de cinco a diez minutos para que la ingesta en cola pueda ser programada y los datos sean cargados en Azure Data Explorer. A continuación, ejecute el código siguiente para obtener el recuento de registros en la tabla StormEvents.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ejecutar consultas para solucionar problemas
Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster. Ejecute el siguiente comando en la base de datos para ver si hubo errores de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarse.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Ejecute el comando siguiente para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarse.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpieza de recursos
Si tiene previsto seguir nuestros otros artículos, mantenga los recursos que ha creado. Si no es así, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents.
.drop table StormEvents