Поделиться через


Соединитель Spark для баз данных SQL (предварительная версия)

Это важно

Эта функция доступна в предварительной версии.

Соединитель Spark для баз данных SQL — это высокопроизводительная библиотека, которая позволяет считывать и записывать данные в SQL Server, базы данных SQL Azure и базы данных SQL Fabric. Соединитель предлагает следующие возможности:

  • Используйте Spark для выполнения больших операций записи и чтения в Базе данных SQL Azure, Управляемом экземпляре SQL Azure, SQL Server на виртуальной машине Azure и базах данных SQL Fabric.
  • При использовании таблицы или представления соединитель поддерживает модели безопасности, заданные на уровне ядра SQL. К этим моделям относятся безопасность на уровне объектов (OLS), безопасность на уровне строк (RLS) и безопасность на уровне столбцов (CLS).

Соединитель предварительно установлен в среде выполнения Fabric, поэтому его не нужно устанавливать отдельно.

Authentication

Проверка подлинности Microsoft Entra интегрирована с Microsoft Fabric.

  • При входе в рабочую область Fabric учетные данные автоматически передаются подсистеме SQL для проверки подлинности и авторизации.
  • Требуется, чтобы идентификатор Microsoft Entra был включен и настроен в ядре СУБД SQL.
  • Дополнительная конфигурация в коде Spark не требуется, если настроен идентификатор Microsoft Entra. Учетные данные автоматически сопоставляются.

Можно также использовать метод проверки подлинности SQL (указав имя пользователя и пароль SQL) или служебный принципал (предоставив токен доступа Azure для проверки подлинности для приложений).

Permissions

Чтобы использовать соединитель Spark, удостоверение ( будь то пользователь или приложение) должно иметь необходимые разрешения базы данных для целевого ядра SQL. Эти разрешения необходимы для чтения или записи в таблицы и представления.

Для Базы данных SQL Azure, Управляемого экземпляра SQL Azure и SQL Server на виртуальной машине Azure:

  • Идентификация, выполняющая операцию, обычно требует db_datawriter и db_datareader права, а также при необходимости db_owner для полного управления.

Для баз данных SQL Fabric:

  • Идентификатор обычно нуждается в db_datawriter и db_datareader разрешениях, и при необходимости db_owner.
  • Удостоверение также должно иметь по крайней мере разрешение на чтение базы данных SQL Fabric на уровне элемента.

Замечание

Если вы используете учетную запись службы, она может выполняться в качестве приложения (без пользовательского контекста) или как пользователь, если олицетворение пользователя разрешено. Субъект-служба должен иметь необходимые разрешения базы данных для выполняемых операций.

Примеры использования и кода

В этом разделе приведены примеры кода для демонстрации эффективного использования соединителя Spark для баз данных SQL. В этих примерах рассматриваются различные сценарии, включая чтение и запись в таблицы SQL и настройку параметров соединителя.

Поддерживаемые параметры

Минимальный необходимый параметр — это url как "jdbc:sqlserver://<server>:<port>;database=<database>;" или установка spark.mssql.connector.default.url.

  • Если указан параметр url , выполните следующие действия.

    • Всегда используйте url в качестве первого предпочтения.
    • Если spark.mssql.connector.default.url он не задан, соединитель задал его и повторно использует его для дальнейшего использования.
  • url Если это не указано:

    • Если spark.mssql.connector.default.url задано, соединитель использует значение из конфигурации Spark.
    • Если spark.mssql.connector.default.url не задано, возникает ошибка, так как необходимые сведения недоступны.

Этот соединитель поддерживает параметры, определенные здесь: параметры JDBC для SQL DataSource

Соединитель также поддерживает следующие параметры:

Вариант Значение по умолчанию Description
reliabilityLevel "максимальные усилия" Управляет надежностью операций вставки. Возможные значения: BEST_EFFORT (по умолчанию, самый быстрый, может привести к повторяющимся строкам при перезапуске исполнителя) NO_DUPLICATES (медленнее, гарантирует, что повторяющиеся строки не вставляются даже при перезапуске исполнителя). Выберите в зависимости от допустимости повторяющихся данных и потребностей в производительности.
isolationLevel "READ_COMMITTED" Задает уровень изоляции транзакций для операций SQL. Возможные значения: READ_COMMITTED (по умолчанию, предотвращает чтение незафиксированных данных), READ_UNCOMMITTED, , REPEATABLE_READ, SNAPSHOT. SERIALIZABLE Более высокие уровни изоляции могут снизить параллелизм, но повысить согласованность данных.
tableLock ложь Определяет, используется ли подсказка блокировки табличного уровня SQL Server TABLOCK во время операций вставки. Возможные значения: true (включает TABLOCK, что может повысить производительность массовой записи), false (по умолчанию не использует TABLOCK). Настройка параметра на true может увеличить пропускную способность для крупных вставок, но может снизить одновременность для других операций с таблицей.
schemaCheckEnabled правда Определяет, применяется ли строгая проверка схемы между Spark DataFrame и таблицей SQL. Возможные значения: true (по умолчанию применяется строгое сопоставление схемы), false (обеспечивает большую гибкость и может пропустить некоторые проверки схемы). Установка значения false может помочь при несоответствиях схемы, однако это может привести к неожиданным результатам, если структуры значительно отличаются.

