Compartilhar via


Consultar o Amazon Redshift usando o Azure Databricks

Você pode ler e gravar tabelas do Amazon Redshift com o Azure Databricks.

Importante

As configurações descritas nesse artigo são Experimentais. Recursos experimentais são fornecidos no estado em que se encontram e não têm suporte do Databricks por meio do suporte técnico para clientes. Para obter suporte completo à federação de consultas, você deve usar a Federação do Lakehouse, que permite que os usuários do Azure Databricks aproveitem a sintaxe do Catálogo do Unity e as ferramentas de governança de dados.

A fonte de dados do Databricks Redshift usa o Amazon S3 para transferir dados de forma eficiente para dentro e fora do Redshift e usa o JDBC para disparar automaticamente os comandos COPY e UNLOAD apropriados no Redshift.

Observação

No Databricks Runtime 11.3 LTS e superior, o Databricks Runtime inclui o driver JDBC do Redshift, acessível por meio da palavra-chave redshift para a opção de formato. Consulte Versões de notas de versão do Databricks Runtime e compatibilidade para obter versões de driver incluídas em cada Databricks Runtime. Os drivers fornecidos pelo usuário ainda têm suporte e precedência sobre o driver JDBC agrupado.

No Databricks Runtime 10.4 LTS e inferior, a instalação manual do driver JDBC do Redshift é necessária e as consultas devem usar o driver (com.databricks.spark.redshift) para o formato. Consulte Instalação do driver do Redshift.

Uso

Os exemplos a seguir demonstram a conexão com o driver do Redshift. Substitua os valores do parâmetro url se estiver usando o driver JDBC do PostgreSQL.

Depois de configurar as credenciais do AWS, você pode usar a fonte de dados com a API de fonte de dados do Spark no Python, SQL, R ou Scala.

Importante

Os locais externos definidos no Unity Catalog não são suportados como locais tempdir.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Leia dados usando o SQL no Databricks Runtime 10.4 LTS e inferior:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Leia dados usando o SQL no Databricks Runtime 11.3 LTS e superior:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Gravar os dados usando o SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

A API do SQL dá suporte apenas à criação de novas tabelas e não à substituição ou acréscimo.

R

Leia dados usando o R no Databricks Runtime 10.4 LTS e inferior:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Leia dados usando o R no Databricks Runtime 11.3 LTS e superior:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Recomendações para trabalhar com o Redshift

A execução da consulta pode extrair grandes quantidades de dados para S3. Se você planeja executar várias consultas em relação aos mesmos dados no Redshift, o Databricks recomenda salvar os dados extraídos usando o Delta Lake.

Configuração

Autenticação no S3 e no Redshift

A fonte de dados envolve várias conexões de rede, ilustradas no diagrama a seguir:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

A fonte de dados lê e grava dados no S3 ao transferir dados de/para o Redshift. Como resultado, ele requer as credenciais do AWS com acesso de leitura e gravação a um bucket S3 (especificado usando o parâmetro tempdir de configuração).

Observação

A fonte de dados não limpa os arquivos temporários que cria no S3. Como resultado, recomendamos que você use um bucket S3 temporário dedicado com uma configuração de ciclo de vida do objeto para garantir que os arquivos temporários sejam excluídos automaticamente após um período de expiração especificado. Consulte a seção Criptografia deste documento para obter uma discussão sobre como criptografar esses arquivos. Você não pode usar um local externo definido no Unity Catalog como um local tempdir.

As seções a seguir descrevem as opções de configuração de autenticação de cada conexão:

Driver do Spark para Redshift

O driver do Spark se conecta ao Redshift via JDBC usando um nome de usuário e senha. O Redshift não dá suporte ao uso de funções do IAM para autenticar essa conexão. Por padrão, essa conexão usa criptografia SSL; para obter mais detalhes, consulte Criptografia.

Spark para S3

