本文內容
本文說明如何將 JSON 格式化的數據內嵌至 Azure Data Explorer 資料庫。 您將從原始和對應的 JSON 的簡單範例開始,繼續多行 JSON,然後處理包含數位和字典的更複雜的 JSON 架構。 這些範例詳細說明使用 Kusto 查詢語言 (KQL) 、C# 或 Python 擷取 JSON 格式化數據的程式。
必要條件
Microsoft 帳戶或 Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
Azure 資料總管叢集和資料庫。
建立叢集和資料庫 。
Azure Data Explorer 支援兩種 JSON 檔案格式:
json
:以行分隔的 JSON。 輸入數據中的每個行都只有一個 JSON 記錄。 此格式支援剖析批注和單引號屬性。 如需詳細資訊,請參閱 JSON 行 。
multijson
:多行 JSON。 剖析器會忽略行分隔符,並將前一個位置的記錄讀取到有效 JSON 的結尾。
注意
使用擷 取精靈擷取 時,預設格式為 multijson
。 格式可以處理多行 JSON 記錄和 JSON 記錄的陣列。 遇到剖析錯誤時,會捨棄整個檔案。 若要忽略無效的 JSON 記錄,請選取 [忽略數據格式錯誤] 選項,這會將格式切換為 json
(JSON Line) 。
如果您使用 JSON Line 格式 () json
,則剖析期間會略過不代表有效 JSON 記錄的行。
擷取 JSON 格式化數據需要您使用擷取屬性 來指定格式 。 擷取 JSON 數據需要 對應 ,這會將 JSON 來源項目對應至其目標數據行。 擷取數據時,請使用 IngestionMapping
屬性搭配其 ingestionMappingReference
(預先定義的對應) 擷取屬性或其 IngestionMappings
屬性。 本文將使用 ingestionMappingReference
擷取屬性,該屬性是在用於擷取的數據表上預先定義。 在下列範例中,我們將先將 JSON 記錄擷取為原始數據到單一數據行數據表。 然後,我們將使用 對應,將每個屬性內嵌至其對應的數據行。
簡單 JSON 範例
下列範例是具有一般結構的簡單 JSON。 數據具有數個裝置所收集的溫度和濕度資訊。 每個記錄都會以標識碼和時間戳標示。
{
"timestamp": "2019-05-02 15:23:50.0369439",
"deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
}
擷取原始 JSON 記錄
在此範例中,您會將 JSON 記錄擷取為原始數據至單一數據行數據表。 數據擷取之後,就會執行數據操作、使用查詢和更新原則。
使用 Kusto 查詢語言 以原始 JSON 格式 內嵌數據。
登入 https://dataexplorer.azure.com 。
選取 [新增叢集]。
在 [新增叢集] 對話方塊中,以 https://<ClusterName>.<Region>.kusto.windows.net/
格式輸入您的叢集 URL,然後選取 [新增] 。
貼入下列命令,並選取 [執行] 以建立資料表。
.create table RawEvents (Event: dynamic)
此查詢會建立具有動態 數據類型之單Event
一數據行的數據表。
建立 JSON 對應。
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
此命令會建立對應,並將 JSON 根路徑 $
對應至數據 Event
行。
將數據內嵌到 RawEvents
資料表中。
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
使用 C# 以原始 JSON 格式 內嵌資料。
RawEvents
建立數據表。
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[] { Tuple.Create("Events", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
建立 JSON 對應。
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Events", Properties = new Dictionary<string, string> { { "path", "$" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
此命令會建立對應,並將 JSON 根路徑 $
對應至數據 Event
行。
將數據內嵌到 RawEvents
資料表中。
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
FederatedSecurity = true,
UserID = userId,
Password = password,
Authority = tenantId,
InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
注意
數據會根據 批處理原則 進行匯總,導致幾分鐘的延遲。
使用 Python 以原始 JSON 格式 內嵌數據。
RawEvents
建立數據表。
KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Events: dynamic)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
建立 JSON 對應。
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","path":"$"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
將數據內嵌到 RawEvents
資料表中。
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
注意
數據會根據 批處理原則 進行匯總,導致幾分鐘的延遲。
內嵌對應的 JSON 記錄
在此範例中,您會內嵌 JSON 記錄數據。 每個 JSON 屬性都會對應至數據表中的單一數據行。
建立新的數據表,其架構與 JSON 輸入數據類似。 我們將針對下列所有範例和擷取命令使用此資料表。
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
建立 JSON 對應。
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
在此對應中,如數據表架構所定義,專案 timestamp
會內嵌至數據行 Time
作為 datetime
數據類型。
將數據內嵌到 Events
資料表中。
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
檔案 'simple.json' 有一些以行分隔的 JSON 記錄。 格式為 json
,而擷取命令中使用的對應就是 FlatEventMapping
您所建立的。
建立新的數據表,其架構與 JSON 輸入數據類似。 我們將針對下列所有範例和擷取命令使用此資料表。
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
tableName,
new[]
{
Tuple.Create("Time", "System.DateTime"),
Tuple.Create("Device", "System.String"),
Tuple.Create("MessageId", "System.String"),
Tuple.Create("Temperature", "System.Double"),
Tuple.Create("Humidity", "System.Double")
}
);
await kustoClient.ExecuteControlCommandAsync(command);
建立 JSON 對應。
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
IngestionMappingKind.Json,
tableName,
tableMappingName,
new ColumnMapping[]
{
new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
}
);
await kustoClient.ExecuteControlCommandAsync(command);
在此對應中,如數據表架構所定義,專案 timestamp
會內嵌至數據行 Time
作為 datetime
數據類型。
將數據內嵌到 Events
資料表中。
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.json,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
檔案 'simple.json' 有一些以行分隔的 JSON 記錄。 格式為 json
,而擷取命令中使用的對應就是 FlatEventMapping
您所建立的。
建立新的數據表,其架構與 JSON 輸入數據類似。 我們將針對下列所有範例和擷取命令使用此資料表。
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
建立 JSON 對應。
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
將數據內嵌到 Events
資料表中。
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
檔案 'simple.json' 有幾行分隔的 JSON 記錄。 格式為 json
,而擷取命令中使用的對應就是 FlatEventMapping
您所建立的。
內嵌多行 JSON 記錄
在此範例中,您會內嵌多行 JSON 記錄。 每個 JSON 屬性都會對應至數據表中的單一數據行。 檔案 'multilined.json' 有一些縮排的 JSON 記錄。 格式 multijson
表示要依 JSON 結構讀取記錄。
將數據擷取到數據表中 Events
。
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json') with '{"format":"multijson", "ingestionMappingReference":"FlatEventMapping"}'
將數據擷取到數據表中 Events
。
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
將數據擷取到數據表中 Events
。
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
內嵌包含數位的 JSON 記錄
陣列資料類型是已排序的值集合。 JSON 陣組的擷取是由 更新原則 完成。 JSON 會依原狀內嵌至中繼數據表。 更新原則會在 RawEvents
數據表上執行預先定義的函式,並將結果重新內嵌至目標數據表。 我們將使用下列結構內嵌資料:
{
"records":
[
{
"timestamp": "2019-05-02 15:23:50.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "7f316225-839a-4593-92b5-1812949279b3",
"temperature": 31.0301639051317,
"humidity": 62.0791099602725
},
{
"timestamp": "2019-05-02 15:23:51.0000000",
"deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
"messageId": "57de2821-7581-40e4-861e-ea3bde102364",
"temperature": 33.7529423105311,
"humidity": 75.4787976739364
}
]
}
建立 update policy
函式,此函式會展開的 records
集合,讓集合中的每個值都使用 mv-expand
運算元接收個別的數據列。 我們將使用數據表 RawEvents
作為源數據表和 Events
目標數據表。
.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event.records
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}
函式收到的架構必須符合目標數據表的架構。 使用 getschema
運算子來檢閱架構。
EventRecordsExpand() | getschema
將更新原則新增至目標資料表。 此原則會自動對中繼數據表中 RawEvents
任何新內嵌的數據執行查詢,並將結果擷取到 Events
數據表中。 定義零保留原則,以避免保存中繼數據表。
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
將數據擷取到數據表中 RawEvents
。
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
檢閱數據表中的數據 Events
。
Events
建立更新函式,此函式會展開的 records
集合,讓集合中的每個值都使用 mv-expand
運算元接收個別的數據列。 我們將使用數據表 RawEvents
作為源數據表和 Events
目標數據表。
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
"EventRecordsExpand",
"UpdateFunctions",
string.Empty,
null,
@"RawEvents
| mv-expand records = Event
| project
Time = todatetime(records['timestamp']),
Device = tostring(records['deviceId']),
MessageId = tostring(records['messageId']),
Temperature = todouble(records['temperature']),
Humidity = todouble(records['humidity'])",
ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
將更新原則新增至目標資料表。 此原則會自動對中繼數據表中 RawEvents
任何新內嵌的數據執行查詢,並將其結果內嵌至 Events
數據表。 定義零保留原則,以避免保存中繼數據表。
command = ".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]";
await kustoClient.ExecuteControlCommandAsync(command);
將數據擷取到數據表中 RawEvents
。
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
Format = DataSourceFormat.multijson,
IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
檢閱數據表中的數據 Events
。
建立更新函式,此函式會展開的 records
集合,讓集合中的每個值都使用 mv-expand
運算元接收個別的數據列。 我們將使用數據表 RawEvents
作為源數據表和 Events
目標數據表。
CREATE_FUNCTION_COMMAND =
'''.create function EventRecordsExpand() {
RawEvents
| mv-expand records = Event
| project
Time = todatetime(records["timestamp"]),
Device = tostring(records["deviceId"]),
MessageId = tostring(records["messageId"]),
Temperature = todouble(records["temperature"]),
Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
將更新原則新增至目標資料表。 此原則會自動對中繼數據表中 RawEvents
任何新內嵌的數據執行查詢,並將其結果內嵌至 Events
數據表。 定義零保留原則,以避免保存中繼數據表。
CREATE_UPDATE_POLICY_COMMAND =
""".alter table Events policy update @'[{'Source': 'RawEvents', 'Query': 'EventRecordsExpand()', 'IsEnabled': 'True'}]"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
將數據擷取到數據表中 RawEvents
。
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, dataFormat=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
檢閱數據表中的數據 Events
。
相關內容