Share via


Interroger Amazon Redshift avec Azure Databricks

Vous pouvez lire et écrire des tableaux à partir d’Amazon Redshift avec Azure Databricks.

Remarque

Vous pouvez préférer Lakehouse Federation pour gérer les requêtes vers Redshift. Consultez Présentation de Lakehouse Federation.

La source de données Databricks Redshift utilise Amazon S3 pour transférer efficacement des données depuis et vers Redshift, et utilise Java Database Connectivity (JDBC) pour déclencher automatiquement les commandes Redshift appropriées COPY et UNLOAD.

Remarque

Dans Databricks Runtime 11.3 LTS et versions ultérieures, Databricks Runtime inclut le pilote JDBC Redshift, accessible à l’aide du mot clé redshift comme option de format. Consultez les versions des notes de publication et la compatibilité de Databricks Runtime pour les versions de pilote incluses dans chaque Runtime Databricks. Les pilotes fournis par l’utilisateur sont toujours pris en charge et sont prioritaires sur le pack pilote JDBC.

Dans Databricks Runtime 10.4 LTS et versions antérieures, l’installation manuelle du pilote JDBC Redshift est requise, et pour les requêtes il convient d’utiliser le pilote (com.databricks.spark.redshift) comme format. Consultez Installation du pilote Redshift.

Utilisation

Les exemples suivants illustrent la connexion avec le pilote Redshift. Remplacez les valeurs de paramètre url si vous utilisez le pilote JDBC PostgreSQL.

Une fois que vous avez configuré vos informations d’identification AWS, vous pouvez utiliser la source de données avec l’API de source de données Spark dans Python, SQL, R ou Scala.

Important

Les emplacements externes définis dans Unity Catalog ne sont pas pris en charge en tant qu’emplacements tempdir.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Lire des données à l’aide de SQL sur Databricks Runtime 10.4 LTS et versions antérieures :

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Lire des données à l’aide de SQL sur Databricks Runtime 11.3 LTS et versions ultérieures :


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Écrire les données à l’aide de SQL :

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

L’API SQL prend uniquement en charge la création de nouveaux tableaux et non l’écriture ou l’ajout.

R

Lire des données à l’aide de R sur Databricks Runtime 10.4 LTS et versions antérieures :

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Lire des données à l’aide de R sur Databricks Runtime 11.3 LTS et versions ultérieures :

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Suggestions pour travailler avec Redshift

L’exécution des requêtes peut extraire de grandes quantités de données dans S3. Si vous envisagez d’effectuer plusieurs requêtes sur les mêmes données dans Redshift, Databricks recommande d’enregistrer les données extraites à l’aide de Delta Lake.

Configuration

Authentification auprès de S3 et de Redshift

La source de données implique plusieurs connexions réseau, illustrées dans le diagramme suivant :

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

La source de données lit et écrit des données dans S3 lors du transfert de données vers/depuis Redshift. Par conséquent, elle nécessite des informations d’identification AWS avec un accès en lecture et en écriture à un compartiment S3 (spécifié à l’aide du paramètre de configuration tempdir).

Remarque

La source de données ne nettoie pas les fichiers temporaires qu’elle crée dans S3. Par conséquent, nous vous recommandons d’utiliser un compartiment S3 temporaire dédié avec une configuration de cycle de vie d’objet pour vous assurer que les fichiers temporaires sont automatiquement supprimés après une période d’expiration spécifiée. Pour plus d’informations sur le chiffrement de ces fichiers, consultez la section Chiffrement de ce document. Vous ne pouvez pas utiliser un emplacement externe défini dans Unity Catalog comme emplacement tempdir.

Les sections suivantes décrivent les options de configuration d’authentification pour chaque type de connexion.

Pilote Spark vers Redshift

Le pilote Spark se connecte à Redshift via JDBC à l’aide d’un nom d’utilisateur et d’un mot de passe. Redshift ne prend pas en charge l’utilisation de rôles IAM pour authentifier cette connexion. Par défaut, cette connexion utilise le chiffrement SSL ; pour plus d’informations, consultez Chiffrement.

Spark vers S3

S3 fait office d’intermédiaire pour stocker des données de masse lors de la lecture ou de l’écriture dans Redshift. Spark se connecte à S3 à la fois à l’aide des interfaces Hadoop FileSystem, et directement à l’aide du client S3 du Kit de développement logiciel (SDK) Amazon Java.

