Соединитель Apache Spark: SQL Server и Azure SQL
Соединитель Apache Spark для SQL Server и SQL Azure — это высокопроизводительный соединитель, позволяющий использовать транзакционные данные в аналитике больших данных и сохранять результаты для нерегламентированных запросов или отчетов. Соединитель позволяет использовать любую базу данных SQL, локальную или облачную, в качестве источника входных данных или приемника выходных данных для заданий Spark.
Эта библиотека содержит исходный код соединителя Apache Spark для SQL Server и Azure SQL.
Apache Spark — это единый аналитический механизм для крупномасштабной обработки данных.
В Maven доступны две версии соединителя: совместимая с версией 2.4.x и совместимая с версией 3.0.x. Обе версии можно найти здесь и импортировать с помощью приведенных ниже координат.
Соединитель | Координата Maven |
---|---|
Соединитель, совместимый со Spark 2.4.x | com.microsoft.azure:spark-mssql-connector:1.0.2 |
Соединитель, совместимый со Spark 3.0.x | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
Соединитель, совместимый со Spark 3.1.x | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
Вы также можете создать соединитель на основе источника данных или скачать JAR-файл из раздела выпуска на GitHub. Последние сведения о соединителе см. в репозитории GitHub для соединителя Spark SQL.
Поддерживаемые возможности
- Поддержка всех привязок Spark (Scala, Python, R).
- Поддержка обычной проверки подлинности и вкладки ключа Active Directory (AD).
- Поддержка для операции записи переупорядоченного
dataframe
. - Поддержка операции записи в единственном экземпляре SQL Server и пуле данных в Кластерах больших данных SQL Server.
- Поддержка надежного соединителя для единственного экземпляра SQL Server.
Компонент | Поддерживаемые версии |
---|---|
Apache Spark | 2.4.x, 3.0.x, 3.1.x |
Scala | 2.11, 2.12 |
Драйвер Microsoft JDBC для SQL Server | 8.4 |
Microsoft SQL Server | SQL Server 2008 или более поздняя версия |
Базы данных SQL Azure | Поддерживается |
Поддерживаемые параметры
Подключение or Apache Spark для SQL Server и Azure SQL поддерживает параметры, определенные здесь: SQL DataSource JDBC
Кроме того, поддерживаются указанные ниже параметры
Параметр | По умолчанию. | Description |
---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT или NO_DUPLICATES . NO_DUPLICATES реализует надежную операцию вставки в сценариях перезапуска исполнителя |
dataPoolDataSource |
none |
none означает, что значение не задано и соединитель должен записывать данные в один экземпляр SQL Server. Присвойте этому параметру имя источника данных для записи в таблицу пула данных в кластере больших данных |
isolationLevel |
READ_COMMITTED |
Указание уровня изоляции |
tableLock |
false |
Реализует операцию вставки с параметром TABLOCK для повышения производительности записи. |
schemaCheckEnabled |
true |
Отключает строгие проверки кадра данных и схемы таблицы SQL, если установлено значение false |
Другие параметры массового копирования могут быть заданы в качестве параметров для dataframe
и передаваться в API bulkcopy
при записи.
Сравнение производительности
Соединитель Apache Spark для SQL Server и Azure SQL выполняет операции в 15 раз быстрее, чем универсальный соединитель JDBC для записи в SQL Server. Характеристики производительности зависят от типа и объема данных, используемых параметров и могут варьироваться при запусках. Следующие результаты производительности отображают время, затрачиваемое на перезапись таблицы SQL с 143,9 млн строк в dataframe
Spark. dataframe
Spark создается путем считывания таблицы HDFS store_sales
, созданной с помощью теста производительности TPCDS Spark. Время на считывание store_sales
в dataframe
исключено. Результаты представляют собой усредненное значение по трем запускам.
Тип соединителя | Параметры | Description | Время записи |
---|---|---|---|
JDBCConnector |
По умолчанию | Универсальный соединитель JDBC с параметрами по умолчанию | 1385 секунд |
sql-spark-connector |
BEST_EFFORT |
Наилучший вариант sql-spark-connector с параметрами по умолчанию |
580 секунд |
sql-spark-connector |
NO_DUPLICATES |
Надежный sql-spark-connector |
709 секунд |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
Наилучший вариант sql-spark-connector с включенной блокировкой таблицы |
72 секунды |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
Надежный sql-spark-connector с включенной блокировкой таблицы |
198 секунд |
Config
- Конфигурация Spark: num_executors = 20, executor_memory = 1664 m, executor_cores = 2.
- Конфигурация создания данных: scale_factor=50, partitioned_tables=true.
- Файл данных
store_sales
с 143 997 590 строками.
Среда
- CU5 кластера больших данных SQL Server.
master
+ 6 узлов.- Сервер 5-го поколения (для каждого узла), 512 ГБ ОЗУ, 4 ТБ NVM на узел, сетевая карта 10 ГБ.
Часто встречающиеся проблемы
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
Эта проблема возникает из-за использования старой версии драйвера MSSQL (который теперь включен в этот соединитель) в среде Hadoop. Если вы используете предыдущую версию Подключение SQL Azure и вручную установили драйверы в этом кластере для обеспечения совместимости проверки подлинности Microsoft Entra, необходимо удалить эти драйверы.
Действия по устранению проблемы:
Если вы используете универсальную среду Hadoop, проверьте и удалите JAR-файл MSSQL:
rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar
. Если вы используете Databricks, добавьте глобальный скрипт или скрипт инициализации кластера, чтобы удалить старые версии драйвера MSSQL из папки/databricks/jars
, или вставьте следующую строку в существующий сценарий:rm /databricks/jars/*mssql*
.Добавьте пакеты
adal4j
иmssql
. Например, можно использовать Maven, но должен подойти любой вариант.Внимание
Не устанавливайте соединитель SQL Spark таким образом.
Добавьте класс драйвера в конфигурацию подключения. Например:
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Дополнительные сведения и описание способов устранения см. здесь: https://github.com/microsoft/sql-spark-connector/issues/26.
Начать
Соединитель Apache Spark для SQL Server и Azure SQL основан на API Spark DataSourceV1 и API массовых операций SQL Server. Он использует тот же интерфейс, что и встроенный соединитель Spark-SQL JDBC. Такая интеграция позволяет легко интегрировать соединитель и перенести существующие задания Spark, просто обновив параметр формата с помощью com.microsoft.sqlserver.jdbc.spark
.
Чтобы включить соединитель в проекты, скачайте этот репозиторий и создайте JAR-файл с помощью SBT.
Запись в новую таблицу SQL
Предупреждение
Режим overwrite
сначала удаляет таблицу, если она уже существует в базе данных по умолчанию. Используйте этот параметр с осторожностью, чтобы избежать непредвиденных потерь данных.
При использовании режима overwrite
, если для воссоздания таблицы не используется параметр truncate
, индексы будут потеряны. Таблица columnstore будет кучей. Если вы хотите сохранить существующее индексирование, задайте для параметра truncate
значение true. Например, .option("truncate","true")
.
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Добавление в таблицу SQL
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Указание уровня изоляции
По умолчанию этот соединитель использует уровень изоляции READ_COMMITTED
при выполнении массовой операции вставки в базу данных. Если вы хотите переопределить уровень изоляции, используйте параметр mssqlIsolationLevel
, как показано ниже.
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Чтение из таблицы SQL
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Проверка подлинности Microsoft Entra
Пример Python с субъектом-службой
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Пример Python с паролем Active Directory
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Для проверки подлинности с помощью Active Directory необходимо установить требуемую зависимость.
Формат user
, когда используется ActiveDirectoryPassword, должен быть форматом имени участника-пользователя, например username@domainname.com
.
Для Scala необходимо установить артефакт _com.microsoft.aad.adal4j_
.
Для Python необходимо установить библиотеку _adal_
. Это можно сделать с помощью PIP.
Поддержка
Соединитель Apache Spark для Azure SQL и SQL Server является проектом с открытым кодом. Корпорация Майкрософт не предоставляет поддержку для этого проекта. Чтобы устранить проблемы с соединителем или если у вас возникли вопросы, создайте запрос в этом репозитории проекта. Сообщество по вопросам соединителей активно отслеживает запросы.
Следующие шаги
Посетите репозиторий GitHub для соединителя SQL Spark.
Сведения об уровнях изоляции см. в разделе SET TRANSACTION ISOLATION LEVEL (Transact-SQL).