Поделиться через


Соединитель Azure Data Explorer для Apache Spark

Apache Spark — это единый аналитический механизм для крупномасштабной обработки данных. Azure Data Explorer — это быстрая и полностью управляемая служба для аналитики большого объема потоковых данных в реальном времени.

Соединитель Kusto для Spark — это проект открытый код, который может выполняться в любом кластере Spark. Он реализует источник и приемник данных для перемещения данных между кластерами Azure Data Explorer и Spark. Используя Azure Data Explorer и Apache Spark, вы можете создавать быстрые и масштабируемые приложения, ориентированные на сценарии, основанные на данных. Например, машинное обучение (ML), извлечение-преобразование-загрузка (ETL) и Log Analytics. С помощью соединителя Azure Data Explorer становится допустимым хранилищем данных для стандартных операций источника и приемника Spark, таких как запись, чтение и writeStream.

Вы можете записать в Azure Data Explorer с помощью приема в очереди или потоковой передачи. Чтение из Azure Data Explorer поддерживает обрезку столбцов и раскрытие предикатов, которые фильтруют данные в обозревателе данных Azure, уменьшая объем передаваемых данных.

Примечание.

Сведения о работе с соединителем Synapse Spark для Azure Data Explorer см. в статье Подключение к Azure Data Explorer с помощью Apache Spark для Azure Synapse Analytics.

В этом разделе описывается, как установить и настроить соединитель Spark Azure Data Explorer и перемещать данные между кластерами обозревателя данных Azure и Apache Spark.

Примечание.

Хотя некоторые из приведенных ниже примеров относятся к кластеру Azure Databricks Spark, соединитель Azure Data Explorer Spark не имеет прямых зависимостей от Databricks или любого другого дистрибутива Spark.

Необходимые компоненты

Совет

Версии Spark 2.3.x также поддерживаются, но могут потребоваться некоторые изменения в зависимостях pom.xml.

Как собрать соединитель Spark

Начиная с версии 2.3.0 мы представляем новые идентификаторы артефактов, заменяющие spark-kusto-connector: kusto-spark_3.0_2.12 , предназначенные для Spark 3.x и Scala 2.12.

Примечание.

Версии до 2.5.1 больше не работают для вставки в существующую таблицу, пожалуйста, обновите до более поздней версии. Этот шаг необязательный. Если вы используете готовые библиотеки, например Maven, см. раздел Настройка кластера Spark.

