Megosztás a következőn keresztül:


Az Azure Data Explorer és az Apache Flink® integrációja

Az Azure Data Explorer egy teljes mértékben felügyelt, nagy teljesítményű big data-elemzési platform, amely megkönnyíti a nagy mennyiségű adat közel valós idejű elemzését.

Az ADX segít a felhasználóknak a streamelési alkalmazásokból, webhelyekről, IoT-eszközökről stb. származó nagy mennyiségű adat elemzésében. Az Apache Flink ADX-sel való integrálásával valós idejű adatokat dolgozhat fel és elemezhet az ADX-ben.

Előfeltételek

  1. Flink-fürt létrehozása.

  2. Szükség szerint hozzon létre ADX-et adatbázissal és táblával.

  3. Adjon hozzá betöltési engedélyeket a felügyelt identitáshoz a Kusto-ban.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Futtasson egy mintaprogramot, amely meghatározza a Kusto-fürt URI-ját (egységes erőforrás-azonosító), a használt adatbázist és felügyelt identitást, valamint azt a táblát, amelyre írnia kell.

  5. Klónozza a flink-connector-kusto projektet: https://github.com/Azure/flink-connector-kusto.git

  6. A tábla létrehozása az ADX-ben a következő paranccsal

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Frissítse a FlinkKustoSinkSample.java fájlt a megfelelő Kusto-fürt URI-jával, adatbázisával és a használt felügyelt identitással.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    A projekt későbbi buildelése az "mvn clean package" használatával

  8. Keresse meg a "samples-java-1.0-SNAPSHOT-shaded.jar" nevű JAR-fájlt a "sample-java/target" mappában, majd töltse fel ezt a JAR-fájlt a Flink felhasználói felületén, és küldje el a feladatot.

  9. A Kusto-tábla lekérdezése a kimenet ellenőrzéséhez

    screenshot shows query the Kusto table to verify the output.

    Az adatok a Kusto-táblába való írása nem késik a Flinkből.

Reference