Поделиться через


Запрос Amazon Redshift с помощью Azure Databricks

Таблицы можно считывать и записывать из Amazon Redshift с помощью Azure Databricks.

Внимание

Конфигурации, описанные в этой статье, являются экспериментальными. Экспериментальные функции предоставляются как есть и не поддерживаются Databricks через техническую поддержку клиентов. Чтобы получить полную поддержку федерации запросов, следует вместо этого использовать федерацию Lakehouse, которая позволяет пользователям Azure Databricks воспользоваться синтаксисом каталога Unity и средствами управления данными.

Источник данных Databricks Redshift использует Amazon S3 для эффективной передачи данных из Redshift и использует JDBC для автоматического активации соответствующих COPY и UNLOAD команд в Redshift.

Примечание.

В Databricks Runtime 11.3 LTS и более поздних версиях среда выполнения Databricks включает драйвер Redshift JDBC, доступный redshift с помощью ключевого слова для параметра форматирования. См . заметки о выпуске Databricks Runtime и совместимость версий драйверов, включенных в каждую среду выполнения Databricks. Предоставленные пользователем драйверы по-прежнему поддерживаются и имеют приоритет над пакетным драйвером JDBC.

В Databricks Runtime 10.4 LTS и ниже требуется ручная установка драйвера Redshift JDBC, а запросы должны использовать драйвер (com.databricks.spark.redshift) для формата. См. раздел установки драйвера Redshift.

Использование

В следующих примерах показано подключение к драйверу Redshift. Замените url значения параметров, если вы используете драйвер JDBC PostgreSQL.

После настройки учетных данных AWS можно использовать источник данных с API источника данных Spark в Python, SQL, R или Scala.

Внимание

Внешние расположения, определенные в каталоге Unity, не поддерживаются в качестве tempdir расположений.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Чтение данных с помощью SQL в Databricks Runtime 10.4 LTS и ниже:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Чтение данных с помощью SQL в Databricks Runtime 11.3 LTS и выше:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Запись данных с помощью SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

API SQL поддерживает только создание новых таблиц, а не перезапись или добавление.

R

Чтение данных с помощью R в Databricks Runtime 10.4 LTS и ниже:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Чтение данных с помощью R в Databricks Runtime 11.3 LTS и выше:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Рекомендации по работе с Redshift

Выполнение запроса может извлекать большие объемы данных в S3. Если вы планируете выполнить несколько запросов к тем же данным в Redshift, Databricks рекомендует сохранить извлеченные данные с помощью Delta Lake.

Настройка

Проверка подлинности в S3 и Redshift

Источник данных включает несколько сетевых подключений, показанных на следующей схеме:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Источник данных считывает и записывает данные в S3 при передаче данных в Redshift и из нее. В результате требуется учетные данные AWS с доступом на чтение и запись в контейнер S3 (указанный tempdir с помощью параметра конфигурации).

Примечание.

Источник данных не очищает временные файлы, создаваемые в S3. В результате рекомендуется использовать выделенный временный контейнер S3 с конфигурацией жизненного цикла объектов, чтобы гарантировать автоматическое удаление временных файлов после указанного срока действия. Сведения о шифровании этих файлов см. в разделе "Шифрование " этого документа. Нельзя использовать внешнее расположение, определенное в каталоге tempdir Unity в качестве расположения.

В следующих разделах описаны параметры конфигурации проверки подлинности каждого подключения:

Драйвер Spark в Redshift

Драйвер Spark подключается к Redshift через JDBC с помощью имени пользователя и пароля. Redshift не поддерживает использование ролей IAM для проверки подлинности этого подключения. По умолчанию это подключение использует шифрование SSL; Дополнительные сведения см. в разделе "Шифрование".

Spark до S3

S3 выступает в качестве посредника для хранения массовых данных при чтении или записи в Redshift. Spark подключается к S3 с помощью интерфейсов Файловой системы Hadoop и непосредственно с помощью клиента S3 пакета SDK для Amazon Java.

Примечание.

