Menyerap data menggunakan pustaka Azure Data Explorer Python

Dalam artikel ini, Anda menyerap data menggunakan pustaka Azure Data Explorer Python. Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Azure Data Explorer menyediakan dua pustaka klien untuk Python: pustaka penyerapan dan pustaka data. Pustaka ini memungkinkan Anda untuk menyerap, atau memuat, data ke dalam kluster dan mengkueri data dari kode Anda.

Pertama, buat tabel dan pemetaan data dalam kluster. Anda kemudian mengantre penyerapan ke kluster dan memvalidasi hasilnya.

Prasyarat

Menginstal pustaka data dan penyerapan

Instal azure-kusto-data dan azure-kusto-ingest.

pip install azure-kusto-data
pip install azure-kusto-ingest

Menambahkan pernyataan dan konstanta impor

Mengimpor kelas dari azure-kusto-data.

from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table

Untuk mengautentikasi aplikasi, Azure Data Explorer menggunakan ID penyewa Microsoft Entra Anda. Untuk menemukan ID penyewa Anda, gunakan URL berikut, mengganti domain Anda untuk YourDomain.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Misalnya, jika domain Anda contoso.com, URL-nya adalah: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik URL ini untuk melihat hasilnya; baris pertama adalah sebagai berikut.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

ID penyewa dalam hal ini adalah 6babcaad-604b-40ac-a9d7-9fd97c0b779f. Atur nilai untuk AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, dan KUSTO_DATABASE sebelum menjalankan kode ini.

AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"

Sekarang buat string koneksi. Contoh berikut menggunakan autentikasi perangkat untuk mengakses kluster. Anda juga dapat menggunakan autentikasi identitas terkelola, sertifikat aplikasi Microsoft Entra, kunci aplikasi Microsoft Entra, dan pengguna dan kata sandi Microsoft Entra.

Anda membuat tabel tujuan dan pemetaan di langkah selanjutnya.

KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_INGEST_URI)

KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
    KUSTO_URI)

DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"

Mengatur informasi file sumber

Impor kelas tambahan dan atur konstanta untuk file sumber data. Contoh ini menggunakan file sampel yang dihosting di Azure Blob Storage. Himpunan data sampel StormEvents berisi data terkait cuaca dari Pusat Nasional untuk Informasi Lingkungan.

from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod

CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = ""  # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321    # in bytes

BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
    CONTAINER + "/" + FILE_PATH + SAS_TOKEN

Membuat tabel di kluster Anda

Buat tabel yang cocok dengan skema data dalam file StormEvents.csv. Saat kode ini berjalan, kode mengembalikan pesan seperti pesan berikut: Untuk masuk, gunakan browser web untuk membuka halaman https://microsoft.com/devicelogin dan memasukkan kode F3W4VWZDM untuk mengautentikasi. Ikuti langkah-langkah untuk masuk, lalu kembali untuk menjalankan blok kode berikutnya. Blok kode berikutnya yang membuat koneksi mengharuskan Anda untuk masuk lagi.

KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Menentukan pemetaan penyerapan

Petakan data CSV masuk ke nama kolom dan jenis data yang digunakan saat membuat tabel. Ini memetakan bidang data sumber ke kolom tabel tujuan

CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""

RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)

dataframe_from_result_table(RESPONSE.primary_results[0])

Mengantrekan pesan untuk penyerapan

Antrekan pesan untuk menarik data dari penyimpanan blob dan menyerap data tersebut ke Azure Data Explorer.

INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)

# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
                                           ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)

print('Done queuing up ingestion with Azure Data Explorer')

Data kueri yang diserap ke dalam tabel

Tunggu selama lima hingga 10 menit hingga penyerapan antrean menjadwalkan penyerapan dan memuat data ke Azure Data Explorer. Kemudian jalankan kode berikut untuk mendapatkan hitungan rekaman dalam tabel StormEvents.

QUERY = "StormEvents | count"

RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)

dataframe_from_result_table(RESPONSE.primary_results[0])

Menjalankan kueri pemecahan masalah

Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Membersihkan sumber daya

Jika Anda berencana untuk mengikuti artikel kami yang lain, pertahankan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut dalam database Anda untuk membersihkan tabel StormEvents.

.drop table StormEvents

Langkah selanjutnya