使用 Kusto JAVA SDK 內嵌資料
Azure 資料總管是一項快速又可高度調整的資料探索服務,可用於處理記錄和遙測資料。 JAVA 用戶端程式庫可用來內嵌資料、發出管理命令,以及查詢 Azure Data Explorer叢集中的資料。
在本文中,瞭解如何使用 Azure Data Explorer JAVA 程式庫內嵌資料。 首先,您將在測試叢集中建立資料表和資料對應。 然後,您將使用 JAVA SDK 將 Blob 儲存體的擷取排入佇列到叢集,並驗證結果。
必要條件
- Microsoft 帳戶或Microsoft Entra使用者身分識別。 不需要 Azure 訂用帳戶。
- Azure 資料總管叢集和資料庫。 建立叢集和資料庫。
- Git。
- JDK 1.8 版或更新版本。
- Maven.
- 建立 應用程式註冊,並將其許可權授與資料庫。 儲存用戶端識別碼和用戶端密碼以供稍後使用。
檢閱程式碼
此為選擇性區段。 檢閱下列程式碼片段,以瞭解程式碼的運作方式。 若要略過本節,請移至 執行應用程式。
驗證
程式會搭配 ConnectionStringBuilder' 使用Microsoft Entra驗證認證。
com.microsoft.azure.kusto.data.Client
建立 查詢和管理的 。static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
建立並使用
com.microsoft.azure.kusto.ingest.IngestClient
將資料擷取排入 Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
管理命令
管理命令,例如 .drop
和 .create
,是藉由呼叫 execute
com.microsoft.azure.kusto.data.Client
物件來執行。
例如, StormEvents
資料表的建立方式如下:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
資料擷取
使用來自現有Azure Blob 儲存體容器的檔案進行佇列擷取。
- 使用
BlobSourceInfo
來指定 Blob 儲存體路徑。 - 用來
IngestionProperties
定義資料表、資料庫、對應名稱和資料類型。 在下列範例中,資料類型為CSV
。
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
擷取進程會在個別的執行緒中啟動,而 main
執行緒會等候擷取執行緒完成。 此程式使用 CountdownLatch。 擷取 API (IngestClient#ingestFromBlob
) 不是非同步。 while
迴圈用來每隔 5 秒輪詢目前狀態,並等候擷取狀態從 Pending
變更為不同的狀態。 最終狀態可以是 Succeeded
、 Failed
或 PartiallySucceeded
。
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
提示
還有其他方法可以非同步處理不同應用程式的擷取。 例如,您可以使用 CompletableFuture
來建立管線來定義動作後擷取,例如查詢資料表,或處理回報給 的 IngestionStatus
例外狀況。
執行應用程式
一般
當您執行範例程式碼時,會執行下列動作:
- 卸載資料表:
StormEvents
如果資料表存在) ,則會將其卸載 (。 - 資料表建立:
StormEvents
建立資料表。 - 對應建立:
StormEvents_CSV_Mapping
建立對應。 - 檔案擷取:Azure Blob 儲存體) 中的 CSV 檔案 (已排入佇列以進行擷取。
下列範例程式碼來自 App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
提示
若要嘗試不同的作業組合,請取消批註/批註 中的 App.java
個別方法。
執行應用程式
從 GitHub 複製範例程式碼:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
使用下列資訊來設定服務主體資訊,作為程式所使用的環境變數:
- 叢集端點
- 資料庫名稱
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
建置並執行:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
輸出將類似於:
Table dropped Table created Mapping created Waiting for ingestion to complete...
等候幾分鐘,擷取程式才能完成。 成功完成之後,您會看到下列記錄訊息: Ingestion completed successfully
。 此時您可以結束程式,並移至下一個步驟,而不會影響已排入佇列的擷取程式。
驗證
等候五到 10 分鐘,讓佇列擷取排程擷取程式,並將資料載入 Azure Data Explorer。
登入 https://dataexplorer.azure.com,並連線至您的叢集。
執行下列命令以取得資料表中的
StormEvents
記錄計數:StormEvents | count
疑難排解
若要查看過去四小時內的擷取失敗,請在您的資料庫上執行下列命令:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
若要檢視過去四小時內所有擷取作業的狀態,請執行下列命令:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
清除資源
如果您不打算使用您所建立的資源,請在資料庫中執行下列命令來卸載 StormEvents
資料表。
.drop table StormEvents
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應