O S3 funciona como um intermediário para armazenar dados em massa durante a leitura ou a gravação no Redshift. O Spark se conecta ao S3 usando as interfaces do Hadoop FileSystem e usando diretamente o cliente S3 do SDK do Java da Amazon.

Observação

Você não pode usar montagens DBFS para configurar o acesso ao S3 para Redshift.

  • Definir as chaves na configuração do Hadoop: especifique chaves AWS nas propriedades de configuração do Hadoop. Se a configuração tempdir apontar para um sistema de arquivos s3a://, você poderá definir as propriedades fs.s3a.access.key e fs.s3a.secret.key em um arquivo de configuração XML do Hadoop ou chamar sc.hadoopConfiguration.set() para configurar a configuração global do Hadoop do Spark. Se você usar um sistema de arquivos s3n://, poderá fornecer as chaves de configuração herdadas, conforme mostrado no exemplo a seguir.

    Scala

    Por exemplo, se você estiver usando o sistema de arquivos s3a, adicione:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Para o sistema de arquivos herdado s3n, adicione:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Embora o seguinte comando dependa de alguns elementos internos do Spark, ele deve funcionar com todas as versões do PySpark e provavelmente será desfeito ou alterado no futuro:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift para S3

Defina a opção forward_spark_s3_credentials como true para encaminhar automaticamente as credenciais de chave do AWS que o Spark está usando para se conectar ao S3 via JDBC para o Redshift. A consulta JDBC insere essas credenciais, portanto, o Databricks recomenda fortemente que você habilite a criptografia SSL da conexão JDBC.

Criptografia

  • Proteção do JDBC: a menos que todas as configurações relacionadas ao SSL estejam presentes na URL JDBC, a fonte de dados habilita por padrão a criptografia SSL e também verifica se o servidor Redshift é confiável (ou seja, sslmode=verify-full). Para isso, um certificado de servidor é baixado automaticamente dos servidores da Amazon na primeira vez que é necessário. Caso isso falhe, um arquivo de certificado pré-agrupado é usado como um fallback. Isso vale para os drivers JDBC do Redshift e do PostgreSQL.

    Caso haja algum problema com esse recurso ou você simplesmente queira desabilitar o SSL, você pode chamar .option("autoenablessl", "false") no DataFrameReader ou DataFrameWriter.

    Se você quiser especificar configurações personalizadas relacionadas ao SSL, siga as instruções na documentação do Redshift: Usando certificados SSL e do servidor no Java e Opções de configuração de driver JDBC Quaisquer opções relacionadas a SSL presentes no JDBC url usadas com a fonte de dados têm precedência (ou seja, a configuração automática não será disparada).

  • Criptografia de dados UNLOAD armazenados em S3 (dados armazenados ao ler do Redshift): de acordo com a documentação do Redshift sobre o Descarregamento de dados para S3, "UNLOAD criptografa automaticamente arquivos de dados usando a criptografia do lado do servidor do Amazon S3 (SSE-S3)."

    O Redshift também dá suporte à criptografia do lado do cliente com uma chave personalizada (veja: Descarregar arquivos de dados criptografados), mas a fonte de dados não tem a capacidade de especificar a chave simétrica necessária.

  • Criptografar dados COPY armazenados no S3 (dados armazenados ao gravar no Redshift): de acordo com a documentação do Redshift sobre o Carregamento de arquivos de dados criptografados do Amazon S3:

Você pode usar o comando COPY para carregar arquivos de dados carregados no Amazon S3 usando criptografia do lado do servidor com chaves de criptografia gerenciadas pelo AWS (SSE-S3 ou SSE-KMS), criptografia do lado do cliente ou ambos. COPY não dá suporte à criptografia do lado do servidor do Amazon S3 com uma chave fornecida pelo cliente (SSE-C).

Parâmetros

O mapa de parâmetros ou OPÇÕES fornecido no Spark SQL dá suporte às seguintes configurações:

