Соединитель Apache Spark: SQL Server и Azure SQL

Соединитель Apache Spark для SQL Server и SQL Azure — это высокопроизводительный соединитель, позволяющий использовать транзакционные данные в аналитике больших данных и сохранять результаты для нерегламентированных запросов или отчетов. Соединитель позволяет использовать любую базу данных SQL, локальную или облачную, в качестве источника входных данных или приемника выходных данных для заданий Spark.

Эта библиотека содержит исходный код соединителя Apache Spark для SQL Server и Azure SQL.

Apache Spark — это единый аналитический механизм для крупномасштабной обработки данных.

В Maven доступны две версии соединителя: совместимая с версией 2.4.x и совместимая с версией 3.0.x. Обе версии можно найти здесь и импортировать с помощью приведенных ниже координат.

Соединитель Координата Maven
Соединитель, совместимый со Spark 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Соединитель, совместимый со Spark 3.0.x com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Соединитель, совместимый со Spark 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Вы также можете создать соединитель на основе источника данных или скачать JAR-файл из раздела выпуска на GitHub. Последние сведения о соединителе см. в репозитории GitHub для соединителя Spark SQL.

Поддерживаемые возможности

  • Поддержка всех привязок Spark (Scala, Python, R).
  • Поддержка обычной проверки подлинности и вкладки ключа Active Directory (AD).
  • Поддержка для операции записи переупорядоченного dataframe.
  • Поддержка операции записи в единственном экземпляре SQL Server и пуле данных в Кластерах больших данных SQL Server.
  • Поддержка надежного соединителя для единственного экземпляра SQL Server.
Компонент Поддерживаемые версии
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Драйвер Microsoft JDBC для SQL Server 8.4
Microsoft SQL Server SQL Server 2008 или более поздняя версия
Базы данных SQL Azure Поддерживается

Поддерживаемые параметры

Подключение or Apache Spark для SQL Server и Azure SQL поддерживает параметры, определенные здесь: SQL DataSource JDBC

Кроме того, поддерживаются указанные ниже параметры

Параметр По умолчанию. Description
reliabilityLevel BEST_EFFORT BEST_EFFORT или NO_DUPLICATES. NO_DUPLICATES реализует надежную операцию вставки в сценариях перезапуска исполнителя
dataPoolDataSource none none означает, что значение не задано и соединитель должен записывать данные в один экземпляр SQL Server. Присвойте этому параметру имя источника данных для записи в таблицу пула данных в кластере больших данных
isolationLevel READ_COMMITTED Указание уровня изоляции
tableLock false Реализует операцию вставки с параметром TABLOCK для повышения производительности записи.
schemaCheckEnabled true Отключает строгие проверки кадра данных и схемы таблицы SQL, если установлено значение false

Другие параметры массового копирования могут быть заданы в качестве параметров для dataframe и передаваться в API bulkcopy при записи.

Сравнение производительности

Соединитель Apache Spark для SQL Server и Azure SQL выполняет операции в 15 раз быстрее, чем универсальный соединитель JDBC для записи в SQL Server. Характеристики производительности зависят от типа и объема данных, используемых параметров и могут варьироваться при запусках. Следующие результаты производительности отображают время, затрачиваемое на перезапись таблицы SQL с 143,9 млн строк в dataframe Spark. dataframe Spark создается путем считывания таблицы HDFS store_sales, созданной с помощью теста производительности TPCDS Spark. Время на считывание store_sales в dataframe исключено. Результаты представляют собой усредненное значение по трем запускам.

Тип соединителя Параметры Description Время записи
JDBCConnector По умолчанию Универсальный соединитель JDBC с параметрами по умолчанию 1385 секунд
sql-spark-connector BEST_EFFORT Наилучший вариант sql-spark-connector с параметрами по умолчанию 580 секунд
sql-spark-connector NO_DUPLICATES Надежный sql-spark-connector 709 секунд
sql-spark-connector BEST_EFFORT + tabLock=true Наилучший вариант sql-spark-connector с включенной блокировкой таблицы 72 секунды
sql-spark-connector NO_DUPLICATES  + tabLock=true Надежный sql-spark-connector с включенной блокировкой таблицы 198 секунд

