Menyerap data menggunakan pustaka Azure Data Explorer Python
Dalam artikel ini, Anda menyerap data menggunakan pustaka Azure Data Explorer Python. Azure Data Explorer adalah layanan eksplorasi data yang cepat dan sangat dapat diskalakan untuk data log dan telemetri. Azure Data Explorer menyediakan dua pustaka klien untuk Python: pustaka penyerapan dan pustaka data. Pustaka ini memungkinkan Anda untuk menyerap, atau memuat, data ke dalam kluster dan mengkueri data dari kode Anda.
Pertama, buat tabel dan pemetaan data dalam kluster. Anda kemudian mengantre penyerapan ke kluster dan memvalidasi hasilnya.
Prasyarat
- Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
- Kluster dan database Azure Data Explorer. Membuat kluster dan database.
- Python 3.4+.
Menginstal pustaka data dan penyerapan
Instal azure-kusto-data dan azure-kusto-ingest.
pip install azure-kusto-data
pip install azure-kusto-ingest
Menambahkan pernyataan dan konstanta impor
Mengimpor kelas dari azure-kusto-data.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
Untuk mengautentikasi aplikasi, Azure Data Explorer menggunakan ID penyewa Microsoft Entra Anda. Untuk menemukan ID penyewa Anda, gunakan URL berikut, mengganti domain Anda untuk YourDomain.
https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/
Misalnya, jika domain Anda contoso.com, URL-nya adalah: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klik URL ini untuk melihat hasilnya; baris pertama adalah sebagai berikut.
"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"
ID penyewa dalam hal ini adalah 6babcaad-604b-40ac-a9d7-9fd97c0b779f
. Atur nilai untuk AAD_TENANT_ID, KUSTO_URI, KUSTO_INGEST_URI, dan KUSTO_DATABASE sebelum menjalankan kode ini.
AAD_TENANT_ID = "<TenantId>"
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KUSTO_DATABASE = "<DatabaseName>"
Sekarang buat string koneksi. Contoh berikut menggunakan autentikasi perangkat untuk mengakses kluster. Anda juga dapat menggunakan autentikasi identitas terkelola, sertifikat aplikasi Microsoft Entra, kunci aplikasi Microsoft Entra, dan pengguna dan kata sandi Microsoft Entra.
Anda membuat tabel tujuan dan pemetaan di langkah selanjutnya.
KCSB_INGEST = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_INGEST_URI)
KCSB_DATA = KustoConnectionStringBuilder.with_interactive_login(
KUSTO_URI)
DESTINATION_TABLE = "StormEvents"
DESTINATION_TABLE_COLUMN_MAPPING = "StormEvents_CSV_Mapping"
Mengatur informasi file sumber
Impor kelas tambahan dan atur konstanta untuk file sumber data. Contoh ini menggunakan file sampel yang dihosting di Azure Blob Storage. Himpunan data sampel StormEvents berisi data terkait cuaca dari Pusat Nasional untuk Informasi Lingkungan.
from azure.kusto.data import DataFormat
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor, BlobDescriptor, DataFormat, ReportLevel, ReportMethod
CONTAINER = "samplefiles"
ACCOUNT_NAME = "kustosamples"
SAS_TOKEN = "" # If relevant add SAS token
FILE_PATH = "StormEvents.csv"
FILE_SIZE = 64158321 # in bytes
BLOB_PATH = "https://" + ACCOUNT_NAME + ".blob.core.windows.net/" + \
CONTAINER + "/" + FILE_PATH + SAS_TOKEN
Membuat tabel di kluster Anda
Buat tabel yang cocok dengan skema data dalam file StormEvents.csv. Saat kode ini berjalan, kode mengembalikan pesan seperti pesan berikut: Untuk masuk, gunakan browser web untuk membuka halaman https://microsoft.com/devicelogin dan memasukkan kode F3W4VWZDM untuk mengautentikasi. Ikuti langkah-langkah untuk masuk, lalu kembali untuk menjalankan blok kode berikutnya. Blok kode berikutnya yang membuat koneksi mengharuskan Anda untuk masuk lagi.
KUSTO_CLIENT = KustoClient(KCSB_DATA)
CREATE_TABLE_COMMAND = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Menentukan pemetaan penyerapan
Petakan data CSV masuk ke nama kolom dan jenis data yang digunakan saat membuat tabel. Ini memetakan bidang data sumber ke kolom tabel tujuan
CREATE_MAPPING_COMMAND = """.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(KUSTO_DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Mengantrekan pesan untuk penyerapan
Antrekan pesan untuk menarik data dari penyimpanan blob dan menyerap data tersebut ke Azure Data Explorer.
INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
# All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties
INGESTION_PROPERTIES = IngestionProperties(database=KUSTO_DATABASE, table=DESTINATION_TABLE, data_format=DataFormat.CSV,
ingestion_mapping_reference=DESTINATION_TABLE_COLUMN_MAPPING, additional_properties={'ignoreFirstRecord': 'true'})
# FILE_SIZE is the raw size of the data in bytes
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
print('Done queuing up ingestion with Azure Data Explorer')
Data kueri yang diserap ke dalam tabel
Tunggu selama lima hingga 10 menit hingga penyerapan antrean menjadwalkan penyerapan dan memuat data ke Azure Data Explorer. Kemudian jalankan kode berikut untuk mendapatkan hitungan rekaman dalam tabel StormEvents.
QUERY = "StormEvents | count"
RESPONSE = KUSTO_CLIENT.execute_query(KUSTO_DATABASE, QUERY)
dataframe_from_result_table(RESPONSE.primary_results[0])
Menjalankan kueri pemecahan masalah
Masuk ke https://dataexplorer.azure.com dan sambungkan ke kluster Anda. Jalankan perintah berikut dalam database Anda untuk melihat apakah ada kegagalan penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"
Jalankan perintah berikut untuk melihat status semua operasi penyerapan dalam empat jam terakhir. Ganti nama database sebelum berjalan.
.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Table == "StormEvents" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId
Membersihkan sumber daya
Jika Anda berencana untuk mengikuti artikel kami yang lain, pertahankan sumber daya yang Anda buat. Jika tidak, jalankan perintah berikut dalam database Anda untuk membersihkan tabel StormEvents.
.drop table StormEvents
Langkah selanjutnya
Saran dan Komentar
https://aka.ms/ContentUserFeedback.
Segera hadir: Sepanjang tahun 2024 kami akan menghentikan penggunaan GitHub Issues sebagai mekanisme umpan balik untuk konten dan menggantinya dengan sistem umpan balik baru. Untuk mengetahui informasi selengkapnya, lihat:Kirim dan lihat umpan balik untuk