Предварительные требования к сборке

  1. Обратитесь к этому источнику для создания Spark Connector.

  2. Для приложений Scala/Java с помощью определений проекта Maven свяжите приложение с последним артефактом. Найдите последний артефакт на Maven Central.

    For more information, see [https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12](https://mvnrepository.com/artifact/com.microsoft.azure.kusto/kusto-spark_3.0_2.12).
    
    
  3. Если вы не используете предварительно созданные библиотеки, необходимо установить библиотеки, перечисленные в зависимостях , включая следующие библиотеки пакета SDK Для Java Kusto. Чтобы найти нужную версию для установки, загляните в pom соответствующего выпуска.

    1. Для создания JAR-файла и выполнения всех тестов выполните команду:

      mvn clean package -DskipTests
      
    2. Чтобы собрать jar, запустите все тесты и установите jar в локальный репозиторий Maven:

      mvn clean install -DskipTests
      

Для получения дополнительной информации см. использование соединителя.

Настройка кластера Spark

Примечание.

Рекомендуется использовать последний выпуск соединителя Kusto Spark при выполнении следующих действий.

  1. Настройте следующие параметры кластера Spark на основе кластера Azure Databricks Spark 3.0.1 и Scala 2.12:

    Настройки кластера Databricks.

  2. Установите последнюю версию библиотеки spark-kusto-connector от Maven.

    Импорт библиотек.Выберите Spark-Kusto-Connector.

  3. Убедитесь, что установлены все необходимые библиотеки:

    Убедитесь, что библиотеки установлены.

  4. Для установки с помощью JAR-файла убедитесь, что установлены другие зависимости:

    Добавьте зависимости.

Проверка подлинности

Соединитель Kusto Spark позволяет выполнять проверку подлинности с помощью идентификатора Microsoft Entra с помощью одного из следующих методов:

Проверка подлинности приложения Microsoft Entra

Проверка подлинности приложения Microsoft Entra является самым простым и наиболее распространенным методом проверки подлинности и рекомендуется для соединителя Kusto Spark.

  1. Войдите в подписку Azure с помощью Azure CLI. Затем авторизуйтесь в браузере.

    az login
    
  2. Выберите подписку для размещения субъекта. Этот шаг необходим, если у вас несколько подписок.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Создайте субъект-службу. В этом примере принципал службы называется my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Из возвращаемых данных JSON скопируйте appIdpasswordданные и tenant для дальнейшего использования.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Вы создали приложение Microsoft Entra и субъект-службу.

Соединитель Spark использует следующие свойства приложения Entra для проверки подлинности:

Свойства Строка параметра Description
KUSTO_AAD_APP_ID kustoAadAppId Идентификатор приложения Microsoft Entra (клиент).
KUSTO_AAD_AUTHORITY_ID kustoAadAuthorityID Центр проверки подлинности Microsoft Entra. Идентификатор каталога Microsoft Entra (клиента). Необязательный — по умолчанию используется microsoft.com. Дополнительные сведения см. в центре Microsoft Entra.
KUSTO_AAD_APP_SECRET kustoAadAppSecret Ключ приложения Microsoft Entra для клиента.
KUSTO_ACCESS_TOKEN kustoAccessToken Если у вас уже есть accessToken, созданный с доступом к Kusto, его можно использовать для проверки подлинности.

Примечание.

Более старые версии API (менее 2.0.0) имеют следующие наименования: kustoAADClientID, kustoClientAADClientPassword, kustoAADAuthorityID

Привилегии Kusto

Предоставьте следующие привилегии на стороне kusto на основе операции Spark, которую вы хотите выполнить.

Операция Spark Привилегии
Чтение — один режим Читатель
Чтение — принудительно распределенный режим Читатель
Запись — режим очереди с параметром создания таблицы CreateTableIfNotExist Административный
Запись — режим очереди с параметром создания таблицы FailIfNotExist Ingestor
Запись — TransactionalMode Административный

Дополнительные сведения об основных ролях см. в разделе "Управление доступом на основе ролей". Для управления ролями безопасности см. Управление ролями безопасности.

Приемник Spark: запись в Kusto

  1. Настроить параметры приемника:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.eastus2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Запись кадра данных Spark в кластер Kusto в виде пакета:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Или используйте упрощенный синтаксис:

    import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
    df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
  3. Запись потоковых данных:

    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // Set up a checkpoint and disable codeGen. 
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
      .writeStream
      .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
      .options(conf) 
      .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
      .start()
    

Источник Spark: чтение из Kusto

  1. При чтении небольших объемов данных определите запрос данных:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Необязательно. Если вы предоставляете временное хранилище BLOB-объектов (а не Kusto), большие двоичные объекты создаются под ответственностью вызывающего объекта. Это включает в себя предоставление хранилища, ротацию ключей доступа и удаление временных артефактов. Модуль KustoBlobStorageUtils содержит вспомогательные функции для удаления больших двоичных объектов на основе координат учетной записи и контейнера и учетных данных либо полного URL-адреса SAS с разрешениями на запись, чтение и список. Когда соответствующий RDD больше не нужен, каждая транзакция сохраняет временные артефакты больших двоичных объектов в отдельном каталоге. Этот каталог записывается как часть журналов с информацией о транзакциях чтения, передаваемых на узле Spark Driver.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    В приведенном выше примере доступ к Key Vault через интерфейс соединителя отсутствует; используется более простой метод использования секретов Databricks.

  3. Чтение из Kusto.

    • Если вы предоставляете временное хранилище BLOB-объектов, читайте из Kusto следующим образом:

       val conf3 = Map(
            KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
            KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
            KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • Если Kusto предоставляет временное хранилище BLOB-объектов, считывается из Kusto следующим образом:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey)
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)