Remarque

Vous ne pouvez pas utiliser de montages DBFS pour configurer l’accès à S3 pour Redshift.

  • Définir des clés dans la configuration Hadoop : vous pouvez spécifier des clés AWS à l’aide des propriétés de configuration Hadoop. Si votre configuration tempdir pointe vers un système de fichiers s3a://, vous pouvez définir les propriétés fs.s3a.access.key et fs.s3a.secret.key dans un fichier de configuration XML Hadoop ou appeler sc.hadoopConfiguration.set() pour configurer la configuration Hadoop globale de Spark. Si vous utilisez un système de fichiers s3n://, vous pouvez fournir les clés de configuration héritées, comme illustré dans l’exemple suivant.

    Scala

    Par exemple, si vous utilisez le système de fichiers s3a, ajoutez :

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Pour le système de fichiers hérité s3n, ajoutez :

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    La commande suivante s’appuie sur des éléments Spark internes, mais elle fonctionne en principe avec toutes les versions de PySpark. De plus, il est peu probable qu’elle change à l’avenir :

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift vers S3

Définissez l’option forward_spark_s3_credentials sur true pour transférer automatiquement les informations d’identification de clé AWS que Spark utilise pour se connecter à S3 via JDBC vers Redshift. La requête JDBC incorpore ces informations d’identification. Par conséquent, Databricks recommande vivement d’activer le chiffrement SSL de la connexion JDBC.

Chiffrement

  • Sécurisation de JDBC : sauf si des paramètres liés au protocole SSL sont présents dans l’URL JDBC, la source de données par défaut active le chiffrement SSL et vérifie également la fiabilité du serveur Redshift (autrement dit, sslmode=verify-full). Pour cela, un certificat de serveur est téléchargé automatiquement à partir des serveurs Amazon la première fois que cela est nécessaire. En cas d’échec, un fichier de certificat pré-groupé est utilisé comme moyen de secours. Cela est valable à la fois pour les pilotes Redshift et PostgreSQL JDBC.

    En cas de problèmes avec cette fonctionnalité, ou si vous souhaitez simplement désactiver SSL, vous pouvez appeler .option("autoenablessl", "false") sur votre DataFrameReader ou DataFrameWriter.

    Si vous souhaitez spécifier des paramètres personnalisés liés au protocole SSL, vous pouvez suivre les instructions de la documentation Redshift : Utilisation de certificats SSL et de certificats de serveur sur Java et les options de configuration du pilote JDBC Toutes les options liées au protocole SSL présentes dans l’url JDBC utilisées avec la source de données sont prioritaires (autrement dit, la configuration automatique ne se déclenche pas).

  • Chiffrement des données UNLOAD stockées dans S3 (données stockées lors de la lecture à partir de Redshift) : selon la documentation Redshift sur le déchargement des données vers S3, « UNLOAD chiffre automatiquement les fichiers de données à l’aide du chiffrement côté serveur Amazon S3 (SSE-S3) ».

    Redshift prend également en charge le chiffrement côté client avec une clé personnalisée (voir : Déchargement de fichiers de données chiffrés), mais la source de données ne peut pas spécifier la clé symétrique requise.

  • Chiffrement des données COPY stockées dans S3 (données stockées lors de l’écriture dans Redshift) : selon la documentation Redshift sur le chargement de fichiers de données chiffrés à partir d’Amazon S3 :

Vous pouvez utiliser la commande COPY pour charger des fichiers de données qui ont été chargés sur Amazon S3 par un chiffrement côté serveur avec clés de chiffrement gérées par AWS (SSE-S3 ou SSE-KMS), un chiffrement côté client, ou les deux. COPY ne prend pas en charge le chiffrement côté serveur avec une clé fournie par le client (SSE-C).

Paramètres

Le mappage de paramètres ou OPTIONS fourni dans Spark SQL prend en charge les paramètres suivants :

