Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Соединитель Apache Spark для SQL Server и SQL Azure — это высокопроизводительный соединитель, который можно использовать для включения транзакционных данных в аналитику больших данных и сохранения результатов для нерегламентированных запросов или отчетов. С помощью соединителя можно использовать любую базу данных SQL, локальную или в облаке в качестве входного источника данных или приемника выходных данных для заданий Spark.
Замечание
Этот соединитель не поддерживается активно. Эта статья сохраняется только для архивных целей.
Эта библиотека содержит исходный код соединителя Apache Spark для платформ SQL Server и SQL Azure.
Apache Spark — это единый аналитический механизм для крупномасштабной обработки данных.
Две версии соединителя доступны через Maven: совместимую версию 2.4.x и совместимую версию 3.0.x. Скачайте соединители из maven.org и импортируйте их с помощью координат:
| Соединитель | Координата Maven |
|---|---|
| Соединитель, совместимый с Spark 2.4.x | com.microsoft.azure:spark-mssql-connector:1.0.2 |
| Соединитель, совместимый с Spark 3.0.x | com.microsoft.azure:spark-mssql-connector_2.12:1.1.0 |
| Соединитель, совместимый с Spark 3.1.x | com.microsoft.azure:spark-mssql-connector_2.12:1.2.0 |
Вы также можете создать соединитель из источника или скачать JAR-файл из раздела "Выпуск" в GitHub. Последние сведения о соединителе см. в репозитории GitHub соединителя SQL Spark.
Поддерживаемые функции
- Поддержка всех привязок Spark (Scala, Python, R)
- Поддержка базовой аутентификации и Key Tab для Active Directory (AD)
- Поддержка переупорядоченной
dataframeзаписи - Поддержка записи в одиночный экземпляр SQL Server и пул данных в кластерах больших данных SQL Server
- Поддержка надежных соединителей для одного экземпляра SQL Server
| Компонент | Поддерживаемые версии |
|---|---|
| Apache Spark | 2.4.x, 3.0.x, 3.1.x |
| язык программирования Scala | 2.11, 2.12 |
| Драйвер Microsoft JDBC для SQL Server | 8.4 |
| Microsoft SQL Server | SQL Server 2008 или более поздняя версия |
| Базы данных SQL Azure | Поддерживается |
Поддерживаемые параметры
Соединитель Apache Spark для SQL Server и AZURE SQL поддерживает параметры, определенные в статье SQL DataSource JDBC .
Кроме того, соединитель поддерживает следующие параметры:
| Вариант | По умолчанию | Описание |
|---|---|---|
reliabilityLevel |
BEST_EFFORT |
BEST_EFFORT или NO_DUPLICATES.
NO_DUPLICATES реализует надежную вставку в условиях перезапуска выполнителя |
dataPoolDataSource |
none |
none означает, что значение не задано, и соединитель должен записывать в один экземпляр SQL Server. Задайте для этого значения имя источника данных для записи таблицы пула данных в кластерах больших данных. |
isolationLevel |
READ_COMMITTED |
Указание уровня изоляции |
tableLock |
false |
Реализует вставку с TABLOCK параметром для повышения производительности записи |
schemaCheckEnabled |
true |
Отключает строгий кадр данных и проверку схемы таблицы SQL при значении false |
Задайте другие параметры массового копирования как опции для dataframe. Соединитель передает эти параметры в bulkcopy API при записи.
Сравнение производительности
Соединитель Apache Spark для SQL Server и Azure SQL до 15 раз быстрее, чем универсальный соединитель JDBC для записи в SQL Server. Характеристики производительности зависят от типа, объема данных, используемых параметров и могут отображаться различия между каждым запуском. Следующие результаты производительности — это время, затраченное на перезаписи таблицы SQL с строками 143,9M в spark dataframe. Spark dataframe создается путем чтения store_sales таблицы HDFS, созданной с помощью spark TPCDS Benchmark. Время чтения от store_sales до dataframe исключается. Результаты усреднены по трем запускам.
| Тип соединителя | Опции | Описание | Время записи |
|---|---|---|---|
JDBCConnector |
По умолчанию | Универсальный соединитель JDBC с параметрами по умолчанию | 1385 секунд |
sql-spark-connector |
BEST_EFFORT |
Лучшие усилия sql-spark-connector с параметрами по умолчанию |
580 секунд |
sql-spark-connector |
NO_DUPLICATES |
Достоверный sql-spark-connector |
709 секунд |
sql-spark-connector |
BEST_EFFORT + tabLock=true |
Максимальные усилия sql-spark-connector при включенной блокировке таблицы |
72 секунды |
sql-spark-connector |
NO_DUPLICATES + tabLock=true |
Надежный с включенной sql-spark-connector блокировкой таблицы |
198 секунд |
Конфигурация
- Конфигурация Spark: число_исполнителей = 20, память_исполнителей = 1664 м, ядра_исполнителей = 2
- Конфигурация генерации данных: scale_factor = 50, partitioned_tables = true
- Файл
store_salesданных с числом строк 143 997 590
Окружающая среда
- Кластер больших данных SQL Server CU5
-
master+ 6 узлов - Каждый узел представляет собой сервер 5-го поколения с 512 ГБ ОЗУ, 4-ТБ NVM на узел и сетевой интерфейс 10 Гбит/с (NIC).
Распространенные проблемы
java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException
Эта ошибка возникает при использовании более старой mssql версии драйвера в среде Hadoop. Теперь соединитель включает этот драйвер. Если вы ранее использовали соединитель SQL Azure и вручную установили драйверы в кластере для совместимости проверки подлинности Microsoft Entra, удалите эти драйверы.
Чтобы устранить ошибку, выполните следующие действия.
Если вы используете универсальную среду Hadoop, проверьте и удалите
mssqlJAR-файл со следующей командой:rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jarЕсли вы используете Databricks, добавьте глобальный или кластерный скрипт инициализации для удаления старыхmssqlверсий драйвера из/databricks/jarsпапки или добавьте эту строку в существующий скрипт:rm /databricks/jars/*mssql*Добавьте пакеты
adal4jиmssql. Например, можно использовать Maven, но любой способ должен работать.Осторожность
Не устанавливайте соединитель SQL Spark таким образом.
Добавьте класс драйвера в конфигурацию подключения. Рассмотрим пример.
connectionProperties = { `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver` }`
Дополнительные сведения см. в разъяснении https://github.com/microsoft/sql-spark-connector/issues/26.
Начать сейчас
Соединитель Apache Spark для SQL Server и Azure SQL основан на API Spark DataSourceV1 и Bulk API SQL Server. Он использует тот же интерфейс, что и встроенный соединитель JDBC Spark-SQL. С помощью этой интеграции можно легко подключить соединитель и перенести существующие задания Spark, обновив параметр формата с com.microsoft.sqlserver.jdbc.spark помощью.
Чтобы включить соединитель в проекты, скачайте этот репозиторий и создайте JAR-файл с помощью SBT.
Запись в новую таблицу SQL
Предупреждение
Режим overwrite сначала удаляет таблицу, если она уже существует в базе данных. Используйте этот параметр с осторожностью, чтобы избежать непредвиденных потерь данных.
Если вы используете режим overwrite без параметра truncate при повторном создании таблицы, операция удаляет индексы. Кроме того, таблица columnstore изменяется на таблицу куч. Чтобы сохранить существующие индексы, установите параметр truncate в true. Например: .option("truncate","true").
server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Добавление в таблицу SQL
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Указание уровня изоляции
Этот соединитель использует уровень изоляции READ_COMMITTED по умолчанию при массовой вставке данных в базу данных. Чтобы переопределить уровень изоляции, используйте mssqlIsolationLevel параметр:
.option("mssqlIsolationLevel", "READ_UNCOMMITTED") \
Чтение из таблицы SQL
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password).load()
Аутентификация Microsoft Entra
Пример Python с субъектом-службой
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]
jdbc_db = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Пример Python с паролем Active Directory
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
Чтобы пройти проверку подлинности с помощью Active Directory, установите необходимую зависимость.
При использовании ActiveDirectoryPassword значение user должно находиться в формате UPN, например username@domainname.com.
Для Scala установите com.microsoft.aad.adal4j артефакт.
Для Python установите библиотеку adal . Эта библиотека доступна через pip.
Примеры см. в образцах записных книжек.