Ingestování dat pomocí sady Kusto Java SDK
Průzkumník dat Azure je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Klientskou knihovnu Java je možné použít k ingestování dat, příkazům pro správu problémů a dotazování dat v clusterech Azure Data Explorer.
V tomto článku se dozvíte, jak ingestovat data pomocí knihovny Azure Data Explorer Java. Nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak pomocí sady Java SDK za frontu zasadíte příjem dat z úložiště objektů blob do clusteru a ověříte výsledky.
Požadavky
- Účet Microsoft nebo identita uživatele Microsoft Entra. Předplatné Azure se nevyžaduje.
- Cluster a databáze Azure Data Explorer. Vytvořte cluster a databázi.
- Git.
- JDK verze 1.8 nebo novější.
- Maven.
- Vytvořte registraci aplikace a udělte jí oprávnění k databázi. Uložte ID klienta a tajný klíč klienta pro pozdější použití.
Kontrola kódu
Tato část je volitelná. Projděte si následující fragmenty kódu a zjistěte, jak kód funguje. Pokud chcete tuto část přeskočit, přejděte ke spuštění aplikace.
Authentication
Program používá Microsoft Entra přihlašovací údaje ověřování s ConnectionStringBuilder'.
Vytvořte pro
com.microsoft.azure.kusto.data.Client
dotazy a správu.static Client getClient() throws Exception { ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID); return ClientFactory.createClient(csb); }
Vytvořte a použijte k frontě
com.microsoft.azure.kusto.ingest.IngestClient
příjmu dat do Azure Data Explorer:static IngestClient getIngestionClient() throws Exception { String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost(); ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret); return IngestClientFactory.createClient(csb); }
Příkazy pro správu
Příkazy pro správu, například .drop
a .create
, se spouští voláním execute
objektu com.microsoft.azure.kusto.data.Client
.
Například tabulka se StormEvents
vytvoří takto:
static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";
static void createTable(String database) {
try {
getClient().execute(database, createTableCommand);
System.out.println("Table created");
} catch (Exception e) {
System.out.println("Failed to create table: " + e.getMessage());
return;
}
}
Přijímání dat
Příjem fronty pomocí souboru z existujícího kontejneru Azure Blob Storage.
- Použijte
BlobSourceInfo
k určení cesty ke službě Blob Storage. - Slouží
IngestionProperties
k definování tabulky, databáze, názvu mapování a datového typu. V následujícím příkladu jeCSV
datový typ .
...
static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
static final String blobStorageAccountName = "kustosamples";
static final String blobStorageContainer = "samplefiles";
static final String fileName = "StormEvents.csv";
static final String blobStorageToken = ""; //If relevant add SAS token
....
static void ingestFile(String database) throws InterruptedException {
String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
fileName, blobStorageToken);
BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);
IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
ingestionProperties.setDataFormat(DATA_FORMAT.csv);
ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
....
Proces příjmu dat začíná v samostatném vlákně main
a vlákno čeká na dokončení vlákna příjmu dat. Tento proces používá CountdownLatch. Rozhraní API pro příjem dat (IngestClient#ingestFromBlob
) není asynchronní. Smyčka while
se používá k dotazování aktuálního stavu každých 5 sekund a čeká, až se stav příjmu dat změní Pending
na jiný. Konečný stav může být Succeeded
, Failed
nebo PartiallySucceeded
.
....
CountDownLatch ingestionLatch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
IngestionResult result = null;
try {
result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
} catch (Exception e) {
ingestionLatch.countDown();
}
try {
IngestionStatus status = result.getIngestionStatusCollection().get(0);
while (status.status == OperationStatus.Pending) {
Thread.sleep(5000);
status = result.getIngestionStatusCollection().get(0);
}
ingestionLatch.countDown();
} catch (Exception e) {
ingestionLatch.countDown();
}
}
}).start();
ingestionLatch.await();
}
Tip
Existují i jiné metody, které zpracovávají příjem asynchronně pro různé aplikace. Můžete například vytvořit CompletableFuture
kanál definující akci po příjmu dat, jako je dotaz na tabulku nebo zpracování výjimek, které byly nahlášeny do IngestionStatus
.
Spuštění aplikace
Obecné
Při spuštění ukázkového kódu se provedou následující akce:
-
Rozevírací tabulka:
StormEvents
Tabulka se zahodí (pokud existuje). -
Vytvoření tabulky:
StormEvents
Vytvoří se tabulka. -
Vytvoření mapování:
StormEvents_CSV_Mapping
Vytvoří se mapování. - Příjem souborů: Soubor CSV (v Azure Blob Storage) je zařazen do fronty pro příjem dat.
Následující ukázkový kód je z App.java
:
public static void main(final String[] args) throws Exception {
dropTable(database);
createTable(database);
createMapping(database);
ingestFile(database);
}
Tip
Pokud chcete vyzkoušet různé kombinace operací, odkomentujte nebo okomentujte příslušné metody v nástroji App.java
.
Spuštění aplikace
Naklonujte ukázkový kód z GitHubu:
git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git cd azure-data-explorer-java-sdk-ingest
Nastavte informace o instančním objektu s následujícími informacemi jako proměnné prostředí používané programem:
- Koncový bod clusteru
- Název databáze
export AZURE_SP_CLIENT_ID="<replace with appID>" export AZURE_SP_CLIENT_SECRET="<replace with password>" export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net" export KUSTO_DB="name of the database"
Sestavte a spusťte:
mvn clean package java -jar target/adx-java-ingest-jar-with-dependencies.jar
Výstup bude podobný následujícímu:
Table dropped Table created Mapping created Waiting for ingestion to complete...
Počkejte několik minut, než se proces příjmu dat dokončí. Po úspěšném dokončení se zobrazí následující zpráva protokolu: Ingestion completed successfully
. V tomto okamžiku můžete program ukončit a přejít k dalšímu kroku, aniž by to mělo vliv na proces příjmu dat, který už byl zařazen do fronty.
Ověření
Počkejte pět až 10 minut, než příjem dat ve frontě naplánuje proces příjmu dat a načte data do Azure Data Explorer.
Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru.
Spuštěním následujícího příkazu získejte počet záznamů v tabulce
StormEvents
:StormEvents | count
Řešení potíží
Pokud chcete zobrazit chyby příjmu dat za poslední čtyři hodiny, spusťte ve své databázi následující příkaz:
.show ingestion failures | where FailedOn > ago(4h) and Database == "<DatabaseName>"
Pokud chcete zobrazit stav všech operací příjmu dat za poslední čtyři hodiny, spusťte následující příkaz:
.show operations | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull" | summarize arg_max(LastUpdatedOn, *) by OperationId
Vyčištění prostředků
Pokud nemáte v úmyslu používat vytvořené prostředky, spusťte v databázi následující příkaz, kterým tabulku vyřadíte StormEvents
.
.drop table StormEvents