Partilhar via


Ingerir dados usando o Kusto .NET SDK

Há duas bibliotecas de cliente para .NET: uma biblioteca de ingestão e uma biblioteca de dados. Para obter mais informações sobre o SDK do .NET, consulte sobre o SDK do .NET. Essas bibliotecas permitem que você ingira (carregue) dados em um cluster e consulte dados do seu código. Neste artigo, você primeiro cria uma tabela e um mapeamento de dados em um cluster de teste. A seguir, tu enfileiras uma ingestão no cluster e validas os resultados.

Pré-requisitos

  • Uma conta Microsoft ou uma identidade de utilizador do Microsoft Entra. Uma assinatura do Azure não é necessária.
  • Um cluster e um banco de dados. Crie um cluster e um banco de dados.

Instalar a biblioteca de importação

Install-Package Microsoft.Azure.Kusto.Ingest

Adicionar autenticação e construir cadeia de conexão

Autenticação

Para autenticar um aplicativo, o SDK usa sua ID de locatário do Microsoft Entra. Para encontrar o seu ID de inquilino, utilize o seguinte URL, substituindo o seu domínio por YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Por exemplo, se o seu domínio for contoso.com, o URL é: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Clique neste URL para ver os resultados; A primeira linha é a seguinte.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

O ID do inquilino, neste caso, é aaaabbbb-0000-cccc-1111-dddd2222eeee.

Este exemplo usa uma autenticação de usuário interativa do Microsoft Entra para acessar o cluster. Você também pode usar a autenticação do aplicativo Microsoft Entra com certificado ou segredo do aplicativo. Certifique-se de definir os valores corretos para tenantId e clusterUri antes de executar este código.

O SDK fornece uma maneira conveniente de configurar o método de autenticação como parte da cadeia de conexão. Para obter documentação completa sobre cadeias de conexão, consulte cadeias de conexão.

Observação

A versão atual do SDK não oferece suporte à autenticação interativa do usuário no .NET Core. Se necessário, use o nome de utilizador/senha do Microsoft Entra ou a autenticação da aplicação.

Construir a cadeia de conexão

Agora você pode construir a cadeia de conexão. Você criará a tabela de destino e o mapeamento em uma etapa posterior.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Definir informações do arquivo de origem

Defina o caminho para o arquivo de origem. Este exemplo usa um arquivo de exemplo hospedado no Armazenamento de Blob do Azure. O conjunto de dados de exemplo do StormEvents contém dados relacionados com o clima dos Centros Nacionais de Informação Ambiental.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Criar uma tabela no cluster de teste

Crie uma tabela nomeada StormEvents que corresponda ao esquema dos dados no StormEvents.csv arquivo.

Sugestão

Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir o mapeamento de ingestão

Mapeie os dados CSV de entrada para os nomes de coluna usados ao criar a tabela. Provisionar um objeto de mapeamento de coluna CSV nessa tabela.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definir a política de agrupamento para a sua tabela

O envio em lote de dados recebidos otimiza o tamanho do fragmento de dados, que é controlado pela política de agrupamento de ingestão. Modifique a política com o comando de gerenciamento de política de lotes de ingestão. Use esta política para reduzir a latência dos dados que chegam lentamente.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Recomendamos definir um Raw Data Size valor para os dados ingeridos e diminuir incrementalmente o tamanho para 250 MB, enquanto verificamos se o desempenho melhora.

Você pode usar a propriedade Flush Immediately para ignorar o processamento em lote, embora isso não seja recomendado para ingestão em grande escala, pois pode resultar em um desempenho insatisfatório.

Enfileirar uma mensagem para processamento

Enfileire uma mensagem para extrair dados do armazenamento de blob e ingerir os dados. Uma ligação é estabelecida ao cluster de ingestão e outro cliente é criado para trabalhar com esse endpoint.

Sugestão

Os trechos de código a seguir criam uma instância de um cliente para quase todas as chamadas. Isso é feito para tornar cada trecho executável individualmente. Na produção, as instâncias do cliente são reentrantes e devem ser mantidas pelo tempo necessário. Uma única instância de cliente por URI é suficiente, mesmo quando se trabalha com vários bancos de dados (o banco de dados pode ser especificado em um nível de comando).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Validar se os dados foram ingeridos na tabela

Espere de cinco a dez minutos para que a ingestão enfileirada seja agendada e os dados sejam carregados no seu cluster. Em seguida, execute o código a seguir para obter a contagem de registros na StormEvents tabela.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Executar consultas de solução de problemas

Inicie sessão em https://dataexplorer.azure.com e ligue-se ao cluster. Execute o seguinte comando em seu banco de dados para ver se houve alguma falha de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Execute o seguinte comando para visualizar o status de todas as operações de ingestão nas últimas quatro horas. Substitua o nome do banco de dados antes de executar.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpeza de recursos

Se você pretende seguir nossos outros artigos, mantenha os recursos que você criou. Caso contrário, execute o seguinte comando no banco de dados para limpar a StormEvents tabela.

.drop table StormEvents