Подключения DBFS нельзя использовать для настройки доступа к S3 для Redshift.

  • Задайте ключи в conf Hadoop: вы можете указать ключи AWS с помощью свойств конфигурации Hadoop. tempdir Если конфигурация указывает на файловую s3a:// систему, можно задать fs.s3a.access.key и fs.s3a.secret.key свойства в XML-файле конфигурации Hadoop или вызвать sc.hadoopConfiguration.set() настройку глобальной конфигурации Hadoop Spark. При использовании файловой системы можно указать устаревшие s3n:// ключи конфигурации, как показано в следующем примере.

    Scala

    Например, если вы используете файловую s3a систему, добавьте:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Для устаревшей файловой s3n системы добавьте:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Следующая команда использует некоторые внутренние компоненты Spark, но должна работать со всеми версиями PySpark и вряд ли изменится в будущем:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift до S3

forward_spark_s3_credentials Задайте параметр для true автоматической пересылки учетных данных ключа AWS, которые Spark использует для подключения к S3 через JDBC в Redshift. Запрос JDBC внедряет эти учетные данные, поэтому Databricks настоятельно рекомендует включить шифрование SSL подключения JDBC.

Шифрование

  • Защита JDBC: если в URL-адресе JDBC отсутствуют какие-либо параметры, связанные с SSL, источник данных по умолчанию включает шифрование SSL, а также проверяет, является ли сервер Redshift надежным (т sslmode=verify-full. е. ). Для этого сертификат сервера автоматически загружается с серверов Amazon при первом необходимости. В случае сбоя предварительно упакованный файл сертификата используется в качестве резервного варианта. Это относится как к драйверам Redshift, так и к JDBC PostgreSQL.

    Если с этой функцией возникают какие-либо проблемы или вы просто хотите отключить SSL, можно вызвать .option("autoenablessl", "false") или DataFrameReader DataFrameWriter.

    Если вы хотите указать настраиваемые параметры, связанные с SSL, вы можете следовать инструкциям в документации Redshift: использование SSL-сертификатов и сертификатов сервера в Java и JDBC Driver Configuration Options Any SSL-связанных параметров, присутствующих в JDBC url , используемых с источником данных, имеет приоритет (то есть автоматическая настройка не будет активироваться).

  • Шифрование данных ВЫГРУЗКИ, хранящихся в S3 (данные, хранящиеся при чтении из Redshift): согласно документации Redshift по выгрузке данных в S3, "ВЫГРУЗ автоматически шифрует файлы данных с помощью шифрования на стороне сервера Amazon S3 (SSE-S3).

    Redshift также поддерживает шифрование на стороне клиента с помощью пользовательского ключа (см. раздел " Выгрузка зашифрованных файлов данных"), но источник данных не имеет возможности указать необходимый симметричный ключ.

  • Шифрование данных COPY, хранящихся в S3 (данные, хранящиеся при записи в Redshift): в соответствии с документацией Redshift по загрузке зашифрованных файлов данных из Amazon S3:

С помощью команды можно COPY загрузить файлы данных, отправленные в Amazon S3, с помощью шифрования на стороне сервера с ключами шифрования, управляемыми AWS (SSE-S3 или SSE-KMS), шифрование на стороне клиента или оба. COPY не поддерживает шифрование на стороне сервера Amazon S3 с ключом, предоставленным клиентом (SSE-C).

Параметры

Карта параметров или ПАРАМЕТРЫ, предоставляемые в Spark SQL, поддерживают следующие параметры:

Параметр Обязательное поле По умолчанию. Description
dbtable Да, если запрос не указан. нет Таблица для создания или чтения из Redshift. Этот параметр требуется при сохранении данных обратно в Redshift.
query Да, если не указан параметр dbtable. нет Запрос для чтения из Redshift.
Пользователь No нет Имя пользователя Redshift. Необходимо использовать в тандеме с параметром пароля. Можно использовать только в том случае, если пользователь и пароль не передаются в URL-адресе, передача обоих приведет к ошибке. Используйте этот параметр, если имя пользователя содержит специальные символы, которые необходимо экранировать.
password No нет Пароль Redshift. Используется в сочетании с параметром user. Можно использовать только в том случае, если пользователь и пароль не передаются в URL-адресе; Передача обоих приведет к ошибке. Используйте этот параметр, если пароль содержит специальные символы, которые необходимо экранировать.
URL-адрес Да Не допускается URL-адрес JDBC в формате
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol может быть postgresql или redshiftв зависимости от загруженного драйвера JDBC. Один драйвер, совместимый с Redshift, должен находиться в пути класса и соответствовать этому URL-адресу. host и port должен указывать на главный узел Redshift, поэтому группы безопасности и /или VPC должны быть настроены, чтобы разрешить доступ из приложения драйвера.
database определяет имя user базы данных Redshift и password являются учетными данными для доступа к базе данных, которая должна быть внедрена в этот URL-адрес для JDBC, а учетная запись пользователя должна иметь необходимые привилегии для указанной таблицы.
search_path No нет Задайте путь поиска схемы в Redshift. Будет задано с помощью SET search_path to команды. Должен быть разделен запятыми список имен схем для поиска таблиц в. См . документацию по Redshift search_path.
aws_iam_role Только при использовании ролей IAM для авторизации. нет Полный указанный ARN роли операций IAM Redshift COPY/UNLOAD, присоединенной к кластеру Redshift, например arn:aws:iam::123456789000:role/<redshift-iam-role>.
forward_spark_s3_credentials No false Если trueисточник данных автоматически обнаруживает учетные данные, которые Spark использует для подключения к S3, и перенаправит эти учетные данные в Redshift через JDBC. Эти учетные данные отправляются в рамках запроса JDBC, поэтому настоятельно рекомендуется включить шифрование SSL подключения JDBC при использовании этого параметра.
temporary_aws_access_key_id No нет Ключ доступа AWS должен иметь разрешения на запись в контейнер S3.
temporary_aws_secret_access_key No нет Ключ секретного доступа AWS, соответствующий предоставленному ключу доступа.
temporary_aws_session_token No нет Токен сеанса AWS, соответствующий предоставленному ключу доступа.
tempdir Да Не допускается Записываемое расположение в Amazon S3, которое будет использоваться для выгрузки данных при чтении и загрузке данных Avro в Redshift при записи. Если вы используете источник данных Redshift для Spark в рамках обычного конвейера ETL, это может быть полезно, чтобы задать политику жизненного цикла в контейнере и использовать ее в качестве временного расположения для этих данных.

Нельзя использовать внешние расположения, определенные в каталоге Unity в качестве tempdir расположений.
jdbcdriver No Определяется подпротоколом URL-адреса JDBC. Имя класса используемого драйвера JDBC. Этот класс должен находиться в подкаталоге классов. В большинстве случаев не следует указывать этот параметр, так как соответствующее имя класса драйвера должно автоматически определяться подпротоколом URL-адреса JDBC.
diststyle No EVEN Стиль распределения Redshift, используемый при создании таблицы. Может быть одним из EVENних KEY или ALL (см. документы Redshift). При использовании KEYнеобходимо также задать ключ распространения с параметром distkey.
distkey Нет, если не используется DISTSTYLE KEY нет Имя столбца в таблице, используемого в качестве ключа распространения при создании таблицы.
sortkeyspec No нет Полное определение ключа сортировки Redshift. Вот некоторые примеры.

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (не рекомендуется) No true Установка этого нерекомендуемого параметра приводит к тому, что false целевая таблица операции перезаписи удаляется сразу же в начале записи, что делает операцию перезаписи не атомарным и уменьшая доступность целевой таблицы. Это может снизить требования к временному дисковом пространству для перезаписи.

Так как установка usestagingtable=false операции рискует потерей данных или недоступностью, она не рекомендуется использовать для удаления целевой таблицы вручную.
описание No нет Описание таблицы. Будет задана команда "КОММЕНТАРИЙ SQL" и должна отображаться в большинстве средств запросов. См. также метаданные description , чтобы задать описания для отдельных столбцов.
предварительные действия No нет Разделенный ; список команд SQL, которые необходимо выполнить перед загрузкой COPY команды. Перед загрузкой новых данных может потребоваться выполнить некоторые DELETE команды или аналогичные команды. Если команда содержит %s, имя таблицы отформатировано перед выполнением (в случае использования промежуточной таблицы).

