Menyerap data sampel berformat JSON ke Azure Data Explorer
Artikel
Artikel ini memperlihatkan kepada Anda cara menyerap data berformat JSON ke dalam database Azure Data Explorer. Anda akan mulai dengan contoh sederhana JSON mentah dan dipetakan, lanjutkan ke JSON multibaris, lalu mengatasi skema JSON yang lebih kompleks yang berisi array dan kamus. Contohnya merinci proses penyerapan data berformat JSON menggunakan Bahasa Kueri Kusto (KQL), C#, atau Python.
Catatan
Kami tidak merekomendasikan penggunaan .ingest perintah manajemen dalam skenario produksi. Sebagai gantinya, gunakan konektor data atau serap data secara terprogram menggunakan salah satu pustaka klien Kusto.
Prasyarat
Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
Azure Data Explorer mendukung dua format file JSON:
json: JSON yang dipisahkan garis. Setiap baris dalam data input memiliki tepat satu rekaman JSON. Format ini mendukung penguraian komentar dan properti yang dikutip tunggal. Untuk informasi selengkapnya, lihat Baris JSON.
multijson: JSON multibaris. Pengurai mengabaikan pemisah baris dan membaca rekaman dari posisi sebelumnya ke akhir JSON yang valid.
Catatan
Saat menyerap menggunakan wizard penyerapan, format defaultnya adalah multijson. Format ini dapat menangani rekaman JSON multibaris dan array rekaman JSON. Ketika terjadi kesalahan penguraian, seluruh file akan dibuang. Untuk mengabaikan rekaman JSON yang tidak valid, pilih opsi untuk "Abaikan kesalahan format data.", yang akan mengalihkan format ke json (Baris JSON).
Jika Anda menggunakan format Baris JSON (json), baris yang tidak mewakili rekaman JSON yang valid dilewati selama penguraian.
Menyerap dan memetakan data berformat JSON
Penyerapan data berformat JSON mengharuskan Anda menentukan format menggunakan properti penyerapan. Penyerapan data JSON memerlukan pemetaan, yang memetakan entri sumber JSON ke kolom targetnya. Saat menyerap data, gunakan IngestionMapping properti dengan ingestionMappingReference properti penyerapan (untuk pemetaan yang telah ditentukan sebelumnya) atau propertinya IngestionMappings . Artikel ini akan menggunakan ingestionMappingReference properti penyerapan, yang telah ditentukan sebelumnya pada tabel yang digunakan untuk penyerapan. Dalam contoh di bawah ini, kita akan mulai dengan menyerap rekaman JSON sebagai data mentah ke satu tabel kolom. Kemudian kita akan menggunakan pemetaan untuk menyerap setiap properti ke kolom yang dipetakan.
Contoh JSON sederhana
Contoh berikut adalah JSON sederhana, dengan struktur datar. Data memiliki informasi suhu dan kelembaban, yang dikumpulkan oleh beberapa perangkat. Setiap rekaman ditandai dengan ID dan tanda waktu.
Dalam contoh ini, Anda menyerap rekaman JSON sebagai data mentah ke satu tabel kolom. Manipulasi data, menggunakan kueri, dan kebijakan pembaruan dilakukan setelah data diserap.
Dalam kotak dialog Tambahkan kluster , masukkan URL kluster Anda dalam formulir https://<ClusterName>.<Region>.kusto.windows.net/, lalu pilih Tambahkan.
Tempel dalam perintah berikut, dan pilih Jalankan untuk membuat tabel.
.create table RawEvents (Event: dynamic)
Kueri ini membuat tabel dengan satu Event kolom dari jenis data dinamis .
Perintah ini membuat pemetaan, dan memetakan jalur $ akar JSON ke Event kolom.
Serap data ke RawEvents dalam tabel.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Gunakan C# untuk menyerap data dalam format JSON mentah.
RawEvents Buat tabel.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Dalam pemetaan ini, seperti yang ditentukan oleh skema tabel, timestamp entri akan diserap ke kolom Time sebagai datetime jenis data.
Serap data ke Events dalam tabel.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
File 'simple.json' memiliki beberapa rekaman JSON yang dipisahkan baris. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Buat tabel baru, dengan skema serupa dengan data input JSON. Kita akan menggunakan tabel ini untuk semua contoh dan perintah penyerapan berikut.
Dalam pemetaan ini, seperti yang ditentukan oleh skema tabel, timestamp entri akan diserap ke kolom Time sebagai datetime jenis data.
Serap data ke Events dalam tabel.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
File 'simple.json' memiliki beberapa rekaman JSON yang dipisahkan baris. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Buat tabel baru, dengan skema serupa dengan data input JSON. Kita akan menggunakan tabel ini untuk semua contoh dan perintah penyerapan berikut.
File 'simple.json' memiliki beberapa baris rekaman JSON yang dipisahkan. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Menyerap rekaman JSON multibaris
Dalam contoh ini, Anda menyerap rekaman JSON multibaris. Setiap properti JSON dipetakan ke satu kolom dalam tabel. File 'multilined.json' memiliki beberapa rekaman JSON yang diindentasi. Format multijson menunjukkan untuk membaca rekaman berdasarkan struktur JSON.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Serap data ke Events dalam tabel.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Jenis data array adalah kumpulan nilai yang diurutkan. Penyerapan array JSON dilakukan oleh kebijakan pembaruan. JSON diserap apa adanya ke tabel perantara. Kebijakan pembaruan menjalankan fungsi yang telah ditentukan sebelumnya pada RawEvents tabel, menyerap kembali hasilnya ke tabel target. Kami akan menyerap data dengan struktur berikut:
update policy Buat fungsi yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator . Kita akan menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Skema yang diterima oleh fungsi harus cocok dengan skema tabel target. Gunakan getschema operator untuk meninjau skema.
EventRecordsExpand() | getschema
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini akan secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari pertahankan tabel perantara.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Tinjau data dalam Events tabel.
Events
Buat fungsi pembaruan yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator . Kita akan menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Catatan
Skema yang diterima oleh fungsi harus cocok dengan skema tabel target.
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini akan secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari pertahankan tabel perantara.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Tinjau data dalam Events tabel.
Buat fungsi pembaruan yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator . Kita akan menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Catatan
Skema yang diterima oleh fungsi harus cocok dengan skema tabel target.
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini akan secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari pertahankan tabel perantara.