Paramètre Requis
dbtable Oui, sauf si la requête est spécifiée.
query Oui, sauf si dbtable est spécifié.
utilisateur Non
mot de passe Non
url Oui
search_path Non
aws_iam_role Uniquement si vous utilisez des rôles IAM pour les autorisations.
forward_spark_s3_credentials Non
temporary_aws_access_key_id Non
temporary_aws_secret_access_key Non
temporary_aws_session_token Non
tempdir Oui
jdbcdriver Non
diststyle Non
distkey Non, sauf si vous utilisez DISTSTYLE KEY
sortkeyspec Non
usetagingtable (déconseillé) Non
description Non
preactions Non
postactions Non
extracopyoptions Non
tempformat Non
csvnullstring Non
csvseparator Non , Séparateur à utiliser lors de l’écriture de fichiers temporaires avec tempformat défini sur CSV ou
CSV GZIP. Il doit s’agir d’un caractère ASCII valide, par exemple, « , » ou « | ».
csvignoreleadingwhitespace Non
csvignoretrailingwhitespace Non
infer_timestamp_ntz_type Non

Options de configuration supplémentaires

Configuration de la taille maximale des colonnes de chaîne

Lors de la création de tableaux Redshift, le comportement par défaut consiste à créer des colonnes TEXT pour les colonnes de chaîne. Redshift stocke les colonnes TEXT en tant que VARCHAR(256), de sorte que ces colonnes ont une taille maximale de 256 caractères (source).

Pour prendre en charge des colonnes plus volumineuses, vous pouvez utiliser le champ de métadonnées de colonne maxlength afin de spécifier la longueur maximale des colonnes de chaîne individuelles. Cela est également utile à l’optimisation des performances d’économie d’espace en indiquant des colonnes dont la longueur maximale est inférieure à la valeur par défaut.

Remarque

En raison de limitations dans Spark, les API de langage SQL et R ne prennent pas en charge la modification des métadonnées de colonne.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Voici un exemple de mise à jour des champs de métadonnées de plusieurs colonnes à l’aide de l’API Scala de Spark :

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Définissez un type de colonne personnalisé

Si vous devez définir manuellement un type de colonne, vous pouvez utiliser les métadonnées de colonne redshift_type. Par exemple, si vous souhaitez substituer le matcher de type Spark SQL Schema -> Redshift SQL pour affecter un type de colonne défini par l’utilisateur, vous pouvez effectuer les opérations suivantes :

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Configurer le chiffrement de colonne

Lors de la création d’une table, utilisez le champ de métadonnées de colonne encoding pour spécifier un encodage de compression pour chaque colonne (consultez la documentation Amazon pour les encodages disponibles).

Définir des descriptions sur les colonnes

Redshift permet aux colonnes d’avoir des descriptions jointes qui apparaissent dans la plupart des outils de requête (à l’aide de la commande COMMENT). Vous pouvez définir le champ de métadonnées de colonne description pour spécifier une description chaque colonne individuelle.

Requête pushdown dans Redshift

L’optimiseur Spark envoie (push) les opérateurs suivants vers Redshift :

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Sur Project et Filter, il prend en charge les expressions suivantes :

  • La plupart des opérateurs logiques booléens
  • Comparaisons
  • Opérations arithmétiques de base
  • Casts numériques et de chaînes
  • La plupart des fonctions de chaîne
  • Sous-requêtes scalaires, si elles peuvent être entièrement envoyées (push) vers Redshift.

Remarque

Ce pushdown ne prend pas en charge les expressions qui fonctionnent sur des dates et des timestamps.

Sur Aggregation, il prend en charge les fonctions d’agrégation suivantes :

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

combiné à la clause DISTINCT, le cas échéant.

Sur Join, il prend en charge les types de jointures suivants :

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Sous-requêtes réécrites en Join par l’optimiseur, par exemple WHERE EXISTS, WHERE NOT EXISTS

Remarque

La jointure pushdown ne prend pas FULL OUTER JOIN en charge.

Le pushdown serait le plus bénéfique pour les requêtes avec LIMIT. Une requête telle que SELECT * FROM large_redshift_table LIMIT 10 peut prendre beaucoup de temps, car l’ensemble du tableau serait d’abord UNLOADé sur S3 comme résultat intermédiaire. Avec le pushdown, le paramètre LIMIT est exécuté dans Redshift. Dans les requêtes avec des agrégations, l’envoi (push) de l’agrégation dans Redshift permet également de réduire la quantité de données qui doivent être transférées.

La requête pushdown dans Redshift est activée par défaut. Cette fonctionnalité peut être désactivée en définissant spark.databricks.redshift.pushdown sur false. Même en cas de désactivation, Spark envoie (push) toujours les filtres et effectue l’élimination des colonnes dans Redshift.

Installation du pilote Redshift

La source de données Redshift nécessite également un pilote JDBC compatible avec Redshift. Étant donné que Redshift est basé sur le système de base de données PostgreSQL, vous pouvez utiliser le pilote JDBC PostgreSQL inclus avec Databricks Runtime ou le pilote JDBC pour Redshift recommandé par Amazon. Aucune installation n’est requise pour utiliser le pilote JDBC PostgreSQL. La version du pilote JDBC PostgreSQL incluse dans chaque version Databricks Runtime est répertoriée dans les notes de publication de Databricks Runtime.

Pour installer manuellement le pilote JDBC Redshift :

  1. Téléchargez le pilote à partir d’Amazon.
  2. Chargez le pilote dans votre espace de travail Azure Databricks. Consultez Bibliothèques.
  3. Installez la bibliothèque sur votre cluster.

Remarque

Databricks recommande d’utiliser la dernière version du pilote JDBC Redshift. Les versions du pilote JDBC Redshift inférieures à la version 1.2.41 présentent les limitations suivantes :

  • La version 1.2.16 du pilote renvoie des données vides lors de l’utilisation d’une clause where dans une requête SQL.
  • Les versions du pilote inférieures à la version 1.2.41 peuvent renvoyer des résultats non valides, car la possibilité de valeur Nul d’une colonne est signalée de façon incorrecte comme « Non Nullable » au lieu d’« Inconnu ».

Garanties transactionnelles

Cette section décrit les garanties transactionnelles de la source de données Redshift pour Spark.

Arrière-plan général sur les propriétés Redshift et S3

Pour obtenir des informations générales sur les garanties transactionnelles Redshift, consultez le chapitre Gestion des opérations d’écriture simultanées dans la documentation Redshift. En bref, Redshift fournit une isolation sérialisable en fonction de la documentation de la commande BEGIN Redshift :

[Même si] vous pouvez utiliser l’un des quatre niveaux d’isolation des transactions, Amazon Redshift traite tous les niveaux d’isolation comme sérialisables.

Conformément à la documentation Redshift :

Amazon Redshift prend en charge un comportement de validation automatique par défaut dans lequel chaque commande SQL exécutée séparément est validée individuellement.

Ainsi, des commandes individuelles comme COPY et UNLOAD sont atomiques et transactionnelles, tandis que les commandes explicites BEGIN et END ne doivent être nécessaires que pour appliquer l’atomicité de plusieurs commandes ou requêtes.

Lors de la lecture et de l’écriture dans Redshift, la source de données lit et écrit des données dans S3. Spark et Redshift produisent tous deux une sortie partitionnée et la stockent dans plusieurs fichiers sur S3. Selon la documentation du modèle de cohérence des données Amazon S3, les opérations de référencement de compartiments S3 sont cohérentes. En raison de cette cohérence éventuelle, les fichiers doivent atteindre des longueurs spéciales afin d’éviter les données manquantes ou incomplètes.

Garanties de la source de données Redshift pour Spark

Ajouter à un tableau existant

Lors de l’insertion de lignes dans Redshift, la source de données utilise la commande COPY,et spécifie des manifestes pour se protéger de certaines opérations cohérentes S3. Par conséquent, spark-redshift les ajouts aux tableaux existants ont les mêmes propriétés atomiques et transactionnelles que les commandes Redshift COPY standard.

Créez un nouveau tableau (SaveMode.CreateIfNotExists)

La création d’un nouveau tableau est un processus en deux étapes, constitué d’une commande CREATE TABLE suivie d’une commande COPY pour s’ajouter à l’ensemble initial de lignes. Les deux opérations sont effectuées dans la même transaction.

Remplacer un tableau existant

Par défaut, la source de données utilise les transactions pour effectuer des remplacements. Ils sont implémentés en supprimant la table de destination, ce qui crée un nouveau tableau vide et y ajoute des lignes.

Si le paramètre déconseillé usestagingtable est défini sur false, la source de données valide la commande DELETE TABLE avant d’ajouter des lignes au nouveau tableau. Cela sacrifie l’atomicité de l’opération de remplacement, mais réduit la quantité d’espace de transit nécessaire à Redshift pendant l’opération.

Tableau de requête Redshift