Config

  • Конфигурация Spark: num_executors = 20, executor_memory = 1664 m, executor_cores = 2.
  • Конфигурация создания данных: scale_factor=50, partitioned_tables=true.
  • Файл данных store_sales с 143 997 590 строками.

Среда

Часто встречающиеся проблемы

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Эта проблема возникает из-за использования старой версии драйвера MSSQL (который теперь включен в этот соединитель) в среде Hadoop. Если вы используете предыдущую версию Подключение SQL Azure и вручную установили драйверы в этом кластере для обеспечения совместимости проверки подлинности Microsoft Entra, необходимо удалить эти драйверы.

Действия по устранению проблемы:

  1. Если вы используете универсальную среду Hadoop, проверьте и удалите JAR-файл MSSQL: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Если вы используете Databricks, добавьте глобальный скрипт или скрипт инициализации кластера, чтобы удалить старые версии драйвера MSSQL из папки /databricks/jars, или вставьте следующую строку в существующий сценарий: rm /databricks/jars/*mssql*.

  2. Добавьте пакеты adal4j и mssql. Например, можно использовать Maven, но должен подойти любой вариант.

    Внимание

    Не устанавливайте соединитель SQL Spark таким образом.

  3. Добавьте класс драйвера в конфигурацию подключения. Например:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Дополнительные сведения и описание способов устранения см. здесь: https://github.com/microsoft/sql-spark-connector/issues/26.

Начать

Соединитель Apache Spark для SQL Server и Azure SQL основан на API Spark DataSourceV1 и API массовых операций SQL Server. Он использует тот же интерфейс, что и встроенный соединитель Spark-SQL JDBC. Такая интеграция позволяет легко интегрировать соединитель и перенести существующие задания Spark, просто обновив параметр формата с помощью com.microsoft.sqlserver.jdbc.spark.

Чтобы включить соединитель в проекты, скачайте этот репозиторий и создайте JAR-файл с помощью SBT.

Запись в новую таблицу SQL

Предупреждение

Режим overwrite сначала удаляет таблицу, если она уже существует в базе данных по умолчанию. Используйте этот параметр с осторожностью, чтобы избежать непредвиденных потерь данных.

При использовании режима overwrite, если для воссоздания таблицы не используется параметр truncate, индексы будут потеряны. Таблица columnstore будет кучей. Если вы хотите сохранить существующее индексирование, задайте для параметра truncate значение true. Например, .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Добавление в таблицу SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Указание уровня изоляции

По умолчанию этот соединитель использует уровень изоляции READ_COMMITTED при выполнении массовой операции вставки в базу данных. Если вы хотите переопределить уровень изоляции, используйте параметр mssqlIsolationLevel, как показано ниже.

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Чтение из таблицы SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Проверка подлинности Microsoft Entra

Пример Python с субъектом-службой

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Пример Python с паролем Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Для проверки подлинности с помощью Active Directory необходимо установить требуемую зависимость.

Формат user, когда используется ActiveDirectoryPassword, должен быть форматом имени участника-пользователя, например username@domainname.com.

Для Scala необходимо установить артефакт _com.microsoft.aad.adal4j_.

Для Python необходимо установить библиотеку _adal_. Это можно сделать с помощью PIP.

Примеры записных книжек.

Поддержка

Соединитель Apache Spark для Azure SQL и SQL Server является проектом с открытым кодом. Корпорация Майкрософт не предоставляет поддержку для этого проекта. Чтобы устранить проблемы с соединителем или если у вас возникли вопросы, создайте запрос в этом репозитории проекта. Сообщество по вопросам соединителей активно отслеживает запросы.

Следующие шаги

Посетите репозиторий GitHub для соединителя SQL Spark.

Сведения об уровнях изоляции см. в разделе SET TRANSACTION ISOLATION LEVEL (Transact-SQL).