Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Há duas bibliotecas de cliente para .NET: uma biblioteca de ingestão e uma biblioteca de dados. Para obter mais informações sobre o SDK do .NET, consulte sobre o SDK do .NET. Essas bibliotecas permitem que você ingira (carregue) dados em um cluster e consulte dados do seu código. Neste artigo, você primeiro cria uma tabela e um mapeamento de dados em um cluster de teste. A seguir, tu enfileiras uma ingestão no cluster e validas os resultados.
Pré-requisitos
- Uma conta Microsoft ou uma identidade de utilizador do Microsoft Entra. Uma assinatura do Azure não é necessária.
- Um cluster e um banco de dados. Crie um cluster e um banco de dados.
Instalar a biblioteca de importação
Install-Package Microsoft.Azure.Kusto.Ingest
Adicionar autenticação e construir cadeia de conexão
Autenticação
Para autenticar um aplicativo, o SDK usa sua ID de locatário do Microsoft Entra. Para encontrar o seu ID de inquilino, utilize o seguinte URL, substituindo o seu domínio por YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; A primeira linha é a seguinte.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
O ID do inquilino, neste caso, é aaaabbbb-0000-cccc-1111-dddd2222eeee
.
Este exemplo usa uma autenticação de usuário interativa do Microsoft Entra para acessar o cluster. Você também pode usar a autenticação do aplicativo Microsoft Entra com certificado ou segredo do aplicativo. Certifique-se de definir os valores corretos para tenantId
e clusterUri
antes de executar este código.
O SDK fornece uma maneira conveniente de configurar o método de autenticação como parte da cadeia de conexão. Para obter documentação completa sobre cadeias de conexão, consulte cadeias de conexão.
Observação
A versão atual do SDK não oferece suporte à autenticação interativa do usuário no .NET Core. Se necessário, use o nome de utilizador/senha do Microsoft Entra ou a autenticação da aplicação.
Construir a cadeia de conexão
Agora você pode construir a cadeia de conexão. Você criará a tabela de destino e o mapeamento em uma etapa posterior.
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);
Definir informações do arquivo de origem
Defina o caminho para o arquivo de origem. Este exemplo usa um arquivo de exemplo hospedado no Armazenamento de Blob do Azure. O conjunto de dados de exemplo do StormEvents contém dados relacionados com o clima dos Centros Nacionais de Informação Ambiental.
var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";
Criar uma tabela no cluster de teste
Crie uma tabela nomeada StormEvents
que corresponda ao esquema dos dados no StormEvents.csv
arquivo.
Sugestão
Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).
var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definir o mapeamento de ingestão
Mapeie os dados CSV de entrada para os nomes de coluna usados ao criar a tabela. Provisionar um objeto de mapeamento de coluna CSV nessa tabela.
var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Csv,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}
Definir a política de agrupamento para a sua tabela
O envio em lote de dados recebidos otimiza o tamanho do fragmento de dados, que é controlado pela política de agrupamento de ingestão. Modifique a política com o comando de gerenciamento de política de lotes de ingestão. Use esta política para reduzir a latência dos dados que chegam lentamente.
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
databaseName,
tableName,
new IngestionBatchingPolicy(
maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
maximumNumberOfItems: 100,
maximumRawDataSizeMB: 1024
)
);
kustoClient.ExecuteControlCommand(command);
}
Recomendamos definir um Raw Data Size
valor para os dados ingeridos e diminuir incrementalmente o tamanho para 250 MB, enquanto verificamos se o desempenho melhora.
Você pode usar a propriedade Flush Immediately
para ignorar o processamento em lote, embora isso não seja recomendado para ingestão em grande escala, pois pode resultar em um desempenho insatisfatório.
Enfileirar uma mensagem para processamento
Enfileire uma mensagem para extrair dados do armazenamento de blob e ingerir os dados. Uma ligação é estabelecida ao cluster de ingestão e outro cliente é criado para trabalhar com esse endpoint.
Sugestão
Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.csv,
IngestionMapping = new IngestionMapping
{
IngestionMappingReference = tableMappingName,
IngestionMappingKind = IngestionMappingKind.Csv
},
IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Validar se os dados foram ingeridos na tabela
Espere de cinco a dez minutos para que a ingestão enfileirada seja agendada e os dados sejam carregados no seu cluster. Em seguida, execute o código a seguir para obter a contagem de registros na StormEvents
tabela.
using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());
Executar consultas de solução de problemas
Inicie sessão em https://dataexplorer.azure.com e ligue-se ao cluster. Execute o seguinte comando em seu banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Execute o seguinte comando para visualizar o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Limpeza de recursos
Se você pretende seguir nossos outros artigos, mantenha os recursos que você criou. Caso contrário, execute o seguinte comando no banco de dados para limpar a StormEvents
tabela.
.drop table StormEvents