Aracılığıyla paylaş


Azure Veri Gezgini Python kitaplığını kullanarak veri alma

Bu makalede, Azure Veri Gezgini Python kitaplığını kullanarak verileri alırsınız. Azure Veri Gezgini, günlük ve telemetri verileri için hızlı ve yüksek oranda ölçeklenebilir veri keşfetme hizmetidir. Azure Veri Gezgini, Python için iki istemci kitaplığı sağlar: veri alım kitaplığı ve veri kitaplığı. Bu kitaplıklar, verileri bir kümeye almanızı veya yüklemenizi ve kodunuzdan veri sorgulamanızı sağlar.

İlk olarak, kümede bir tablo ve veri eşlemesi oluşturun. Ardından kümeye alım kuyruğu oluşturur ve sonuçları doğrularsınız.

Önkoşullar

Verileri yükleme ve kitaplıkları alma

azure-kusto-data ve azure-kusto-ingest'ı yükleyin.

pip install azure-kusto-data
pip install azure-kusto-ingest

İthalat ifadeleri ve sabitleri ekle

Azure-kusto-data'dan sınıfları içeri aktarma.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Azure Veri Gezgini, bir uygulamanın kimliğini doğrulamak için Microsoft Entra kiracı kimliğinizi kullanır. Kiracı kimliğinizi bulmak için aşağıdaki URL'yi kullanın ve etki alanınızı EtkiAlanınız olarak değiştirin.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Örneğin, etki alanınız contoso.com ise URL şöyledir: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Sonuçları görmek için bu URL'ye tıklayın; ilk satır aşağıdaki gibidir.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Bu durumda kiracı kimliği aaaabbbb-0000-cccc-1111-dddd2222eeee'dir. Bu kodu çalıştırmadan önce AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI ve KUSTO_DATABASE değerlerini ayarlayın.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Şimdi bağlantı dizesini oluşturun. Aşağıdaki örnek, kümeye erişmek için cihaz kimlik doğrulamasını kullanır. Ayrıca yönetilen kimlik doğrulamasını, Microsoft Entra uygulama sertifikasını, Microsoft Entra uygulama anahtarını ve Microsoft Entra kullanıcı ve parolasını da kullanabilirsiniz.

Hedef tabloyu ve eşlemeyi sonraki bir adımda oluşturacaksınız.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Kaynak dosya bilgilerini ayarlama

Ek sınıfları içeri aktarın ve veri kaynağı dosyası için sabitleri ayarlayın. Bu örnekte Azure Blob Depolama'da barındırılan örnek bir dosya kullanılmaktadır. StormEvents örnek veri kümesi, Ulusal Çevre Bilgileri Merkezlerinden hava durumuyla ilgili verileri içerir.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Kümenizde tablo oluşturma

StormEvents.csv dosyasındaki verilerin şemasıyla eşleşen bir tablo oluşturun. Bu kod çalıştırıldığında aşağıdaki iletiye benzer bir ileti döndürür: Oturum açmak için bir web tarayıcısı kullanarak sayfayı https://microsoft.com/devicelogin açın ve kimlik doğrulaması için F3W4VWZDM kodu girin. Oturum açmak için adımları izleyin ve bir sonraki kod bloğunu çalıştırmak için geri dönün. Bağlantıyı oluşturan sonraki kod blokları yeniden oturum açmanızı gerektirir.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Veri alımı eşlemesini tanımlayın

Gelen CSV verilerini, tabloyu oluştururken kullanılan sütun adlarına ve veri türlerine eşleyin. Bu, kaynak veri alanlarını hedef tablo sütunlarına eşler

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

İletiyi işleme için kuyruğa al

Blob depolamadan veri çekmek ve bu verileri Azure Veri Gezgini'ne almak için mesajı sıraya koyun.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Tabloya alınan verileri sorgulayın

Kuyruğa alınmış veri alımının planlanması ve verilerin Azure Veri Gezgini'ne yüklenmesi için beş ile 10 dakika arasında bekleyin. Ardından StormEvents tablosundaki kayıtların sayısını almak için aşağıdaki kodu çalıştırın.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Sorun giderme sorgularını çalıştırma

https://dataexplorer.azure.com oturum açın ve kümenize bağlanın. Son dört saat içinde herhangi bir alma hatası olup olmadığını görmek için veritabanınızda aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Son dört saat içindeki tüm alım işlemlerinin durumunu görüntülemek için aşağıdaki komutu çalıştırın. Çalıştırmadan önce veritabanı adını değiştirin.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Kaynakları temizle

Diğer makalelerimizi izlemeyi planlıyorsanız, oluşturduğunuz kaynakları koruyun. Aksi takdirde, StormEvents tablosunu temizlemek için veritabanınızda aşağıdaki komutu çalıştırın.

.drop table StormEvents

Sonraki adım