Другие параметры массового API можно задать в DataFrame, и они передаются в API массового копирования при записи.

Пример записи и чтения данных

В следующем коде показано, как записывать и считывать данные с помощью mssql("<schema>.<table>") метода с автоматической проверкой подлинности идентификатора Microsoft Entra ID.

Подсказка

Данные создаются оперативно для демонстрационных целей. В производственном сценарии вы бы обычно считывали данные из существующего источника или создавали более сложные DataFrame.

import com.microsoft.sqlserver.jdbc.spark
url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
row_data = [("Alice", 1),("Bob", 2),("Charlie", 3)]
column_header = ["Name", "Age"]
df = spark.createDataFrame(row_data, column_header)
df.write.mode("overwrite").option("url", url).mssql("dbo.publicExample")
spark.read.option("url", url).mssql("dbo.publicExample").show()

url = "jdbc:sqlserver://<server>:<port>;database=<database2>;" # different database
df.write.mode("overwrite").option("url", url).mssql("dbo.tableInDatabase2") # default url is updated
spark.read.mssql("dbo.tableInDatabase2").show() # no url option specified and will use database2

Вы также можете выбирать столбцы, применять фильтры и использовать другие параметры при чтении данных из ядра СУБД SQL.

Примеры проверки подлинности

В следующих примерах показано, как использовать методы проверки подлинности, отличные от идентификатора Microsoft Entra, например субъект-служба (маркер доступа) и проверку подлинности SQL.

Замечание

Как упоминалось ранее, проверка подлинности Идентификатора Microsoft Entra обрабатывается автоматически при входе в рабочую область Fabric, поэтому вам нужно использовать только эти методы, если сценарий требует их.

import com.microsoft.sqlserver.jdbc.spark
url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
row_data = [("Alice", 1),("Bob", 2),("Charlie", 3)]
column_header = ["Name", "Age"]
df = spark.createDataFrame(row_data, column_header)

from azure.identity import ClientSecretCredential
credential = ClientSecretCredential(tenant_id="", client_id="", client_secret="") # service principal app
scope = "https://database.windows.net/.default"
token = credential.get_token(scope).token

df.write.mode("overwrite").option("url", url).option("accesstoken", token).mssql("dbo.publicExample")
spark.read.option("accesstoken", token).mssql("dbo.publicExample").show()

Поддерживаемые режимы сохранения DataFrame

При записи данных из Spark в базы данных SQL можно выбрать один из нескольких режимов сохранения. Режимы сохранения управляют записью данных, когда целевая таблица уже существует, и может повлиять на схему, данные и индексирование. Понимание этих режимов помогает избежать непредвиденных потерь данных или изменений.

Этот соединитель поддерживает параметры, определенные здесь: функции сохранения Spark

  • ErrorIfExists (режим сохранения по умолчанию): если целевая таблица существует, запись прерывается и возвращается исключение. В противном случае создается новая таблица с данными.

  • Игнорировать: если целевая таблица существует, запись игнорирует запрос и не возвращает ошибку. В противном случае создается новая таблица с данными.

  • Перезапись. Если целевая таблица существует, таблица удаляется, повторно создается и добавляются новые данные.

    Замечание

    При использовании overwrite исходная схема таблицы (особенно MSSQL-эксклюзивные типы данных) и индексы таблиц утрачиваются и заменяются схемой, определяемой из Spark DataFrame. Чтобы избежать потери схемы и индексов, используйте .option("truncate", true) вместо overwrite.

  • Добавление: если целевая таблица существует, к ней добавляются новые данные. В противном случае создается новая таблица с данными.

Troubleshoot

После завершения процесса выходные данные операции чтения Spark отображаются в области выходных данных ячейки. Ошибки com.microsoft.sqlserver.jdbc.SQLServerException происходят непосредственно из SQL Server. Подробные сведения об ошибках можно найти в журналах приложений Spark.