Artikel ini memperlihatkan kepada Anda cara menyerap data berformat JSON ke dalam database Azure Data Explorer. Anda mulai dengan contoh sederhana JSON mentah dan dipetakan, lanjutkan ke JSON multibaris, lalu mengatasi skema JSON yang lebih kompleks yang berisi array dan kamus. Contoh merinci proses penyerapan data berformat JSON menggunakan Bahasa Kueri Kusto (KQL), C#, atau Python.
Catatan
Kami tidak merekomendasikan penggunaan .ingest perintah manajemen dalam skenario produksi. Sebagai gantinya, gunakan konektor data atau serap data secara terprogram menggunakan salah satu pustaka klien Kusto.
Prasyarat
- Akun Microsoft atau identitas pengguna Microsoft Entra. Langganan Azure tidak diperlukan.
- Kluster dan database Azure Data Explorer.
Membuat kluster dan database.
Azure Data Explorer mendukung dua format file JSON:
-
json: JSON yang dipisahkan baris. Setiap baris dalam data input memiliki tepat satu rekaman JSON. Format ini mendukung penguraian komentar dan properti yang dikutip tunggal. Untuk informasi selengkapnya, lihat Baris JSON.
-
multijson: JSON multibaris. Pengurai mengabaikan pemisah baris dan membaca rekaman dari posisi sebelumnya ke akhir JSON yang valid.
Catatan
Saat menyerap menggunakan pengalaman Dapatkan data, format defaultnya adalah multijson. Format ini dapat menangani rekaman JSON multibaris dan array rekaman JSON. Ketika kesalahan penguraian ditemui, seluruh file dibuang. Untuk mengabaikan rekaman JSON yang tidak valid, pilih opsi untuk "Abaikan kesalahan format data.", yang mengalihkan format ke json (Baris JSON).
Jika Anda menggunakan format Baris JSON (json), baris yang tidak mewakili rekaman JSON yang valid dilewati selama penguraian.
Penyerapan data berformat JSON mengharuskan Anda menentukan format menggunakan properti penyerapan. Penyerapan data JSON memerlukan pemetaan, yang memetakan entri sumber JSON ke kolom targetnya. Saat menyerap data, gunakan IngestionMapping properti dengan ingestionMappingReference properti penyerapan (untuk pemetaan yang telah ditentukan) atau propertinya IngestionMappings . Artikel ini menggunakan ingestionMappingReference properti penyerapan, yang telah ditentukan sebelumnya pada tabel yang digunakan untuk penyerapan. Dalam contoh berikut, kita mulai dengan menyerap rekaman JSON sebagai data mentah ke satu tabel kolom. Kemudian kita menggunakan pemetaan untuk menyerap setiap properti ke kolom yang dipetakan.
Contoh JSON sederhana
Contoh berikut adalah JSON sederhana, dengan struktur datar. Data memiliki informasi suhu dan kelembaban, yang dikumpulkan oleh beberapa perangkat. Setiap rekaman ditandai dengan ID dan tanda waktu.
{
"timestamp": "2019-05-02 15:23:50.0369439",
"deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
}
Menyerap catatan JSON mentah
Dalam contoh ini, Anda menyerap rekaman JSON sebagai data mentah ke satu tabel kolom. Manipulasi data, menggunakan kueri, dan kebijakan pembaruan dilakukan setelah data diserap.
Gunakan Bahasa Kueri Kusto untuk menyerap data dalam format JSON mentah.
Masuklah ke https://dataexplorer.azure.com.
Pilih Tambahkan kluster.
Dalam kotak dialog Tambahkan kluster , masukkan URL kluster Anda dalam formulir https://<ClusterName>.<Region>.kusto.windows.net/, lalu pilih Tambahkan.
Tempel dalam perintah berikut, dan pilih Jalankan untuk membuat tabel.
.create table RawEvents (Event: dynamic)
Kueri ini membuat tabel dengan satu Event kolom tipe data dinamis .
Buat pemetaan JSON.
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
Perintah ini membuat pemetaan, dan memetakan jalur $ akar JSON ke Event kolom.
Serap data ke RawEvents dalam tabel.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Gunakan C# untuk menyerap data dalam format JSON mentah.
RawEvents Buat tabel.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Buat pemetaan JSON.
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Events", Properties = new Dictionary<string, string> { { "path", "$" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Perintah ini membuat pemetaan, dan memetakan jalur $ akar JSON ke Event kolom.
Serap data ke RawEvents dalam tabel.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Catatan
Data dikumpulkan sesuai dengan kebijakan batching, menghasilkan latensi beberapa menit.
Gunakan Python untuk menyerap data dalam format JSON mentah.
RawEvents Buat tabel.
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Events: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Buat pemetaan JSON.
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","path":"$"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Serap data ke RawEvents dalam tabel.
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Catatan
Data dikumpulkan sesuai dengan kebijakan batching, menghasilkan latensi beberapa menit.
Menyerap rekaman JSON yang dipetakan
Dalam contoh ini, Anda menyerap data rekaman JSON. Setiap properti JSON dipetakan ke satu kolom dalam tabel.
Buat tabel baru, dengan skema serupa dengan data input JSON. Kami menggunakan tabel ini untuk semua contoh dan perintah penyerapan berikut.
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
Buat pemetaan JSON.
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
Dalam pemetaan ini, seperti yang didefinisikan oleh skema tabel, timestamp entri diserap ke kolom Time sebagai datetime jenis data.
Serap data ke Events dalam tabel.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
File 'simple.json' memiliki beberapa rekaman JSON yang dipisahkan baris. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Buat tabel baru, dengan skema serupa dengan data input JSON. Kami menggunakan tabel ini untuk semua contoh dan perintah penyerapan berikut.
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("Time", "System.DateTime"),
Tuple.Create("Device", "System.String"),
Tuple.Create("MessageId", "System.String"),
Tuple.Create("Temperature", "System.Double"),
Tuple.Create("Humidity", "System.Double")
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Buat pemetaan JSON.
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
Dalam pemetaan ini, seperti yang didefinisikan oleh skema tabel, timestamp entri diserap ke kolom Time sebagai datetime jenis data.
Serap data ke Events dalam tabel.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
File 'simple.json' memiliki beberapa rekaman JSON yang dipisahkan baris. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Buat tabel baru, dengan skema serupa dengan data input JSON. Kami menggunakan tabel ini untuk semua contoh dan perintah penyerapan berikut.
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Buat pemetaan JSON.
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Serap data ke Events dalam tabel.
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
File 'simple.json' memiliki beberapa baris rekaman JSON yang dipisahkan. Formatnya adalah json, dan pemetaan yang digunakan dalam perintah penyerapan adalah yang FlatEventMapping Anda buat.
Menyerap rekaman JSON multibaris
Dalam contoh ini, Anda menyerap rekaman JSON multibaris. Setiap properti JSON dipetakan ke satu kolom dalam tabel. File 'multilined.json' memiliki beberapa rekaman JSON yang terindentasi. Format multijson menunjukkan untuk membaca rekaman berdasarkan struktur JSON.
Serap data ke Events dalam tabel.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
Serap data ke Events dalam tabel.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Serap data ke Events dalam tabel.
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Menyerap rekaman JSON yang berisi array
Jenis data array adalah kumpulan nilai yang diurutkan. Penyerapan array JSON dilakukan oleh kebijakan pembaruan. JSON diserap apa adanya ke tabel perantara. Kebijakan pembaruan menjalankan fungsi yang telah ditentukan sebelumnya pada RawEvents tabel, menyerap kembali hasilnya ke tabel target. Kami menyerap data dengan struktur berikut:
{
"records":
[
{
"timestamp": "2019-05-02 15:23:50.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
},
{
"timestamp": "2019-05-02 15:23:51.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "57de2821-7581-40e4-861e-ea3bde102364",
"temperature": 33.7529423105311,
"humidity": 75.4787976739364
}
]
}
update policy Buat fungsi yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator. Kami menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
Skema yang diterima oleh fungsi harus cocok dengan skema tabel target. Gunakan getschema operator untuk meninjau skema.
EventRecordsExpand() | getschema
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari bertahannya tabel perantara.
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
Serap data ke RawEvents dalam tabel.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Tinjau data dalam Events tabel.
Events
Buat fungsi pembaruan yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator. Kami menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
Catatan
Skema yang diterima oleh fungsi harus cocok dengan skema tabel target.
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari bertahannya tabel perantara.
command = ".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]";
await kustoClient.ExecuteControlCommandAsync(command);
Serap data ke RawEvents dalam tabel.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Tinjau data dalam Events tabel.
Buat fungsi pembaruan yang memperluas kumpulan records sehingga setiap nilai dalam koleksi menerima baris terpisah, menggunakan mv-expand operator. Kami menggunakan tabel RawEvents sebagai tabel sumber dan Events sebagai tabel target.
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Catatan
Skema yang diterima oleh fungsi harus sesuai dengan skema tabel target.
Tambahkan kebijakan pembaruan ke tabel target. Kebijakan ini secara otomatis menjalankan kueri pada data yang baru diserap dalam RawEvents tabel perantara dan menyerap hasilnya ke Events dalam tabel. Tentukan kebijakan retensi nol untuk menghindari bertahannya tabel perantara.
CREATE_UPDATE_POLICY_COMMAND =
""".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Serap data ke RawEvents dalam tabel.
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Tinjau data dalam Events tabel.
Konten terkait