Les requêtes utilisent la commande Redshift UNLOAD pour exécuter une requête et enregistrer ses résultats dans S3, et utilisent des manifestes pour se protéger contre certaines opérations cohérentes S3. Par conséquent, les requêtes provenant de la source de données Redshift pour Spark doivent avoir les mêmes propriétés de cohérence que les requêtes Redshift standard.

Problèmes courants et solutions

Les compartiments S3 et les clusters Redshift se trouvent dans différentes régions AWS

Par défaut, les copies S3 Redshift <-> ne fonctionnent pas si le compartiment S3 et le cluster Redshift se trouvent dans des régions AWS différentes.

Si vous essayez de lire un tableau Redshift lorsque le compartiment S3 se trouve dans une autre région, vous pouvez voir une erreur comme :

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

De même, une tentative d’écriture dans Redshift à l’aide d’un compartiment S3 situé dans une autre région peut entraîner l’erreur suivante :

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Écriture : La commande Redshift COPY prend en charge la spécification de la région du compartiment S3. Dans ces cas, vous pouvez donc faire fonctionner correctement les écritures dans Redshift en ajoutant region 'the-region-name' au paramètre extracopyoptions. Par exemple, avec un compartiment dans la région de l’est des États-Unis (Virginie) et l’API Scala, utilisez :

    .option("extracopyoptions", "region 'us-east-1'")
    

    Vous pouvez également utiliser le paramètre awsregion :

    .option("awsregion", "us-east-1")
    
  • Lecture : La commande Redshift UNLOAD prend également en charge la spécification de la région du compartiment S3. Vous pouvez faire fonctionner correctement les lectures en ajoutant la région au paramètre awsregion :

    .option("awsregion", "us-east-1")
    

Erreur d’authentification lors de l’utilisation d’un mot de passe avec des caractères spéciaux dans l’URL JDBC

Si vous fournissez le nom d’utilisateur et le mot de passe dans le cadre de l’URL JDBC et que le mot de passe contient des caractères spéciaux tels que ;, ? ou &, vous pourrez voir l’exception suivante :

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Cela est causé par le fait que des caractères spéciaux dans le nom d’utilisateur ou le mot de passe ne sont pas placés correctement dans la séquence d’échappement par le pilote JDBC. Veillez à spécifier le nom d’utilisateur et le mot de passe à l’aide des options DataFrame correspondantes user et password. Pour plus d’informations, consultez Paramètres.

La requête de longue durée Spark se bloque indéfiniment même si l’opération Redshift correspondante est terminée

Si vous lisez ou écrivez de grandes quantités de données depuis et vers Redshift, votre requête Spark peut se bloquer indéfiniment, même si la page d’analyse AWS Redshift indique que l’opération correspondante LOAD ou UNLOAD est terminée et que le cluster est inactif. Cela est dû au délai d’attente entre Redshift et Spark. Pour éviter cela, vérifiez que l’indicateur JDBC tcpKeepAlive est activé et que TCPKeepAliveMinutes est défini sur une valeur faible (par exemple, 1).

Pour plus d’informations, consultez Configuration du pilote JDBC Amazon Redshift.

Sémantique de timestamp avec fuseau horaire

Lors de la lecture de données, Redshift TIMESTAMP et TIMESTAMPTZ les types de données sont mappés à Spark TimestampType, et une valeur est convertie en temps universel coordonné (UTC) et est stockée en tant que timestamp UTC. Pour un TIMESTAMP Redshift, le fuseau horaire local est supposé, puisque la valeur n’a pas d’information de fuseau horaire. Lors de l’écriture de données dans un tableau Redshift, un élément TimestampType Spark est mappé au type de données TIMESTAMP Redshift.

Guide de migration

La source de données vous oblige désormais à définir forward_spark_s3_credentials explicitement avant que les informations d’identification Spark S3 soient transférées à Redshift. Cette modification n’a aucun impact si vous utilisez les mécanismes d’authentification aws_iam_role ou temporary_aws_*. Toutefois, si vous vous appuyez sur l’ancien comportement par défaut, vous devrez configurer explicitement forward_spark_s3_credentials sur true pour continuer à utiliser votre mécanisme d’authentification Redshift vers S3 précédent. Pour une discussion sur les trois mécanismes d’authentification et leurs compromis de sécurité, consultez la section S’authentifier au S3 et à Redshift de ce document.