Parâmetro Obrigatório Padrão Descrição
dbtable Sim, a menos que a consulta seja especificada. Nenhum A tabela a ser criada ou lida no Redshift. Esse parâmetro é necessário quando os dados são salvos novamente no Redshift.
consulta Sim, a menos que dbtable seja especificado. Nenhum A consulta a ser lida no Redshift.
usuário Não Nenhum O nome de usuário do Redshift. Precisa ser usado em conjunto com a opção de senha. Só pode ser usado se o usuário e a senha não forem transmitidos na URL. Transmitir os dois resultará em erro. Use esse parâmetro quando o nome de usuário contiver caracteres especiais que precisam de escape.
password Não Nenhum A senha do Redshift. Precisa ser usado em conjunto com a opção user. Só pode ser usado se o usuário e a senha não forem transmitidos na URL. Transmitir os dois resultará em erro. Use esse parâmetro quando a senha contiver caracteres especiais que precisam de escape.
url Sim Nenhum Uma URL JDBC, do formato
jdbc:subprotocol://<host>:<port>/database?user=<username>&password=<password>

subprotocol pode ser postgresql ou redshift, dependendo de qual driver JDBC tiver sido carregado. Um driver compatível com o Redshift deve estar no caminho de classe e corresponder a essa URL. host e port devem apontar para o Node mestre do Redshift. Portanto, os grupos de segurança e/ou a VPC devem ser configurados para permitir o acesso do aplicativo do driver.
database identifica um nome user de banco de dados do Redshift e password são credenciais para acessar o banco de dados, que devem ser incorporadas a essa URL para JDBC, e sua conta de usuário deve ter os privilégios necessários para a tabela que está sendo referenciada.
search_path Não Nenhum Defina o caminho da pesquisa de esquema no Redshift. Será definido usando o comando SET search_path to. Deve ser uma lista de nomes de esquema separada por vírgulas para pesquisar tabelas. Confira a documentação do Redshift referente a search_path.
aws_iam_role Somente se estiver usando funções do IAM para autorizar. Nenhum ARN totalmente especificado da Função de operações COPY/UNLOAD do IAM para Redshift anexada ao cluster do Redshift, por exemplo, arn:aws:iam::123456789000:role/<redshift-iam-role>.
forward_spark_s3_credentials Não false Se o valor for true, a fonte de dados descobrirá automaticamente as credenciais que o Spark está usando para se conectar ao S3 e as encaminhará para o Redshift por meio do JDBC. Essas credenciais são enviadas como parte da consulta JDBC, portanto, é altamente recomendável habilitar a criptografia SSL da conexão JDBC ao usar essa opção.
temporary_aws_access_key_id Não Nenhum Chave de acesso da AWS, deve ter permissões de gravação no bucket do S3.
temporary_aws_secret_access_key Não Nenhum Chave de acesso de segredo da AWS correspondente à chave de acesso fornecida.
temporary_aws_session_token Não Nenhum Token de sessão da AWS correspondente à chave de acesso fornecida.
tempdir Sim Nenhum Uma localização gravável no Amazon S3, a ser usada em dados descarregados durante a leitura, e dados Avro a serem carregados no Redshift durante a gravação. Se você estiver usando a fonte de dados do Redshift para o Spark como parte de um pipeline de ETL regular, pode ser útil definir uma Política de Ciclo de Vida em um bucket e usá-la como uma localização temporária para esses dados.

Você não pode usar Locais externos definidos no Catálogo do Unity como locais de tempdir.
jdbcdriver Não Determinado pelo subprotocolo da URL JDBC. O nome de classe do driver JDBC a ser usado. Essa classe precisa estar no caminho de classe. Na maioria dos casos, não é necessário especificar essa opção, pois o nome de classe apropriado do driver deve ser determinado automaticamente pelo subprotocolo da URL JDBC.
diststyle Não EVEN O Estilo de Distribuição do Redshift a ser usado ao criar uma tabela. Pode ser um dos EVEN, KEY ou ALL (consulte os documentos do Redshift). Ao usar KEY, você também deve definir uma chave de distribuição com a opção distkey.
distkey Não, a menos que use DISTSTYLE KEY Nenhum O nome de uma coluna na tabela a ser usada como chave de distribuição ao criar uma tabela.
sortkeyspec Não Nenhum Uma definição completa da Chave de Classificação do Redshift. Os exemplos incluem:

