Az Azure Data Explorer és az Apache Flink® integrációja
Az Azure Data Explorer egy teljes mértékben felügyelt, nagy teljesítményű big data-elemzési platform, amely megkönnyíti a nagy mennyiségű adat közel valós idejű elemzését.
Az ADX segít a felhasználóknak a streamelési alkalmazásokból, webhelyekről, IoT-eszközökről stb. származó nagy mennyiségű adat elemzésében. Az Apache Flink ADX-sel való integrálásával valós idejű adatokat dolgozhat fel és elemezhet az ADX-ben.
Előfeltételek
Az Azure Data Explorer fogadóként való használatának lépései a Flinkben
Flink-fürt létrehozása.
Szükség szerint hozzon létre ADX-et adatbázissal és táblával.
Adjon hozzá betöltési engedélyeket a felügyelt identitáshoz a Kusto-ban.
.add database <DATABASE_NAME> ingestors ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY')
Futtasson egy mintaprogramot, amely meghatározza a Kusto-fürt URI-ját (egységes erőforrás-azonosító), a használt adatbázist és felügyelt identitást, valamint azt a táblát, amelyre írnia kell.
Klónozza a flink-connector-kusto projektet: https://github.com/Azure/flink-connector-kusto.git
A tábla létrehozása az ADX-ben a következő paranccsal
.create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime)
Frissítse a FlinkKustoSinkSample.java fájlt a megfelelő Kusto-fürt URI-jával, adatbázisával és a használt felügyelt identitással.
String database = "sdktests"; //ADX database name String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000)
A projekt későbbi buildelése az "mvn clean package" használatával
Keresse meg a "samples-java-1.0-SNAPSHOT-shaded.jar" nevű JAR-fájlt a "sample-java/target" mappában, majd töltse fel ezt a JAR-fájlt a Flink felhasználói felületén, és küldje el a feladatot.
A Kusto-tábla lekérdezése a kimenet ellenőrzéséhez
Az adatok a Kusto-táblába való írása nem késik a Flinkből.
Reference
- Apache Flink webhely
- Az Apache, az Apache Flink, a Flink és a társított nyílt forráskód projektnevek az Apache Software Foundation (ASF) védjegyei.
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: