Запрос Amazon Redshift с помощью Azure Databricks
Таблицы можно считывать и записывать из Amazon Redshift с помощью Azure Databricks.
Внимание
Конфигурации, описанные в этой статье, являются экспериментальными. Экспериментальные функции предоставляются как есть и не поддерживаются Databricks через техническую поддержку клиентов. Чтобы получить полную поддержку федерации запросов, следует вместо этого использовать федерацию Lakehouse, которая позволяет пользователям Azure Databricks воспользоваться синтаксисом каталога Unity и средствами управления данными.
Источник данных Databricks Redshift использует Amazon S3 для эффективной передачи данных из Redshift и использует JDBC для автоматического активации соответствующих COPY
и UNLOAD
команд в Redshift.
Примечание.
В Databricks Runtime 11.3 LTS и более поздних версиях среда выполнения Databricks включает драйвер Redshift JDBC, доступный redshift
с помощью ключевого слова для параметра форматирования. См . заметки о выпуске Databricks Runtime и совместимость версий драйверов, включенных в каждую среду выполнения Databricks. Предоставленные пользователем драйверы по-прежнему поддерживаются и имеют приоритет над пакетным драйвером JDBC.
В Databricks Runtime 10.4 LTS и ниже требуется ручная установка драйвера Redshift JDBC, а запросы должны использовать драйвер (com.databricks.spark.redshift
) для формата. См. раздел установки драйвера Redshift.
Использование
В следующих примерах показано подключение к драйверу Redshift. Замените url
значения параметров, если вы используете драйвер JDBC PostgreSQL.
После настройки учетных данных AWS можно использовать источник данных с API источника данных Spark в Python, SQL, R или Scala.
Внимание
Внешние расположения, определенные в каталоге Unity, не поддерживаются в качестве tempdir
расположений.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Чтение данных с помощью SQL в Databricks Runtime 10.4 LTS и ниже:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Чтение данных с помощью SQL в Databricks Runtime 11.3 LTS и выше:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Запись данных с помощью SQL:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
API SQL поддерживает только создание новых таблиц, а не перезапись или добавление.
R
Чтение данных с помощью R в Databricks Runtime 10.4 LTS и ниже:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Чтение данных с помощью R в Databricks Runtime 11.3 LTS и выше:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Рекомендации по работе с Redshift
Выполнение запроса может извлекать большие объемы данных в S3. Если вы планируете выполнить несколько запросов к тем же данным в Redshift, Databricks рекомендует сохранить извлеченные данные с помощью Delta Lake.
Настройка
Проверка подлинности в S3 и Redshift
Источник данных включает несколько сетевых подключений, показанных на следующей схеме:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Источник данных считывает и записывает данные в S3 при передаче данных в Redshift и из нее. В результате требуется учетные данные AWS с доступом на чтение и запись в контейнер S3 (указанный tempdir
с помощью параметра конфигурации).
Примечание.
Источник данных не очищает временные файлы, создаваемые в S3. В результате рекомендуется использовать выделенный временный контейнер S3 с конфигурацией жизненного цикла объектов, чтобы гарантировать автоматическое удаление временных файлов после указанного срока действия. Сведения о шифровании этих файлов см. в разделе "Шифрование " этого документа. Нельзя использовать внешнее расположение, определенное в каталоге tempdir
Unity в качестве расположения.
В следующих разделах описаны параметры конфигурации проверки подлинности каждого подключения:
Драйвер Spark в Redshift
Драйвер Spark подключается к Redshift через JDBC с помощью имени пользователя и пароля. Redshift не поддерживает использование ролей IAM для проверки подлинности этого подключения. По умолчанию это подключение использует шифрование SSL; Дополнительные сведения см. в разделе "Шифрование".
Spark до S3
S3 выступает в качестве посредника для хранения массовых данных при чтении или записи в Redshift. Spark подключается к S3 с помощью интерфейсов Файловой системы Hadoop и непосредственно с помощью клиента S3 пакета SDK для Amazon Java.
Примечание.
Подключения DBFS нельзя использовать для настройки доступа к S3 для Redshift.
Задайте ключи в conf Hadoop: вы можете указать ключи AWS с помощью свойств конфигурации Hadoop.
tempdir
Если конфигурация указывает на файловуюs3a://
систему, можно задатьfs.s3a.access.key
иfs.s3a.secret.key
свойства в XML-файле конфигурации Hadoop или вызватьsc.hadoopConfiguration.set()
настройку глобальной конфигурации Hadoop Spark. При использовании файловой системы можно указать устаревшиеs3n://
ключи конфигурации, как показано в следующем примере.Scala
Например, если вы используете файловую
s3a
систему, добавьте:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Для устаревшей файловой
s3n
системы добавьте:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Следующая команда использует некоторые внутренние компоненты Spark, но должна работать со всеми версиями PySpark и вряд ли изменится в будущем:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift до S3
forward_spark_s3_credentials
Задайте параметр для true
автоматической пересылки учетных данных ключа AWS, которые Spark использует для подключения к S3 через JDBC в Redshift. Запрос JDBC внедряет эти учетные данные, поэтому Databricks настоятельно рекомендует включить шифрование SSL подключения JDBC.
Шифрование
Защита JDBC: если в URL-адресе JDBC отсутствуют какие-либо параметры, связанные с SSL, источник данных по умолчанию включает шифрование SSL, а также проверяет, является ли сервер Redshift надежным (т
sslmode=verify-full
. е. ). Для этого сертификат сервера автоматически загружается с серверов Amazon при первом необходимости. В случае сбоя предварительно упакованный файл сертификата используется в качестве резервного варианта. Это относится как к драйверам Redshift, так и к JDBC PostgreSQL.Если с этой функцией возникают какие-либо проблемы или вы просто хотите отключить SSL, можно вызвать
.option("autoenablessl", "false")
илиDataFrameReader
DataFrameWriter
.Если вы хотите указать настраиваемые параметры, связанные с SSL, вы можете следовать инструкциям в документации Redshift: использование SSL-сертификатов и сертификатов сервера в Java и JDBC Driver Configuration Options Any SSL-связанных параметров, присутствующих в JDBC
url
, используемых с источником данных, имеет приоритет (то есть автоматическая настройка не будет активироваться).Шифрование данных ВЫГРУЗКИ, хранящихся в S3 (данные, хранящиеся при чтении из Redshift): согласно документации Redshift по выгрузке данных в S3, "ВЫГРУЗ автоматически шифрует файлы данных с помощью шифрования на стороне сервера Amazon S3 (SSE-S3).
Redshift также поддерживает шифрование на стороне клиента с помощью пользовательского ключа (см. раздел " Выгрузка зашифрованных файлов данных"), но источник данных не имеет возможности указать необходимый симметричный ключ.
Шифрование данных COPY, хранящихся в S3 (данные, хранящиеся при записи в Redshift): в соответствии с документацией Redshift по загрузке зашифрованных файлов данных из Amazon S3:
С помощью команды можно COPY
загрузить файлы данных, отправленные в Amazon S3, с помощью шифрования на стороне сервера с ключами шифрования, управляемыми AWS (SSE-S3 или SSE-KMS), шифрование на стороне клиента или оба. COPY не поддерживает шифрование на стороне сервера Amazon S3 с ключом, предоставленным клиентом (SSE-C).
Параметры
Карта параметров или ПАРАМЕТРЫ, предоставляемые в Spark SQL, поддерживают следующие параметры:
Параметр | Обязательное поле | По умолчанию. | Description |
---|---|---|---|
dbtable | Да, если запрос не указан. | нет | Таблица для создания или чтения из Redshift. Этот параметр требуется при сохранении данных обратно в Redshift. |
query | Да, если не указан параметр dbtable. | нет | Запрос для чтения из Redshift. |
Пользователь | No | нет | Имя пользователя Redshift. Необходимо использовать в тандеме с параметром пароля. Можно использовать только в том случае, если пользователь и пароль не передаются в URL-адресе, передача обоих приведет к ошибке. Используйте этот параметр, если имя пользователя содержит специальные символы, которые необходимо экранировать. |
password | No | нет | Пароль Redshift. Используется в сочетании с параметром user . Можно использовать только в том случае, если пользователь и пароль не передаются в URL-адресе; Передача обоих приведет к ошибке. Используйте этот параметр, если пароль содержит специальные символы, которые необходимо экранировать. |
URL-адрес | Да | Не допускается | URL-адрес JDBC в форматеjdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password> subprotocol может быть postgresql или redshift в зависимости от загруженного драйвера JDBC. Один драйвер, совместимый с Redshift, должен находиться в пути класса и соответствовать этому URL-адресу. host и port должен указывать на главный узел Redshift, поэтому группы безопасности и /или VPC должны быть настроены, чтобы разрешить доступ из приложения драйвера.database определяет имя user базы данных Redshift и password являются учетными данными для доступа к базе данных, которая должна быть внедрена в этот URL-адрес для JDBC, а учетная запись пользователя должна иметь необходимые привилегии для указанной таблицы. |
search_path | No | нет | Задайте путь поиска схемы в Redshift. Будет задано с помощью SET search_path to команды. Должен быть разделен запятыми список имен схем для поиска таблиц в. См . документацию по Redshift search_path. |
aws_iam_role | Только при использовании ролей IAM для авторизации. | нет | Полный указанный ARN роли операций IAM Redshift COPY/UNLOAD, присоединенной к кластеру Redshift, например arn:aws:iam::123456789000:role/<redshift-iam-role> . |
forward_spark_s3_credentials | No | false |
Если true источник данных автоматически обнаруживает учетные данные, которые Spark использует для подключения к S3, и перенаправит эти учетные данные в Redshift через JDBC. Эти учетные данные отправляются в рамках запроса JDBC, поэтому настоятельно рекомендуется включить шифрование SSL подключения JDBC при использовании этого параметра. |
temporary_aws_access_key_id | No | нет | Ключ доступа AWS должен иметь разрешения на запись в контейнер S3. |
temporary_aws_secret_access_key | No | нет | Ключ секретного доступа AWS, соответствующий предоставленному ключу доступа. |
temporary_aws_session_token | No | нет | Токен сеанса AWS, соответствующий предоставленному ключу доступа. |
tempdir | Да | Не допускается | Записываемое расположение в Amazon S3, которое будет использоваться для выгрузки данных при чтении и загрузке данных Avro в Redshift при записи. Если вы используете источник данных Redshift для Spark в рамках обычного конвейера ETL, это может быть полезно, чтобы задать политику жизненного цикла в контейнере и использовать ее в качестве временного расположения для этих данных. Нельзя использовать внешние расположения, определенные в каталоге Unity в качестве tempdir расположений. |
jdbcdriver | No | Определяется подпротоколом URL-адреса JDBC. | Имя класса используемого драйвера JDBC. Этот класс должен находиться в подкаталоге классов. В большинстве случаев не следует указывать этот параметр, так как соответствующее имя класса драйвера должно автоматически определяться подпротоколом URL-адреса JDBC. |
diststyle | No | EVEN |
Стиль распределения Redshift, используемый при создании таблицы. Может быть одним из EVEN них KEY или ALL (см. документы Redshift). При использовании KEY необходимо также задать ключ распространения с параметром distkey. |
distkey | Нет, если не используется DISTSTYLE KEY |
нет | Имя столбца в таблице, используемого в качестве ключа распространения при создании таблицы. |
sortkeyspec | No | нет | Полное определение ключа сортировки Redshift. Вот некоторые примеры. - SORTKEY(my_sort_column) - COMPOUND SORTKEY(sort_col_1, sort_col_2) - INTERLEAVED SORTKEY(sort_col_1, sort_col_2) |
usestagingtable (не рекомендуется) | No | true |
Установка этого нерекомендуемого параметра приводит к тому, что false целевая таблица операции перезаписи удаляется сразу же в начале записи, что делает операцию перезаписи не атомарным и уменьшая доступность целевой таблицы. Это может снизить требования к временному дисковом пространству для перезаписи.Так как установка usestagingtable=false операции рискует потерей данных или недоступностью, она не рекомендуется использовать для удаления целевой таблицы вручную. |
описание | No | нет | Описание таблицы. Будет задана команда "КОММЕНТАРИЙ SQL" и должна отображаться в большинстве средств запросов. См. также метаданные description , чтобы задать описания для отдельных столбцов. |
предварительные действия | No | нет | Разделенный ; список команд SQL, которые необходимо выполнить перед загрузкой COPY команды. Перед загрузкой новых данных может потребоваться выполнить некоторые DELETE команды или аналогичные команды. Если команда содержит %s , имя таблицы отформатировано перед выполнением (в случае использования промежуточной таблицы).Предупреждайте, что если эти команды завершаются сбоем, он обрабатывается как ошибка и возникает исключение. При использовании промежуточной таблицы изменения возвращаются и таблица резервного копирования восстанавливается, если предварительные действия завершаются сбоем. |
postactions | No | нет | Разделенный ; список команд SQL, выполняемых после успешного COPY выполнения при загрузке данных. При загрузке новых данных может потребоваться выполнить некоторые GRANT команды или аналогичные команды. Если команда содержит %s , имя таблицы отформатировано перед выполнением (в случае использования промежуточной таблицы).Предупреждайте, что если эти команды завершаются сбоем, он обрабатывается как ошибка и возникает исключение. При использовании промежуточной таблицы изменения удаляются, а таблица резервного копирования восстанавливается при сбое действий после выполнения. |
extracopyoptions | No | нет | Список дополнительных параметров для добавления к команде Redshift COPY при загрузке данных, напримерTRUNCATECOLUMNS или MAXERROR n (см . документы Redshift для других вариантов ).Так как эти параметры добавляются в конец COPY команды, можно использовать только те параметры, которые имеют смысл в конце команды, но это должно охватывать наиболее возможные варианты использования. |
tempformat | No | AVRO |
Формат, в котором необходимо сохранить временные файлы в S3 при записи в Redshift. По умолчанию — AVRO ; Остальные допустимые значения : CSV CSV GZIP csv и gzipped CSV соответственно.Redshift значительно быстрее при загрузке CSV-файлов, чем при загрузке файлов Avro, поэтому использование этого tempformat может повысить производительность при записи в Redshift. |
csvnullstring | No | @NULL@ |
Строковое значение для записи значений NULL при использовании tempformat CSV. Это должно быть значение, которое не отображается в фактических данных. |
csvseparator | No | , |
Разделитель, используемый при написании временных файлов с набором tempformat или CSV CSV GZIP . Это должен быть допустимый символ ASCII, например ", " или "\| ". |
csvignoreleadingwhitespace | No | true |
Если задано значение true, при записи удаляется начальное пробелы из значений во время записиtempformat имеет значение CSV или CSV GZIP . В противном случае пробелы сохраняются. |
csvignoretrailingwhitespace | No | true |
Если задано значение true, удаляет конечные пробелы из значений во время записи при записиtempformat имеет значение CSV или CSV GZIP . В противном случае пробел сохраняется. |
infer_timestamp_ntz_type | No | false |
Если true значения типа Redshift TIMESTAMP интерпретируются как TimestampNTZType (метка времени без часового пояса) во время чтения. В противном случае все метки времени интерпретируются независимо TimestampType от типа в базовой таблице Redshift. |
Дополнительные параметры конфигурации
Настройка максимального размера строковых столбцов
При создании таблиц Redshift поведение по умолчанию заключается в создании TEXT
столбцов строковых столбцов. Redshift сохраняет TEXT
столбцы, так что VARCHAR(256)
эти столбцы имеют максимальный размер 256 символов (источник).
Для поддержки больших столбцов можно использовать maxlength
поле метаданных столбца, чтобы указать максимальную длину отдельных строковых столбцов. Это также полезно для реализации оптимизации производительности экономии пространства путем объявления столбцов с меньшей максимальной длиной, чем по умолчанию.
Примечание.
Из-за ограничений в Spark API языка SQL и R не поддерживают изменение метаданных столбца.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Ниже приведен пример обновления полей метаданных нескольких столбцов с помощью API Scala Spark:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Установка типа настраиваемого столбца
Если необходимо вручную задать тип столбца, можно использовать метаданные redshift_type
столбца. Например, если требуется переопределить Spark SQL Schema -> Redshift SQL
сопоставление типов для назначения определяемого пользователем типа столбца, можно выполнить следующее:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Настройка кодировки столбцов
При создании таблицы используйте encoding
поле метаданных столбца, чтобы указать кодировку сжатия для каждого столбца (см . документы Amazon для доступных кодировок).
Настройка описаний для столбцов
Redshift позволяет столбцам содержать описания, которые должны отображаться в большинстве средств запросов (с помощью COMMENT
команды). Можно задать description
поле метаданных столбца, чтобы указать описание отдельных столбцов.
Отправка запроса в Redshift
Оптимизатор Spark отправляет следующие операторы в Redshift:
Filter
Project
Sort
Limit
Aggregation
Join
Внутри Project
и Filter
он поддерживает следующие выражения:
- Большинство логических операторов логики
- Сравнения
- Простые арифметические операции
- Приведение числовых или строковых типов
- Большинство строковых функций
- Скалярные вложенные запросы, если их можно полностью отправить в Redshift.
Примечание.
Этот pushdown не поддерживает выражения, работающие с датами и метками времени.
В ней Aggregation
поддерживаются следующие функции агрегирования:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
в сочетании с предложением DISTINCT
, где применимо.
В Join
ней поддерживаются следующие типы соединений:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- Вложенные запросы, которые перезаписываются
Join
оптимизатором, напримерWHERE EXISTS
,WHERE NOT EXISTS
Примечание.
Принудительная отправка соединения не поддерживается FULL OUTER JOIN
.
Pushdown может оказаться наиболее полезным в запросах.LIMIT
Запрос, такой как SELECT * FROM large_redshift_table LIMIT 10
может занять очень много времени, так как вся таблица в первую очередь будет unLOADeded to S3 в качестве промежуточного результата. С помощью pushdown LIMIT
выполняется в Redshift. В запросах с агрегированием при отправке агрегирования в Redshift также помогает сократить объем передаваемых данных.
Отправка запросов в Redshift включена по умолчанию. Его можно отключить, установив для него значение spark.databricks.redshift.pushdown
false
. Даже при отключении Spark по-прежнему отправляет фильтры вниз и выполняет удаление столбцов в Redshift.
Установка драйвера Redshift
Для источника данных Redshift также требуется драйвер JDBC, совместимый с Redshift. Так как Redshift основан на системе базы данных PostgreSQL, можно использовать драйвер JDBC PostgreSQL, включенный в Databricks Runtime или рекомендуемый драйвер JDBC Redshift. Установка не требуется для использования драйвера JDBC PostgreSQL. Версия драйвера JDBC PostgreSQL, включенная в каждый выпуск Databricks Runtime, указана в заметках о выпуске Databricks Runtime.
Чтобы вручную установить драйвер JDBC Redshift, выполните следующие действия.
- Скачайте драйвер из Amazon.
- Отправьте драйвер в рабочую область Azure Databricks. См . библиотеки.
- Установите библиотеку в кластере.
Примечание.
Databricks рекомендует использовать последнюю версию драйвера Redshift JDBC. Версии драйвера JDBC Redshift ниже 1.2.41 имеют следующие ограничения:
- Версия 1.2.16 драйвера возвращает пустые данные при использовании
where
предложения в SQL-запросе. - Версии драйвера ниже 1.2.41 могут возвращать недопустимые результаты, так как допустимость null столбца неправильно отображается как "Not Nullable" вместо "Неизвестно".
Гарантии транзакций
В этом разделе описаны гарантии транзакций источника данных Redshift для Spark.
Общий фон для свойств Redshift и S3
Общие сведения о гарантиях транзакций Redshift см . в разделе "Управление параллельными операциями записи" в документации Redshift. В двух словах Redshift обеспечивает сериализуемую изоляцию в соответствии с документацией по команде Redshift BEGIN :
[хотя] вы можете использовать любой из четырех уровней изоляции транзакций, Amazon Redshift обрабатывает все уровни изоляции как сериализуемые.
В соответствии с документацией Redshift:
Amazon Redshift поддерживает поведение автоматической фиксации по умолчанию, в котором каждая отдельная команда SQL фиксирует по отдельности.
Таким образом, отдельные команды, такие как COPY
и UNLOAD
атомарные и транзакционные, в то время как явные BEGIN
и END
должны быть необходимы только для применения атомарности нескольких команд или запросов.
При чтении и записи в Redshift источник данных считывает и записывает данные в S3. Spark и Redshift создают секционированные выходные данные и хранят его в нескольких файлах в S3. Согласно документации по модели согласованности данных Amazon S3, операции с описанием сегментов S3 в конечном итоге согласованы, поэтому файлы должны перейти к специальной длине, чтобы избежать отсутствия или неполных данных из-за этого источника конечной согласованности.
Гарантии источника данных Redshift для Spark
Добавление к существующей таблице
При вставке строк в Redshift источник данных использует команду COPY и задает манифесты для защиты от определенных операций S3, согласованных в конечном итоге. В результате spark-redshift
добавление к существующим таблицам имеет те же атомарные и транзакционные свойства, что и обычные команды Redshift COPY
.
Создание новой таблицы (SaveMode.CreateIfNotExists
)
Создание новой таблицы — это двухэтапный процесс, состоящий из CREATE TABLE
команды, за которой следует команда COPY , чтобы добавить начальный набор строк. Обе операции выполняются в одной транзакции.
Перезапись существующей таблицы
По умолчанию источник данных использует транзакции для выполнения перезаписей, которые реализуются путем удаления целевой таблицы, создания пустой таблицы и добавления к ней строк.
Если задан false
устаревший usestagingtable
параметр, источник данных фиксирует DELETE TABLE
команду перед добавлением строк в новую таблицу, жертвуя атомарностью операции перезаписи, но уменьшая объем промежуточного пространства, необходимого Redshift во время перезаписи.
Таблица Redshift запроса
Запросы используют команду Redshift UNLOAD для выполнения запроса и сохранения результатов в S3 и использования манифестов для защиты от определенных операций S3, согласованных в конечном итоге. В результате запросы из источника данных Redshift для Spark должны иметь те же свойства согласованности, что и обычные запросы Redshift.
Распространенные проблемы и способы их решения
КонтейнерЫ S3 и кластер Redshift находятся в разных регионах AWS
По умолчанию копии S3 —> Redshift не работают, если контейнер S3 <и кластер Redshift находятся в разных регионах AWS.
Если вы пытаетесь прочитать таблицу Redshift, если контейнер S3 находится в другом регионе, может появиться ошибка, например:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Аналогичным образом попытка записи в Redshift с помощью контейнера S3 в другом регионе может привести к следующей ошибке:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Записывает: команда Redshift COPY поддерживает явную спецификацию региона контейнера S3, поэтому вы можете правильно выполнять запись в Redshift в этих случаях, добавив
region 'the-region-name'
вextracopyoptions
параметр. Например, с контейнером в регионе "Восточная часть США" (Вирджиния) и API Scala используйте:.option("extracopyoptions", "region 'us-east-1'")
Можно также использовать
awsregion
параметр:.option("awsregion", "us-east-1")
Считывает: команда Redshift UNLOAD также поддерживает явную спецификацию региона контейнера S3. Вы можете правильно выполнять операции чтения, добавив регион в
awsregion
параметр:.option("awsregion", "us-east-1")
Ошибка проверки подлинности при использовании пароля с специальными символами в URL-адресе JDBC
Если вы предоставляете имя пользователя и пароль в рамках URL-адреса JDBC, а пароль содержит специальные символы, например ;
, ?
или &
может появиться следующее исключение:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Это вызвано тем, что специальные символы в имени пользователя или пароля не экранируются драйвером JDBC. Обязательно укажите имя пользователя и пароль с помощью соответствующих параметров user
Кадра данных и password
. Дополнительные сведения см. в разделе Параметры.
Длительный запрос Spark зависает на неопределенный срок, даже если выполняется соответствующая операция Redshift
Если вы считываете или записываете большие объемы данных из Redshift, запрос Spark может зависать на неопределенный срок, даже если страница мониторинга AWS Redshift показывает, что соответствующий LOAD
или UNLOAD
операция завершена и что кластер неактивен. Это вызвано подключением между Redshift и Spark время ожидания. Чтобы избежать этого, убедитесь, что tcpKeepAlive
флаг JDBC включен и TCPKeepAliveMinutes
имеет низкое значение (например, 1).
Дополнительные сведения см. в разделе Amazon Redshift JDBC Driver Configuration.
Метка времени с семантикой часового пояса
При чтении данных redshift TIMESTAMP
и TIMESTAMPTZ
типы данных сопоставляются с Spark TimestampType
, а значение преобразуется в координированное универсальное время (UTC) и сохраняется в формате метки времени UTC. Для Redshift TIMESTAMP
предполагается, что в локальном часовом поясе нет сведений о часовом поясе. При записи данных в таблицу Redshift spark TimestampType
сопоставляется с типом данных Redshift TIMESTAMP
.
Руководство по миграции
Теперь источник данных требует явно задать forward_spark_s3_credentials
перед отправкой учетных данных Spark S3 в Redshift. Это изменение не влияет на использование механизмов проверки подлинности или temporary_aws_*
их использованияaws_iam_role
. Однако если вы использовали старое поведение по умолчанию, то теперь необходимо явно установить forward_spark_s3_credentials
true
для продолжения использования предыдущего механизма проверки подлинности Redshift в S3. Обсуждение трех механизмов проверки подлинности и их компромиссов безопасности см. в разделе "Проверка подлинности в S3 и Redshift " этого документа.