Предупреждайте, что если эти команды завершаются сбоем, он обрабатывается как ошибка и возникает исключение. При использовании промежуточной таблицы изменения возвращаются и таблица резервного копирования восстанавливается, если предварительные действия завершаются сбоем.
postactions No нет Разделенный ; список команд SQL, выполняемых после успешного COPY выполнения при загрузке данных. При загрузке новых данных может потребоваться выполнить некоторые GRANT команды или аналогичные команды. Если команда содержит %s, имя таблицы отформатировано перед выполнением (в случае использования промежуточной таблицы).

Предупреждайте, что если эти команды завершаются сбоем, он обрабатывается как ошибка и возникает исключение. При использовании промежуточной таблицы изменения удаляются, а таблица резервного копирования восстанавливается при сбое действий после выполнения.
extracopyoptions No нет Список дополнительных параметров для добавления к команде Redshift COPY при загрузке данных, например
TRUNCATECOLUMNS или MAXERROR n (см . документы Redshift для других вариантов ).

Так как эти параметры добавляются в конец COPY команды, можно использовать только те параметры, которые имеют смысл в конце команды, но это должно охватывать наиболее возможные варианты использования.
tempformat No AVRO Формат, в котором необходимо сохранить временные файлы в S3 при записи в Redshift. По умолчанию —
AVRO; Остальные допустимые значения : CSV CSV GZIP csv и gzipped CSV соответственно.

Redshift значительно быстрее при загрузке CSV-файлов, чем при загрузке файлов Avro, поэтому использование этого tempformat может повысить производительность при записи в Redshift.
csvnullstring No @NULL@ Строковое значение для записи значений NULL при использовании tempformat CSV. Это должно быть значение, которое не отображается в фактических данных.
csvseparator No , Разделитель, используемый при написании временных файлов с набором tempformat или CSV
CSV GZIP. Это должен быть допустимый символ ASCII, например "," или "\|".
csvignoreleadingwhitespace No true Если задано значение true, при записи удаляется начальное пробелы из значений во время записи
tempformat имеет значение CSV или CSV GZIP. В противном случае пробелы сохраняются.
csvignoretrailingwhitespace No true Если задано значение true, удаляет конечные пробелы из значений во время записи при записи
tempformat имеет значение CSV или CSV GZIP. В противном случае пробел сохраняется.
infer_timestamp_ntz_type No false Если trueзначения типа Redshift TIMESTAMP интерпретируются как TimestampNTZType (метка времени без часового пояса) во время чтения. В противном случае все метки времени интерпретируются независимо TimestampType от типа в базовой таблице Redshift.

Дополнительные параметры конфигурации

Настройка максимального размера строковых столбцов

При создании таблиц Redshift поведение по умолчанию заключается в создании TEXT столбцов строковых столбцов. Redshift сохраняет TEXT столбцы, так что VARCHAR(256)эти столбцы имеют максимальный размер 256 символов (источник).

Для поддержки больших столбцов можно использовать maxlength поле метаданных столбца, чтобы указать максимальную длину отдельных строковых столбцов. Это также полезно для реализации оптимизации производительности экономии пространства путем объявления столбцов с меньшей максимальной длиной, чем по умолчанию.

Примечание.

Из-за ограничений в Spark API языка SQL и R не поддерживают изменение метаданных столбца.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Ниже приведен пример обновления полей метаданных нескольких столбцов с помощью API Scala Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Установка типа настраиваемого столбца

Если необходимо вручную задать тип столбца, можно использовать метаданные redshift_type столбца. Например, если требуется переопределить Spark SQL Schema -> Redshift SQL сопоставление типов для назначения определяемого пользователем типа столбца, можно выполнить следующее:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Настройка кодировки столбцов

При создании таблицы используйте encoding поле метаданных столбца, чтобы указать кодировку сжатия для каждого столбца (см . документы Amazon для доступных кодировок).

Настройка описаний для столбцов

Redshift позволяет столбцам содержать описания, которые должны отображаться в большинстве средств запросов (с помощью COMMENT команды). Можно задать description поле метаданных столбца, чтобы указать описание отдельных столбцов.

Отправка запроса в Redshift

Оптимизатор Spark отправляет следующие операторы в Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Внутри Project и Filterон поддерживает следующие выражения:

  • Большинство логических операторов логики
  • Сравнения
  • Простые арифметические операции
  • Приведение числовых или строковых типов
  • Большинство строковых функций
  • Скалярные вложенные запросы, если их можно полностью отправить в Redshift.

Примечание.

Этот pushdown не поддерживает выражения, работающие с датами и метками времени.

В ней Aggregationподдерживаются следующие функции агрегирования:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

в сочетании с предложением DISTINCT , где применимо.

В Joinней поддерживаются следующие типы соединений:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Вложенные запросы, которые перезаписываются Join оптимизатором, например WHERE EXISTS, WHERE NOT EXISTS

Примечание.

Принудительная отправка соединения не поддерживается FULL OUTER JOIN.

Pushdown может оказаться наиболее полезным в запросах.LIMIT Запрос, такой как SELECT * FROM large_redshift_table LIMIT 10 может занять очень много времени, так как вся таблица в первую очередь будет unLOADeded to S3 в качестве промежуточного результата. С помощью pushdown LIMIT выполняется в Redshift. В запросах с агрегированием при отправке агрегирования в Redshift также помогает сократить объем передаваемых данных.

Отправка запросов в Redshift включена по умолчанию. Его можно отключить, установив для него значение spark.databricks.redshift.pushdown false. Даже при отключении Spark по-прежнему отправляет фильтры вниз и выполняет удаление столбцов в Redshift.

Установка драйвера Redshift

Для источника данных Redshift также требуется драйвер JDBC, совместимый с Redshift. Так как Redshift основан на системе базы данных PostgreSQL, можно использовать драйвер JDBC PostgreSQL, включенный в Databricks Runtime или рекомендуемый драйвер JDBC Redshift. Установка не требуется для использования драйвера JDBC PostgreSQL. Версия драйвера JDBC PostgreSQL, включенная в каждый выпуск Databricks Runtime, указана в заметках о выпуске Databricks Runtime.

Чтобы вручную установить драйвер JDBC Redshift, выполните следующие действия.

  1. Скачайте драйвер из Amazon.
  2. Отправьте драйвер в рабочую область Azure Databricks. См . библиотеки.
  3. Установите библиотеку в кластере.

Примечание.

Databricks рекомендует использовать последнюю версию драйвера Redshift JDBC. Версии драйвера JDBC Redshift ниже 1.2.41 имеют следующие ограничения:

  • Версия 1.2.16 драйвера возвращает пустые данные при использовании where предложения в SQL-запросе.
  • Версии драйвера ниже 1.2.41 могут возвращать недопустимые результаты, так как допустимость null столбца неправильно отображается как "Not Nullable" вместо "Неизвестно".

Гарантии транзакций

В этом разделе описаны гарантии транзакций источника данных Redshift для Spark.

Общий фон для свойств Redshift и S3

Общие сведения о гарантиях транзакций Redshift см . в разделе "Управление параллельными операциями записи" в документации Redshift. В двух словах Redshift обеспечивает сериализуемую изоляцию в соответствии с документацией по команде Redshift BEGIN :

[хотя] вы можете использовать любой из четырех уровней изоляции транзакций, Amazon Redshift обрабатывает все уровни изоляции как сериализуемые.

В соответствии с документацией Redshift:

Amazon Redshift поддерживает поведение автоматической фиксации по умолчанию, в котором каждая отдельная команда SQL фиксирует по отдельности.

Таким образом, отдельные команды, такие как COPY и UNLOAD атомарные и транзакционные, в то время как явные BEGIN и END должны быть необходимы только для применения атомарности нескольких команд или запросов.

При чтении и записи в Redshift источник данных считывает и записывает данные в S3. Spark и Redshift создают секционированные выходные данные и хранят его в нескольких файлах в S3. Согласно документации по модели согласованности данных Amazon S3, операции с описанием сегментов S3 в конечном итоге согласованы, поэтому файлы должны перейти к специальной длине, чтобы избежать отсутствия или неполных данных из-за этого источника конечной согласованности.

Гарантии источника данных Redshift для Spark

