Kusto.Ingest alma kodu örnekleri
Bu kısa kod parçacıkları koleksiyonu, kusto tablosuna veri almayla ilgili çeşitli teknikleri gösterir.
Not
Bu örnekler, alma işleminin hemen ardından alma istemcisi yok edilmiş gibi görünür. Bunu kelimenin tam anlamıyla kabul etme. Alma istemcileri yeniden ve iş parçacığı açısından güvenlidir ve çok sayıda oluşturulmamalıdır. Alma istemci örneklerinin önerilen kardinalitesi, hedef Kusto kümesi başına barındırma işlemi başına bir tanedir.
Tek bir Azure blobundan zaman uyumsuz alma
Tek bir Azure blobundan zaman uyumsuz alım için isteğe bağlı RetryPolicy ile KustoQueuedIngestClient kullanın.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create an ingest client
// Note, that creating a separate instance per ingestion operation is an anti-pattern.
// IngestClient classes are thread-safe and intended for reuse
using var client = KustoIngestFactory.CreateQueuedIngestClient(
kustoConnectionStringBuilderDM,
// Create your custom retry policy, which will affect how the ingest client handles retrying on transient failures
new QueueOptions { MaxRetries = 0 }
);
var blobUriWithSasKey = "<blobUriWithSasKey>";
// Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
var sourceOptions = new StorageSourceOptions { DeleteSourceOnSuccess = true };
await client.IngestFromStorageAsync(blobUriWithSasKey, kustoIngestionProperties, sourceOptions);
Yerel dosyadan alma
Yerel bir dosyadan almak için KustoDirectIngestClient kullanın.
Not
Sınırlı hacim ve düşük frekanslı alım için bu yöntemi öneririz.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
// Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderEngine = new KustoConnectionStringBuilder(kustoUri)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateDirectIngestClient(kustoConnectionStringBuilderEngine);
//Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("<filePath>", kustoIngestionProperties);
Yerel dosyalardan alma ve alımı doğrulama
Yerel dosyalardan almak ve ardından alımı doğrulamak için KustoQueuedIngestClient kullanın.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from blobs according to the required properties
var kustoIngestionProperties = new KustoIngestionProperties("<databaseName>", "<tableName>");
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Waiting for the aggregation
Thread.Sleep(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "Failures expected");
Yerel dosyalardan alma ve bir kuyruğa rapor durumu
Yerel dosyalardan almak ve ardından durumu kuyruğa bildirmek için KustoQueuedIngestClient kullanın.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
// Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
// (Rather than the default "FailuresOnly" level - which is demonstrated in the
// 'Ingest From Local File(s) using KustoQueuedIngestClient and Ingestion Validation' section)
ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
// Choose the report method of choice. 'Queue' is the default method.
// For the sake of the example, we will choose it anyway.
ReportMethod = IngestionReportMethod.Queue
};
await client.IngestFromStorageAsync("ValidTestFile.csv", kustoIngestionProperties);
await client.IngestFromStorageAsync("InvalidTestFile.csv", kustoIngestionProperties);
// Waiting for the aggregation
Thread.Sleep(TimeSpan.FromMinutes(8));
// Retrieve and validate failures
var ingestionFailures = await client.PeekTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Retrieve, delete and validate failures
ingestionFailures = await client.GetAndDiscardTopIngestionFailuresAsync();
Ensure.IsTrue(ingestionFailures.Any(), "The failed ingestion should have been reported to the failed ingestions queue");
// Verify the success has also been reported to the queue
var ingestionSuccesses = await client.GetAndDiscardTopIngestionSuccessesAsync();
Ensure.ConditionIsMet(ingestionSuccesses.Any(), "The successful ingestion should have been reported to the successful ingestions queue");
Yerel dosyalardan alma ve tabloya rapor durumu
Yerel dosyalardan almak ve durumu tabloya bildirmek için KustoQueuedIngestClient kullanın.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var appId = "<appId>";
var appKey = "<appKey>";
var appTenant = "<appTenant>";
//Create Kusto connection string with App Authentication
var kustoConnectionStringBuilderDM = new KustoConnectionStringBuilder(ingestUri)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
// Create a disposable client that will execute the ingestion
using var client = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilderDM);
// Ingest from a file according to the required properties
var kustoIngestionProperties = new KustoQueuedIngestionProperties("<databaseName>", "<tableName>")
{
// Setting the report level to FailuresAndSuccesses will cause both successful and failed ingestions to be reported
// (Rather than the default "FailuresOnly" level)
ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
// Choose the report method of choice
ReportMethod = IngestionReportMethod.Table
};
var filePath = "<filePath>";
var fileIdentifier = Guid.NewGuid();
var sourceOptions = new StorageSourceOptions { SourceId = fileIdentifier };
// Execute the ingest operation and save the result.
var clientResult = await client.IngestFromStorageAsync(filePath, kustoIngestionProperties, sourceOptions);
// Use the fileIdentifier you supplied to get the status of your ingestion
var ingestionStatus = clientResult.Result.GetIngestionStatusBySourceId(fileIdentifier);
while (ingestionStatus.Status == Status.Pending)
{
// Wait a minute...
Thread.Sleep(TimeSpan.FromMinutes(1));
// Try again
ingestionStatus = clientResult.Result.GetIngestionStatusBySourceId(fileIdentifier);
}
// Verify the results of the ingestion
Ensure.ConditionIsMet(ingestionStatus.Status == Status.Succeeded, "The file should have been ingested successfully");
İlgili içerik
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin