Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Neste artigo, você ingere dados usando a biblioteca Python do Azure Data Explorer. O Azure Data Explorer é um serviço de exploração de dados rápido e altamente escalável para dados de log e telemetria. O Azure Data Explorer fornece duas bibliotecas de cliente para Python: uma biblioteca de ingestão e uma biblioteca de dados. Essas bibliotecas permitem que você ingera ou carregue dados em um cluster e consulte dados do seu código.
Primeiro, crie uma tabela e um mapeamento de dados em um cluster. Em seguida, você enfileirou a ingestão no cluster e validou os resultados.
Pré-requisitos
- Uma conta Microsoft ou uma identidade de utilizador do Microsoft Entra. Uma assinatura do Azure não é necessária.
- Um cluster e um banco de dados do Azure Data Explorer. Crie um cluster e um banco de dados.
- Python 3.4+.
Instalar as bibliotecas de dados e de ingestão de dados
Instale azure-kusto-data e azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Adicionar instruções de importação e constantes
Importe classes de azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Para autenticar um aplicativo, o Azure Data Explorer usa sua ID de locatário do Microsoft Entra. Para encontrar o seu ID de inquilino, utilize o seguinte URL, substituindo o seu domínio por YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; A primeira linha é a seguinte.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
O ID do inquilino, neste caso, é aaaabbbb-0000-cccc-1111-dddd2222eeee
. Defina os valores para AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI e KUSTO_DATABASE antes de executar este código.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Agora construa a cadeia de conexão. O exemplo a seguir usa a autenticação de dispositivo para acessar o cluster. Você também pode usar autenticação de identidade gerenciada , certificado de aplicativo Microsoft Entra, chave de aplicativo Microsoft Entra e usuário e senha do Microsoft Entra.
Você cria a tabela de destino e o mapeamento em uma etapa posterior.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Definir informações do arquivo de origem
Importe classes adicionais e defina constantes para o arquivo de fonte de dados. Este exemplo usa um arquivo de exemplo hospedado no Armazenamento de Blob do Azure. O conjunto de dados de exemplo do StormEvents contém dados relacionados com o clima dos Centros Nacionais de Informação Ambiental.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Criar uma tabela no cluster
Crie uma tabela que corresponda ao esquema dos dados no arquivo StormEvents.csv. Quando esse código é executado, ele retorna uma mensagem como a seguinte mensagem: Para entrar, use um navegador da Web para abrir a página https://microsoft.com/devicelogin e insira o código F3W4VWZDM autenticar. Siga os passos para iniciar sessão e, em seguida, regresse para executar o próximo bloco de código. Os blocos de código subsequentes que fazem uma conexão exigem que você entre novamente.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Definir o mapeamento de ingestão
Mapeie os dados CSV de entrada para os nomes de colunas e tipos de dados usados ao criar a tabela. Isso mapeia os campos de dados de origem para as colunas da tabela de destino
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Enfileirar uma mensagem para processamento
Enfileire uma mensagem para extrair dados do armazenamento de blob e ingerir esses dados no Azure Data Explorer.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Dados de consulta que foram ingeridos na tabela
Espere entre cinco a dez minutos para que a ingestão em espera agende a ingestão e carregue os dados no Azure Data Explorer. Em seguida, execute o código a seguir para obter a contagem de registros na tabela StormEvents.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Executar consultas de solução de problemas
Inicie sessão em https://dataexplorer.azure.com e ligue-se ao cluster. Execute o seguinte comando em seu banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Execute o seguinte comando para visualizar o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpeza de recursos
Se você pretende seguir nossos outros artigos, mantenha os recursos que você criou. Caso contrário, execute o seguinte comando em seu banco de dados para limpar a tabela StormEvents.
.drop table StormEvents