Поделиться через


Интеграция azure Data Обозреватель и Apache Flink®

Azure Data Explorer — это полностью управляемая высокопроизводительная платформа для анализа больших данных, которая позволяет легко анализировать большие объемы данных почти в реальном времени.

ADX помогает пользователям анализировать большие объемы данных из потоковых приложений, веб-сайтов, устройств Интернета вещей и т. д. Интеграция Apache Flink с ADX помогает обрабатывать данные в режиме реального времени и анализировать их в ADX.

Необходимые компоненты

  1. Создание кластера Flink.

  2. Создайте ADX с базой данных и таблицей по мере необходимости.

  3. Добавьте разрешения ingestor для управляемого удостоверения в Kusto.

    .add database <DATABASE_NAME> ingestors  ('aadapp=CLIENT_ID_OF_MANAGED_IDENTITY') 
    
  4. Запустите пример программы, определяющей универсальный код ресурса (универсальный идентификатор ресурса) кластера Kusto, используемую базу данных и управляемое удостоверение, а также таблицу, в которую требуется записать.

  5. Клонируйте проект flink-connector-kusto: https://github.com/Azure/flink-connector-kusto.git

  6. Создание таблицы в ADX с помощью следующей команды

    .create table CryptoRatesHeartbeatTimeBatch (processing_dttm: datetime, ['type']: string, last_trade_id: string, product_id: string, sequence: long, ['time']: datetime) 
    
  7. Обновите файл FlinkKustoSinkSample.java с правильным универсальным кодом ресурса (URI) кластера Kusto, базой данных и используемым управляемым удостоверением.

      String database = "sdktests"; //ADX database name 
    
      String msiClientId = “xxxx-xxxx-xxxx”; //Provide the client id of the Managed identity which is linked to the Flink cluster 
      String cluster = "https://trdp-1665b5eybxs0tbett.z8.kusto.fabric.microsoft.com/"; //Data explorer Cluster URI 
      KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder() 
          .setManagedIdentityAppId(msiClientId).setClusterUrl(cluster).build(); 
      String defaultTable = "CryptoRatesHeartbeatTimeBatch"; //Table where the data needs to be written 
      KustoWriteOptions kustoWriteOptionsHeartbeat = KustoWriteOptions.builder() 
          .withDatabase(database).withTable(defaultTable).withBatchIntervalMs(30000) 
    

    Позже создайте проект с помощью "пакета очистки mvn"

  8. Найдите JAR-файл с именем samples-java-1.0-SNAPSHOT-shaded.jar в папке sample-java/target, а затем отправьте этот JAR-файл в пользовательский интерфейс Flink и отправьте задание.

  9. Запросите таблицу Kusto, чтобы проверить выходные данные

    screenshot shows query the Kusto table to verify the output.

    Нет задержки при записи данных в таблицу Kusto из Flink.

Ссылка