- SORTKEY(my_sort_column)
- COMPOUND SORTKEY(sort_col_1, sort_col_2)
- INTERLEAVED SORTKEY(sort_col_1, sort_col_2)
usestagingtable (Preterido) Não true Definir essa opção preterida como false faz com que a tabela de destino de uma operação de substituição seja descartada imediatamente no início da gravação, tornando a operação de substituição não atômica e reduzindo a disponibilidade da tabela de destino. Isso pode reduzir os requisitos temporários de espaço em disco para substituições.

Como a configuração da operação usestagingtable=false corre o risco de perda ou indisponibilidade de dados, ela foi preterida em favor de exigir que você descarte manualmente a tabela de destino.
descrição Não Nenhum Uma descrição da tabela. Será definido usando o comando SQL COMMENT e deve aparecer na maioria das ferramentas de consulta. Confira também os metadados de description para definir descrições em colunas individuais.
preactions Não Nenhum Uma lista separada de ; referente a comandos SQL a serem executados antes de carregar o comando COPY. Pode ser útil ter alguns comandos DELETE ou similares executados aqui antes de carregar novos dados. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso esteja usando uma tabela de preparo).

Tenha em mente que, se esses comandos falharem, isso será tratado como um erro e uma exceção será aplicada. Se estiver usando uma tabela de preparo, as alterações serão revertidas e a tabela de backup restaurada em caso de falha das pré-ações.
postactions Não Nenhum Uma lista separada de ; referente a comandos SQL a serem executados após um COPY bem-sucedido durante o carregamento de dados. Pode ser útil ter alguns comandos GRANT ou similares executados aqui ao carregar novos dados. Se o comando contiver %s, o nome da tabela será formatado antes da execução (caso esteja usando uma tabela de preparo).

Tenha em mente que, se esses comandos falharem, isso será tratado como um erro e uma exceção será aplicada. Se estiver usando uma tabela de preparo, as alterações serão revertidas e a tabela de backup restaurada em caso de falha das ações posteriores.
extracopyoptions Não Nenhum Uma lista de opções extras a serem acrescentadas ao comando COPY do Redshift ao carregar dados, por exemplo,
TRUNCATECOLUMNS ou MAXERROR n (confira os documentos do Redshift para consultar outras opções).

Como essas opções são acrescentadas ao final do comando COPY, apenas as opções que fazem sentido no final do comando podem ser usadas, mas isso cobre a maioria dos casos de uso possíveis.
tempformat Não AVRO O formato no qual salvar arquivos temporários no S3 ao gravar no Redshift. Usa como padrão
AVRO; os outros valores permitidos são CSV e CSV GZIP para CSV e CSV compactado com gzip, respectivamente.

O Redshift é muito mais rápido ao carregar arquivos CSV do que Avro, portanto, usar esse tempformat pode fornecer um grande aumento de desempenho ao gravar no Redshift.
csvnullstring Não @NULL@ O valor de Sequência a ser gravado para nulos ao usar o tempformat CSV. Esse valor não deve aparecer nos dados reais.
csvseparator Não , Separador a ser usado ao gravar arquivos temporários com tempformat definido como CSV ou
CSV GZIP. Deve ser um caractere ASCII válido, por exemplo, “,” ou “\|”.
csvignoreleadingwhitespace Não true Se definido como verdadeiro, remove o espaço em branco de entrelinhamento dos valores durante gravações quando
tempformat é definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido.
csvignoretrailingwhitespace Não true Se definido como verdadeiro, remove o espaço em branco à direita dos valores durante gravações quando
tempformat é definido como CSV ou CSV GZIP. Caso contrário, o espaço em branco será mantido.
infer_timestamp_ntz_type Não false Se for true, os valores do tipo TIMESTAMP do Redshift serão interpretados como TimestampNTZType (carimbo de data/hora sem fuso horário) durante as leituras. Caso contrário, todos os carimbos de data/hora serão interpretados como TimestampType independentemente do tipo na tabela subjacente do Redshift.