Добавление к существующей таблице

При вставке строк в Redshift источник данных использует команду COPY и задает манифесты для защиты от определенных операций S3, согласованных в конечном итоге. В результате spark-redshift добавление к существующим таблицам имеет те же атомарные и транзакционные свойства, что и обычные команды Redshift COPY .

Создание новой таблицы (SaveMode.CreateIfNotExists)

Создание новой таблицы — это двухэтапный процесс, состоящий из CREATE TABLE команды, за которой следует команда COPY , чтобы добавить начальный набор строк. Обе операции выполняются в одной транзакции.

Перезапись существующей таблицы

По умолчанию источник данных использует транзакции для выполнения перезаписей, которые реализуются путем удаления целевой таблицы, создания пустой таблицы и добавления к ней строк.

Если задан falseустаревший usestagingtable параметр, источник данных фиксирует DELETE TABLE команду перед добавлением строк в новую таблицу, жертвуя атомарностью операции перезаписи, но уменьшая объем промежуточного пространства, необходимого Redshift во время перезаписи.

Таблица Redshift запроса

Запросы используют команду Redshift UNLOAD для выполнения запроса и сохранения результатов в S3 и использования манифестов для защиты от определенных операций S3, согласованных в конечном итоге. В результате запросы из источника данных Redshift для Spark должны иметь те же свойства согласованности, что и обычные запросы Redshift.

Распространенные проблемы и способы их решения

КонтейнерЫ S3 и кластер Redshift находятся в разных регионах AWS

По умолчанию копии S3 —> Redshift не работают, если контейнер S3 <и кластер Redshift находятся в разных регионах AWS.

Если вы пытаетесь прочитать таблицу Redshift, если контейнер S3 находится в другом регионе, может появиться ошибка, например:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Аналогичным образом попытка записи в Redshift с помощью контейнера S3 в другом регионе может привести к следующей ошибке:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect

Ошибка проверки подлинности при использовании пароля с специальными символами в URL-адресе JDBC

Если вы предоставляете имя пользователя и пароль в рамках URL-адреса JDBC, а пароль содержит специальные символы, например ;, ?или &может появиться следующее исключение:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Это вызвано тем, что специальные символы в имени пользователя или пароля не экранируются драйвером JDBC. Обязательно укажите имя пользователя и пароль с помощью соответствующих параметров user Кадра данных и password. Дополнительные сведения см. в разделе Параметры.

Длительный запрос Spark зависает на неопределенный срок, даже если выполняется соответствующая операция Redshift

Если вы считываете или записываете большие объемы данных из Redshift, запрос Spark может зависать на неопределенный срок, даже если страница мониторинга AWS Redshift показывает, что соответствующий LOAD или UNLOAD операция завершена и что кластер неактивен. Это вызвано подключением между Redshift и Spark время ожидания. Чтобы избежать этого, убедитесь, что tcpKeepAlive флаг JDBC включен и TCPKeepAliveMinutes имеет низкое значение (например, 1).

Дополнительные сведения см. в разделе Amazon Redshift JDBC Driver Configuration.

Метка времени с семантикой часового пояса

При чтении данных redshift TIMESTAMP и TIMESTAMPTZ типы данных сопоставляются с Spark TimestampType, а значение преобразуется в координированное универсальное время (UTC) и сохраняется в формате метки времени UTC. Для Redshift TIMESTAMPпредполагается, что в локальном часовом поясе нет сведений о часовом поясе. При записи данных в таблицу Redshift spark TimestampType сопоставляется с типом данных Redshift TIMESTAMP .

Руководство по миграции

Теперь источник данных требует явно задать forward_spark_s3_credentials перед отправкой учетных данных Spark S3 в Redshift. Это изменение не влияет на использование механизмов проверки подлинности или temporary_aws_* их использованияaws_iam_role. Однако если вы использовали старое поведение по умолчанию, то теперь необходимо явно установить forward_spark_s3_credentials true для продолжения использования предыдущего механизма проверки подлинности Redshift в S3. Обсуждение трех механизмов проверки подлинности и их компромиссов безопасности см. в разделе "Проверка подлинности в S3 и Redshift " этого документа.