Megosztás a következőn keresztül:


Adatok betöltése az Apache Flink használatával az Azure Data Explorer

Az Apache Flink egy keretrendszer és elosztott feldolgozási motor, amely az állapotalapú számításokhoz kötetlen és határolókeretes adatfolyamokon keresztül használható.

Az Flink-összekötő egy nyílt forráskód projekt, amely bármely Flink-fürtön futtatható. Adatgyűjtőt implementál az adatok Flink-fürtből való áthelyezéséhez. Az Apache Flink összekötőjével olyan gyors és méretezhető alkalmazásokat hozhat létre, amelyek adatvezérelt forgatókönyveket céloznak meg, például gépi tanulást (ML), extract-Transform-Load (ETL) és Log Analyticset.

Ebből a cikkből megtudhatja, hogyan küldhet adatokat a Flinkből a táblába az Flink-összekötő használatával. Létre kell hoznia egy táblát és egy adatleképezést, közvetlen Flinket kell létrehoznia, hogy adatokat küldjön a táblába, majd ellenőrizze az eredményeket.

Előfeltételek

A Mavent függőségek kezelésére használó Flink-projektek esetén integrálja az Azure-Data Explorer Flink Connector Core-fogadóját függőségként hozzáadva:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Az olyan projektek esetében, amelyek nem használják a Mavent a függőségek kezelésére, klónozza az Azure Data Explorer Connector for Apache Flink adattárát, és hozza létre helyileg. Ez a módszer lehetővé teszi, hogy manuálisan adja hozzá az összekötőt a helyi Maven-adattárhoz a paranccsal mvn clean install -DskipTests.

Az Flinkből hitelesítést végezhet egy Microsoft Entra ID alkalmazás vagy egy felügyelt identitás használatával.

Ez a szolgáltatásnév lesz az összekötő által használt identitás, amellyel adatokat írhat a táblában a Kusto-ban. Később engedélyeket fog adni a szolgáltatásnévnek a Kusto-erőforrások eléréséhez.

  1. Jelentkezzen be az Azure-előfizetésbe az Azure CLI-vel. Ezután végezze el a hitelesítést a böngészőben.

    az login
    
  2. Válassza ki az előfizetést a rendszerbiztonsági tag üzemeltetéséhez. Erre a lépésre akkor van szükség, ha több előfizetéssel rendelkezik.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Hozza létre a szolgáltatásnevet. Ebben a példában a szolgáltatásnév neve my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. A visszaadott JSON-adatokból másolja ki a appId, passwordés tenant értéket későbbi használatra.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Létrehozta a Microsoft Entra alkalmazást és szolgáltatásnevet.

  1. Adja meg az alkalmazás felhasználói engedélyeit az adatbázishoz:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Adjon az alkalmazásnak adatbetöltési vagy rendszergazdai engedélyeket a táblán. A szükséges engedélyek a választott adatírási módszertől függenek. A Betöltési engedélyek elegendőek a SinkV2 számára, míg a WriteAndSink rendszergazdai engedélyeket igényel.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

További információ az engedélyezésről: Kusto szerepköralapú hozzáférés-vezérlés.

Adatok írása flinkből:

  1. Importálja a szükséges beállításokat:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. A hitelesítéshez használja az alkalmazást vagy a felügyelt identitást.

    Alkalmazáshitelesítéshez:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Felügyelt identitás hitelesítéséhez:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurálja a fogadó paramétereit, például az adatbázist és a táblát:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    További lehetőségeket az alábbi táblázatban leírtak szerint adhat hozzá:

    Beállítás Leírás Alapértelmezett érték
    IngestionMappingRef Meglévő betöltési leképezésre hivatkozik.
    FlushImmediately Azonnal kiüríti az adatokat, és teljesítményproblémákat okozhat. Ez a módszer nem ajánlott.
    BatchIntervalMs Az adatok kiürítési gyakoriságát szabályozza. 30 másodperc
    BatchSize Beállítja a rekordok kiürítése előtti pufferelésének kötegméretét. 1000 rekord
    ClientBatchSizeLimit Az összesített adatok mb-ban megadott méretét adja meg a betöltés előtt. 300 MB
    PollForIngestionStatus Ha igaz, az összekötő lekérdezi a betöltési állapotot az adatok kiürítése után. hamis
    DeliveryGuarantee Meghatározza a kézbesítési garancia szemantikáját. Ha pontosan egyszer szeretne szemantikát elérni, használja a WriteAheadSink parancsot. AT_LEAST_ONCE
  2. Streamelési adatok írása az alábbi módszerek egyikével:

    • SinkV2: Ez egy állapot nélküli lehetőség, amely kiüríti az adatokat az ellenőrzőponton, és legalább egyszer konzisztenciát biztosít. Ezt a lehetőséget nagy mennyiségű adatbetöltéshez javasoljuk.
    • WriteAheadSink: Ez a metódus adatokat bocsát ki egy KustoSink számára. Integrálva van a Flink ellenőrzőpont-rendszerével, és pontosan egyszeri garanciát kínál. Az adatok tárolása egy AbstractStateBackendben történik, és csak egy ellenőrzőpont befejezése után lesz véglegesítve.

    Az alábbi példa a SinkV2 függvényt használja. A WriteAheadSink használatához használja a metódust a buildWriteAheadSink helyett build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

A teljes kódnak így kell kinéznie:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Az adatok betöltésének ellenőrzése

A kapcsolat konfigurálása után a rendszer adatokat küld a táblába. Az adatok betöltését KQL-lekérdezés futtatásával ellenőrizheti.

  1. Futtassa a következő lekérdezést annak ellenőrzéséhez, hogy az adatok be lesznek-e osztva a táblába:

    <MyTable>
    | count
    
  2. Az adatok megtekintéséhez futtassa a következő lekérdezést:

    <MyTable>
    | take 100