Opções de configuração adicionais

Configurando o tamanho máximo das colunas de cadeia de caracteres

Ao criar tabelas do Redshift, o comportamento padrão é a criação de colunas TEXT para colunas de cadeia de caracteres. O Redshift armazena colunas TEXT como VARCHAR(256), portanto, essas colunas têm um tamanho máximo de 256 caracteres (origem).

Para dar suporte a colunas maiores, você pode usar o campo de metadados da coluna maxlength para especificar o comprimento máximo das colunas de cadeia de caracteres individuais. Isso também é útil para implementar otimizações de desempenho de economia de espaço declarando colunas com um comprimento máximo menor do que o padrão.

Observação

Devido a limitações no Spark, as APIs de linguagem SQL e R não dão suporte à modificação de metadados de coluna.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Aqui está um exemplo de atualização dos campos de metadados de várias colunas usando a API Scala do Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Definir um tipo de coluna personalizado

Se você precisar definir manualmente um tipo de coluna, poderá usar os metadados da coluna redshift_type. Por exemplo, se você quiser substituir o correspondente do tipo Spark SQL Schema -> Redshift SQL para atribuir um tipo de coluna definido pelo usuário, poderá fazer o seguinte:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configurar codificação da coluna

Ao criar uma tabela, use o campo de metadados da coluna encoding para especificar uma codificação de compactação para cada coluna (consulte Documentos da Amazon para obter as codificações disponíveis).

Definição das descrições em colunas

O Redshift permite que as colunas tenham descrições anexadas que devem aparecer na maioria das ferramentas de consulta (usando o comando COMMENT). Você pode definir o campo de metadados da coluna description para especificar uma descrição para colunas individuais.

Pushdown de consulta no Redshift

O otimizador do Spark envia por push os seguintes operadores para o Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

No Project e Filter, ele dá suporte às seguintes expressões:

  • A maioria dos operadores lógicos boolianos
  • Comparações
  • Operadores aritméticos básicos
  • Conversões numéricas e de cadeia de caracteres
  • A maioria das funções de cadeia de caracteres
  • Subconsultas escalares, se puderem ser totalmente enviadas para o Redshift.

Observação

Esse pushdown não dá suporte a expressões que operam em datas e carimbos de data/hora.

No Aggregation, ele dá suporte às seguintes funções de agregação:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

combinado com a cláusula DISTINCT, quando aplicável.

No Join, ele dá suporte aos seguintes tipos de junções:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Subconsultas que são regeneradas em Join pelo otimizador, por exemplo WHERE EXISTS, WHERE NOT EXISTS

Observação

O pushdown de junção não dá suporte a FULL OUTER JOIN.

O pushdown pode ser mais benéfico nas consultas com LIMIT. Uma consulta como SELECT * FROM large_redshift_table LIMIT 10 poderia levar muito tempo, pois a tabela inteira seria descarregada primeiro para S3 como um resultado intermediário. Com o pushdown, LIMIT é executado no Redshift. Em consultas com agregações, o envio por push de agregação para o Redshift também ajuda a reduzir a quantidade de dados que precisam ser transferidos.

O pushdown de consulta no Redshift está habilitado por padrão. Ele pode ser desabilitado definindo spark.databricks.redshift.pushdown como false. Mesmo quando desabilitado, o Spark ainda efetua push de filtros e executa a eliminação de coluna no Redshift.

Instalação do driver do Redshift

