Azure Veri Gezgini .NET SDK'sını kullanarak veri alma

Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve üst düzeyde ölçeklenebilir veri keşfetme hizmetidir. .NET için iki istemci kitaplığı sağlar: alma kitaplığı ve veri kitaplığı. .NET SDK hakkında daha fazla bilgi için bkz. .NET SDK hakkında. Bu kitaplıklar verileri bir kümeye almanıza (yüklemenize ve kodunuzdan verileri sorgulamanıza olanak tanır. Bu makalede, önce bir test kümesinde bir tablo ve veri eşlemesi oluşturursunuz. Ardından kümeye bir alımı kuyruğa alır ve sonuçları doğrularsınız.

Ön koşullar

Alma kitaplığını yükleme

Install-Package Microsoft.Azure.Kusto.Ingest

Kimlik doğrulaması ekleme ve bağlantı dizesi oluşturma

Kimlik Doğrulaması

Azure Veri Gezgini SDK, bir uygulamanın kimliğini doğrulamak için AAD kiracı kimliğinizi kullanır. Kiracı kimliğinizi bulmak için aşağıdaki URL'yi kullanın ve YourDomain yerine kendi etki alanınızı yazın.

https://login.windows.net/<YourDomain>/.well-known/openid-configuration/

Örneğin, etki alanınız contoso.com olduğunda URL şöyle olur: https://login.windows.net/contoso.com/.well-known/openid-configuration/. Sonuçları görmek için bu URL'ye tıklayın; ilk satır aşağıdaki gibidir.

"authorization_endpoint":"https://login.windows.net/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Bu örnekte kiracı kimliği 6babcaad-604b-40ac-a9d7-9fd97c0b779f değeridir.

Bu örnekte, kümeye erişmek için etkileşimli bir AAD kullanıcı kimlik doğrulaması kullanılır. Sertifika veya uygulama gizli dizisi ile AAD uygulama kimlik doğrulaması da kullanabilirsiniz. Bu kodu çalıştırmadan önce ve clusterUri için tenantId doğru değerleri ayarladığınızdan emin olun.

Azure Veri Gezgini SDK'sı, bağlantı dizesinin bir parçası olarak kimlik doğrulama yöntemini ayarlamak için kullanışlı bir yol sağlar. Azure Veri Gezgini bağlantı dizeleriyle ilgili tüm belgeler için bkz. bağlantı dizeleri.

Not

SDK'nın geçerli sürümü .NET Core'da etkileşimli kullanıcı kimlik doğrulamasını desteklemez. Gerekirse, bunun yerine AAD kullanıcı adını/parolasını veya uygulama kimlik doğrulamasını kullanın.

Bağlantı dizesini oluşturma

Artık Azure Veri Gezgini bağlantı dizesini oluşturabilirsiniz. Hedef tabloyu ve eşlemeyi sonraki bir adımda oluşturacaksınız.

var tenantId = "<TenantId>";
var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";

var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Kaynak dosya bilgilerini ayarlama

Kaynak dosyanın yolunu ayarlayın. Bu örnekte, Azure Blob Depolama'da barındırılan bir örnek dosya kullanılır. StormEvents örnek veri kümesi, Ulusal Çevre Bilgileri Merkezleri'nden gelen hava durumu verilerini içerir.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Test kümenizde tablo oluşturma

Dosyadaki StormEvents.csv verilerin şemasıyla eşleşen adlı StormEvents bir tablo oluşturun.

İpucu

Aşağıdaki kod parçacıkları, neredeyse her çağrı için bir istemci örneği oluşturur. Bu, her kod parçacığını ayrı ayrı çalıştırılabilir hale getirmek için yapılır. Üretimde, istemci örnekleri yeniden girin ve gerektiği kadar tutulmalıdır. URI başına tek bir istemci örneği, birden çok veritabanıyla çalışırken bile yeterlidir (veritabanı bir komut düzeyinde belirtilebilir).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Veri alımı eşlemesini tanımlama

Gelen CSV verilerini tabloyu oluştururken kullanılan sütun adlarına eşleyin. Bu tabloda bir CSV sütun eşleme nesnesi sağlayın.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Tablonuz için toplu işlem ilkesi tanımlama

Gelen verileri toplu işleme, veri parça boyutunu en iyi duruma getirerek alma toplu işlem ilkesi tarafından denetlenmektedir. İlkeyi alma toplu ilkesi denetim komutuyla değiştirin. Yavaş gelen verilerin gecikme süresini azaltmak için bu ilkeyi kullanın.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        table,
        new IngestionBatchingPolicy(maximumBatchingTimeSpan: TimeSpan.FromSeconds(10.0), maximumNumberOfItems: 100, maximumRawDataSizeMB: 1024));

    kustoClient.ExecuteControlCommand(command);
}

Alınan veriler için bir Raw Data Size değer tanımlamanızı ve performansın iyileşip iyileşmediğini denetlerken boyutu artımlı olarak 250 MB'a azaltmanızı öneririz.

Özelliğini kullanarak Flush Immediately toplu işlemi atlayabilirsiniz, ancak düşük performansa neden olabileceğinden büyük ölçekli alım için bu önerilmez.

Veri alımı için bir iletiyi kuyruğa alma

Blob depolamadan veri çekmek ve verileri almak için bir iletiyi kuyruğa alın. Alma kümesine bir bağlantı kurulur ve bu uç noktayla çalışmak için başka bir istemci oluşturulur.

İpucu

Aşağıdaki kod parçacıkları, neredeyse her çağrı için bir istemci örneği oluşturur. Bu, her kod parçacığını ayrı ayrı çalıştırılabilir hale getirmek için yapılır. Üretimde, istemci örnekleri yeniden girin ve gerektiği kadar tutulmalıdır. URI başına tek bir istemci örneği, birden çok veritabanıyla çalışırken bile yeterlidir (veritabanı bir komut düzeyinde belirtilebilir).

var ingestUri = "https://ingest-<ClusterName>.<Region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);

using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder))
{
    var properties =
        new KustoQueuedIngestionProperties(database, table)
        {
            Format = DataSourceFormat.csv,
            IngestionMapping = new IngestionMapping()
            { 
                IngestionMappingReference = tableMapping,
                IngestionMappingKind = IngestionMappingKind.Csv
            },
            IgnoreFirstRecord = true
        };

    ingestClient.IngestFromStorageAsync(blobPath, ingestionProperties: properties).GetAwaiter().GetResult();
}

Verilerin tabloya alındığını doğrulama

Kuyruğa alınan alımın alımı zamanlaması ve verileri Azure Veri Gezgini yüklemesi için beş ile on dakika arasında bekleyin. Ardından aşağıdaki kodu çalıştırarak StormEvents tablosundaki kayıtların sayısını alın.

using (var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder))
{
    var query = $"{table} | count";

    var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
    Console.WriteLine(results.Single());
}

Sorun giderme sorguları çalıştırma

https://dataexplorer.azure.com adresinde oturum açın ve kümenize bağlanın. Son dört saatte hiç veri alımı hatası olup olmadığını görmek için veritabanınızda aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Son dört saatteki tüm veri alım işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Kaynakları temizleme

Diğer makalelerimizi izlemeyi planlıyorsanız, oluşturduğunuz kaynakları koruyun. Aksi takdirde, veritabanınızda aşağıdaki komutu çalıştırarak StormEvents tablosunu temizleyin.

.drop table StormEvents

Sonraki adımlar