A fonte de dados do Redshift também requer um driver JDBC compatível com Redshift. Como o Redshift é baseado no sistema de banco de dados do PostgreSQL, você pode usar o driver JDBC do PostgreSQL incluído no Databricks Runtime ou no driver JDBC do Redshift recomendado pela Amazon. Nenhuma instalação é necessária para usar o driver JDBC do PostgreSQL. A versão do driver JDBC do PostgreSQL incluída em cada versão do Databricks Runtime está listada nas notas sobre a versão do Databricks Runtime.

Para instalar manualmente o driver JDBC do Redshift:

  1. Baixe o driver da Amazon.
  2. Carregue o driver no workspace do Azure Databricks. Consulte Bibliotecas.
  3. Instale a biblioteca em seu cluster.

Observação

O Databricks recomenda o uso da versão mais recente do driver JDBC do Redshift. As versões do driver JDBC do Redshift abaixo de 1.2.41 têm as seguintes limitações:

  • A versão 1.2.16 do driver retorna dados vazios ao usar uma cláusula where em uma consulta SQL.
  • As versões do driver abaixo de 1.2.41 podem retornar resultados inválidos porque a nulidade de uma coluna é relatada incorretamente como "Não anulável" em vez de "Desconhecido".

Garantias transacionais

Esta seção descreve as garantias transacionais da fonte de dados do Redshift para Spark.

Informações gerais sobre as propriedades do Redshift e S3

Para obter informações gerais sobre garantias transacionais do Redshift, consulte o capítulo Gerenciando operações de gravação simultâneas na documentação do Redshift. Em poucas palavras, o Redshift fornece isolamento serializável de acordo com a documentação do comando BEGIN do Redshift:

[embora] você possa usar qualquer um dos quatro níveis de isolamento de transação, o Amazon Redshift processa todos os níveis de isolamento como serializáveis.

De acordo com a documentação do Redshift:

O Amazon Redshift dá suporte a um comportamento de confirmação automática padrão no qual cada comando SQL executado separadamente é confirmado individualmente.

Assim, comandos individuais como COPY e UNLOAD são atômicos e transacionais, embora BEGIN e END explícitos só devem ser necessários para impor a atomicidade de vários comandos ou consultas.

Ao ler e gravar no Redshift, a fonte de dados lê e grava dados no S3. O Spark e o Redshift produzem saída particionada e a armazenam em vários arquivos no S3. De acordo com a documentação do Modelo de consistência de dados do Amazon S3, as operações de listagem de bucket S3 são eventualmente consistentes, portanto, os arquivos devem ir para comprimentos especiais para evitar dados ausentes ou incompletos devido a essa fonte de consistência eventual.

Garantias da fonte de dados do Redshift para Spark

Acrescentar a uma tabela existente

Ao inserir linhas no Redshift, a fonte de dados usa o comando COPY e especifica manifestos para se proteger contra determinadas operações S3 eventualmente consistentes. Como resultado, spark-redshift acrescentado às tabelas existentes têm as mesmas propriedades atômicas e transacionais que os comandos COPY regulares do Redshift.

Criar uma nova tabela (SaveMode.CreateIfNotExists)

A criação de uma nova tabela é um processo em duas etapas, composto por um comando CREATE TABLE seguido por um comando COPY para acrescentar o conjunto inicial de linhas. Ambas as operações são executadas na mesma transação.

Substituir uma tabela existente

Por padrão, a fonte de dados usa transações para executar substituições, que são implementadas excluindo a tabela de destino, criando uma nova tabela vazia e acrescentando linhas.

Se a configuração usestagingtable preterida estiver definida como false, a fonte de dados confirmará o comando DELETE TABLE antes de acrescentar linhas à nova tabela, sacrificando a atomicidade da operação de substituição, mas reduzindo o espaço de preparo que o Redshift precisa durante a substituição.

Consultar a tabela do Redshift

As consultas usam o comando UNLOAD do Redshift para executar uma consulta e salvar seus resultados no S3 e usar manifestos para se proteger contra determinadas operações S3 eventualmente consistentes. Como resultado, as consultas da fonte de dados do Redshift para Spark devem ter as mesmas propriedades de consistência que as consultas regulares do Redshift.

Problemas e soluções comuns

O bucket S3 e o cluster do Redshift estão em regiões diferentes do AWS

Por padrão, as cópias do S3 <–> Redshift não funcionarão se o bucket S3 e o cluster do Redshift estiverem em regiões diferentes do AWS.

Se você tentar ler uma tabela do Redshift quando o bucket S3 estiver em uma região diferente, poderá ver um erro como:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Da mesma forma, tentar gravar no Redshift usando um bucket S3 em uma região diferente pode causar o seguinte erro:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Grava: o comando COPY do Redshift dá suporte à especificação explícita da região do bucket S3, para que você possa fazer as gravações no Redshift funcionarem corretamente nesses casos, adicionando region 'the-region-name' à configuração extracopyoptions. Por exemplo, com um bucket no Leste dos EUA (Virgínia) e a API Scala, use:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Como alternativa, você pode usar a configuração awsregion:

    .option("awsregion", "us-east-1")
    
  • Lê: o comando UNLOAD do Redshift também dá suporte à especificação explícita da região do bucket S3. Você pode fazer as leituras funcionarem corretamente adicionando a região à configuração awsregion:

    .option("awsregion", "us-east-1")
    

Erro de autenticação ao usar uma senha com caracteres especiais na URL JDBC

Se você estiver fornecendo o nome de usuário e a senha como parte da URL JDBC e a senha contiver caracteres especiais, como ;, ? ou &, você poderá ver a seguinte exceção:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Isso é causado por caracteres especiais no nome de usuário ou senha que não são escapados corretamente pelo driver JDBC. Especifique o nome de usuário e a senha usando as opções user e password do DataFrame correspondentes. Para obter mais informações, confira Parâmetros.

A consulta do Spark de execução prolongada trava indefinidamente, embora a operação correspondente do Redshift seja concluída

Se você estiver lendo ou gravando grandes quantidades de dados de e para o Redshift, sua consulta no Spark poderá travar indefinidamente, embora a página monitoramento do Redshift do AWS mostre que a operação LOAD ou UNLOAD correspondente foi concluída e que o cluster está ocioso. Isso é causado pela conexão entre o Redshift e o tempo limite do Spark. Para evitar isso, verifique se o sinalizador do JDBC tcpKeepAlive está habilitado e TCPKeepAliveMinutes está definido como um valor baixo (por exemplo, 1).

Para obter mais informações, consulte a Configuração do driver JDBC no Amazon Redshift.

Carimbo de data/hora com semântica de fuso horário

Ao ler os dados, os tipos de dados TIMESTAMP e TIMESTAMPTZ do Redshift são mapeados para o Spark TimestampType e um valor é convertido em UTC (Tempo Universal Coordenado) e é armazenado como o carimbo de data/hora UTC. Para um TIMESTAMP do Redshift, o fuso horário local é presumido, pois o valor não tem nenhuma informação de fuso horário. Ao gravar os dados em uma tabela do Redshift, um TimestampType do Spark é mapeado para o tipo de dados TIMESTAMP do Redshift.

Guia de migração

A fonte de dados agora exige que você defina forward_spark_s3_credentials explicitamente antes que as credenciais do Spark S3 sejam encaminhadas para o Redshift. Essa alteração não terá impacto se você usar os mecanismos de autenticação aws_iam_role ou temporary_aws_*. No entanto, se você se baseou no comportamento padrão antigo, agora você deve definir explicitamente forward_spark_s3_credentials como true para continuar usando o mecanismo de autenticação anterior do Redshift para S3. Para uma discussão sobre os três mecanismos de autenticação e suas compensações de segurança, consulte a seção Autenticação para S3 e Redshift deste documento.