Abfragen von Amazon Redshift mithilfe von Azure Databricks

Sie können Tabellen aus Amazon Redshift mit Azure Databricks lesen und schreiben.

Hinweis

Möglicherweise bevorzugen Sie Lakehouse Federation für die Verwaltung von Abfragen in Redshift. Weitere Informationen unter Was ist Lakehouse Federation.

Die Datenquelle „Databricks Redshift“ verwendet Amazon S3 zum effizienten Übertragen von Daten in und aus Redshift. Sie verwendet JDBC, um die entsprechenden Befehle COPY und UNLOAD automatisch in Redshift auszulösen.

Hinweis

In Databricks Runtime 11.3 LTS und höher enthält Databricks Runtime den Redshift JDBC-Treiber, auf den mithilfe des Schlüsselworts redshift für die Formatoption zugegriffen werden kann. Weitere Informationen zu den Treiberversionen, die in jeder Databricks Runtime-Instanz enthalten sind, finden Sie unter Versionshinweise zu Databricks Runtime-Versionen und -Kompatibilität. Vom Benutzer bereitgestellte Treiber werden weiterhin unterstützt und haben vor dem gebündelten JDBC-Treiber Vorrang.

In Databricks Runtime 10.4 LTS und früher ist eine manuelle Installation des Redshift JDBC-Treibers erforderlich, und für Abfragen muss der Treiber (com.databricks.spark.redshift) für das Format verwendet werden. Siehe Redshift-Treiberinstallation.

Verwendung

In den folgenden Beispielen wird die Verbindung mit dem Redshift-Treiber veranschaulicht. Ersetzen Sie die url Parameterwerte, wenn Sie den PostgreSQL JDBC-Treiber verwenden.

Nachdem Sie Ihre AWS-Anmeldeinformationen konfiguriert haben, können Sie die Datenquelle mit der Spark-Datenquellen-API in Python, SQL, R oder Scala verwenden.

Wichtig

Externe Speicherorte, die im Unity-Katalog definiert sind, werden nicht als tempdir-Speicherorte unterstützt.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Lesen von Daten mithilfe von SQL in Databricks Runtime 10.4 LTS und früher:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Lesen von Daten mithilfe von SQL in Databricks Runtime 11.3 LTS und höher:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Schreiben von Daten mithilfe von SQL:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

Die SQL-API unterstützt nur das Erstellen neuer Tabellen und nicht das Überschreiben oder Anfügen.

R

Lesen von Daten mithilfe von R in Databricks Runtime 10.4 LTS und früher:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Lesen von Daten mithilfe von R in Databricks Runtime 11.3 LTS und höher:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Empfehlungen zum Arbeiten mit Redshift

Bei der Abfrageausführung können große Datenmengen für S3 extrahiert werden. Wenn Sie vorhaben, mehrere Abfragen mit den gleichen Daten in Redshift auszuführen, empfiehlt Databricks, die extrahierten Daten mithilfe von Delta Lake zu speichern.

Konfiguration

Authentifizierung für S3 und Redshift

Die Datenquelle umfasst mehrere Netzwerkverbindungen, die im folgenden Diagramm dargestellt sind:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Die Datenquelle liest und schreibt Daten in S3, wenn Daten an/aus Redshift übertragen werden. Daher sind AWS-Anmeldeinformationen mit Lese- und Schreibzugriff auf einen S3-Bucket erforderlich (angegeben mithilfe des tempdir-Konfigurationsparameters).

Hinweis

Die Datenquelle bereinigt nicht die temporären Dateien, die sie in S3 erstellt. Daher wird empfohlen, einen dedizierten temporären S3-Bucket mit einer Objektlebenszykluskonfiguration zu verwenden, um sicherzustellen, dass temporäre Dateien nach einem angegebenen Ablaufzeitraum automatisch gelöscht werden. Weitere Informationen zum Verschlüsseln dieser Dateien finden Sie im Abschnitt Verschlüsselung in diesem Dokument. Sie können keinen externen Speicherort verwenden, der im Unity-Katalog als tempdir-Speicherort definiert ist.

In den folgenden Abschnitten werden die Konfigurationsoptionen für die Authentifizierung der einzelnen Verbindungen beschrieben:

Spark-Treiber zu Redshift

Der Spark-Treiber verbindet sich über JDBC mithilfe eines Benutzernamens und eines Kennworts mit Redshift. Redshift unterstützt nicht die Verwendung von IAM-Rollen, um diese Verbindung zu authentifizieren. Standardmäßig wird für diese Verbindung die SSL-Verschlüsselung verwendet; weitere Details finden Sie unter Verschlüsselung.

Spark zu S3

S3 dient als Vermittler zum Speichern von Massendaten beim Lesen oder Schreiben in Redshift. Spark verbindet sich mit S3 sowohl mithilfe der Hadoop FileSystem-Schnittstellen als auch direkt mithilfe des S3-Clients des Amazon Java SDK.

Hinweis

Sie können DBFS-Bereitstellungen nicht verwenden, um den Zugriff auf S3 for Redshift zu konfigurieren.

  • Festlegen von Schlüsseln in der Hadoop-Konfiguration: Sie können AWS-Schlüssel mithilfe von Hadoop-Konfigurationseigenschaften angeben. Wenn Ihre tempdir-Konfiguration auf ein s3a://-Dateisystem verweist, können Sie die Eigenschaften fs.s3a.access.key und fs.s3a.secret.key in einer Hadoop-XML-Konfigurationsdatei festlegen oder sc.hadoopConfiguration.set() aufrufen, um die globale Hadoop-Konfiguration von Spark zu konfigurieren. Wenn Sie ein s3n://-Dateisystem verwenden, können Sie die Legacykonfigurationsschlüssel angeben, wie im folgenden Beispiel gezeigt.

    Scala

    Wenn Sie beispielsweise das s3a-Dateisystem verwenden, fügen Sie Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Fügen Sie für das Legacydateisystem s3n Folgendes hinzu:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Der folgende Befehl basiert auf einigen internen Spark-Versionen, sollte jedoch mit allen PySpark-Versionen funktionieren. Es ist unwahrscheinlich, dass er sich in Zukunft ändert:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift zu S3

Legen Sie die Option forward_spark_s3_credentials auf true fest, um die AWS-Schlüsselanmeldeinformationen, die Spark zum Herstellen einer Verbindung mit S3 verwendet, automatisch über JDBC an Redshift weiterzuleiten. Die JDBC-Abfrage bettet diese Anmeldeinformationen ein, sodass Databricks dringend empfiehlt, die SSL-Verschlüsselung der JDBC-Verbindung zu aktivieren.

Verschlüsselung

  • Sichern von JDBC: Sofern keine SSL-bezogenen Einstellungen in der JDBC-URL vorhanden sind, aktiviert die Datenquelle standardmäßig die SSL-Verschlüsselung und überprüft außerdem, dass der Redshift-Server vertrauenswürdig ist (das heißt sslmode=verify-full). Dazu wird ein Serverzertifikat automatisch von den Amazon-Servern heruntergeladen, wenn er zum ersten Mal benötigt wird. Wenn ein Fehler auftritt, wird eine vorab gebündelte Zertifikatdatei als Fallback verwendet. Dies gilt sowohl für die Redshift- als auch für die PostgreSQL-JDBC-Treiber.

    Falls Probleme mit diesem Feature auftreten, oder falls Sie SSL einfach deaktivieren möchten, können Sie .option("autoenablessl", "false") auf Ihrem DataFrameReader oder Ihrem DataFrameWriter aufrufen.

    Wenn Sie benutzerdefinierte SSL-bezogene Einstellungen angeben möchten, können Sie die Anweisungen in der Redshift-Dokumentation befolgen: Verwenden von SSL- und Serverzertifikaten in Java und JDBC-Treiberkonfigurationsoptionen Alle SSL-bezogenen Optionen, die in der JDBC-url vorhanden sind, die mit der Datenquelle verwendet werden, haben Vorrang (d. h. die automatische Konfiguration wird nicht ausgelöst).

  • Verschlüsseln von UNLOAD-Daten, die in S3 gespeichert sind (Daten, die beim Lesen von Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Entladen von Daten nach S3 „verschlüsselt UNLOAD automatisch Datendateien mit der serverseitigen Amazon S3-Verschlüsselung (SSE-S3)“.

    Redshift unterstützt darüber hinaus die clientseitige Verschlüsselung mit einem benutzerdefinierten Schlüssel (siehe: Entladen verschlüsselter Datendateien), aber der Datenquelle fehlt die Möglichkeit, den erforderlichen symmetrischen Schlüssel anzugeben.

  • Verschlüsseln von COPY-Daten, die in S3 gespeichert sind (Daten, die beim Schreiben in Redshift gespeichert sind): Gemäß der Redshift-Dokumentation zum Laden verschlüsselter Datendateien von Amazon S3:

Sie können den COPY-Befehl verwenden, um Datendateien zu laden, die mithilfe der serverseitigen Verschlüsselung mit AWS-verwalteten Verschlüsselungsschlüsseln (SSE-S3 oder SSE-KMS), clientseitiger Verschlüsselung oder beidem in Amazon S3 hochgeladen wurden. COPY unterstützt keine serverseitige Amazon S3-Verschlüsselung mit einem vom Kunden bereitgestellten Schlüssel (SSE-C).

Parameter

Die Parameterzuordnung oder in Spark SQL bereitgestellte OPTIONS unterstützen die folgenden Einstellungen:

Parameter Erforderlich
dbtable Ja, es sei denn, Abfrage ist angegeben.
Abfrage Ja, es sei denn, dbtable ist angegeben.
Benutzer Nein
password Nein
url Ja
search_path Nein
aws_iam_role Nur, wenn Sie IAM-Rollen zum Autorisieren verwenden.
forward_spark_s3_credentials Nein
temporary_aws_access_key_id Nein
temporary_aws_secret_access_key Nein
temporary_aws_session_token Nein
tempdir Ja
jdbcdriver Nein
diststyle Nein
distkey Nein, es sei denn, Sie verwenden DISTSTYLE KEY
sortkeyspec Nein
usestagingtable (veraltet) Nein
Beschreibung Nein
preactions Nein
postactions Nein
extracopyoptions Nein
tempformat Nein
csvnullstring Nein
csvseparator Nein , Trennzeichen, das beim Schreiben temporärer Dateien verwendet werden soll, bei Festlegung von tempformat auf CSV oder
CSV GZIP. Hierbei muss es sich um ein gültiges ASCII-Zeichen handeln, z. B. „,“ oder „|“.
csvignoreleadingwhitespace Nein
csvignoretrailingwhitespace Nein
infer_timestamp_ntz_type Nein

Zusätzliche Konfigurationsoptionen

Konfigurieren der maximalen Größe von Zeichenfolgenspalten

Beim Erstellen von Redshift-Tabellen besteht das Standardverhalten darin, TEXT-Spalten für Zeichenfolgenspalten zu erstellen. Redshift speichert TEXT Spalten als VARCHAR(256), sodass diese Spalten maximal 256 Zeichen enthalten (Quelle).

Um größere Spalten zu unterstützen, können Sie das maxlength-Spaltenmetadatenfeld verwenden, um die maximale Länge einzelner Zeichenfolgenspalten anzugeben. Dies ist auch hilfreich bei der Implementierung von speicherplatzsparenden Leistungsoptimierungen, indem Spalten mit einer kleineren maximalen Länge als der Standardlänge deklariert werden.

Hinweis

Aufgrund von Einschränkungen in Spark unterstützen die SQL- und R-Sprach-APIs nicht die Änderung von Spaltenmetadaten.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Hier sehen Sie ein Beispiel für das Aktualisieren der Metadatenfelder mehrerer Spalten mithilfe der Skala-API von Spark:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Festlegen eines benutzerdefinierten Spaltentyps

Wenn Sie einen Spaltentyp manuell festlegen müssen, können Sie die redshift_type-Spaltenmetadaten verwenden. Wenn Sie z. B. den Spark SQL Schema -> Redshift SQL-Typabgleicher überschreiben möchten, um einen benutzerdefinierten Spaltentyp zuzuweisen, können Sie wie folgt vorgehen:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for (colName, colType) in columnTypeMap.iteritems():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Konfigurieren der Spaltencodierung

Verwenden Sie beim Erstellen einer Tabelle das Spaltenmetadatenfeld encoding, um eine Komprimierungscodierung für jede Spalte anzugeben (siehe Amazon-Dokumente für verfügbare Codierungen).

Festlegen von Beschreibungen für Spalten

Mit Redshift können mit Spalten Beschreibungen verbunden werden, die in den meisten Abfragetools (mithilfe des COMMENT-Befehls) angezeigt werden sollten. Sie können das Spaltenmetadatenfeld description festlegen, um eine Beschreibung für einzelne Spalten anzugeben.

Abfrage-Pushdown in Redshift

Der Spark-Optimierer pusht die folgenden Operatoren nach unten in Redshift:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

Innerhalb von Project und Filter unterstützt er die folgenden Ausdrücke:

  • Die meisten booleschen Logikoperatoren
  • Vergleiche
  • Grundlegende arithmetische Operationen
  • Numerische und Zeichenfolgenumwandlungen
  • Die meisten Zeichenfolgenfunktionen
  • Skalare Unterabfragen, wenn sie vollständig in Redshift gepusht werden können.

Hinweis

Dieser Pushdown unterstützt keine Ausdrücke, die auf Datums- und Zeitstempeln ausgeführt werden.

Innerhalb von Aggregation unterstützt er die folgenden Aggregationsfunktionen:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

kombiniert mit der DISTINCT-Klausel, sofern zutreffend.

Innerhalb von Join unterstützt er die folgenden Arten von Verknüpfungen:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • Unterabfragen, die der Optimierer neu in Join schreibt, z. B. WHERE EXISTS, WHERE NOT EXISTS

Hinweis

Join-Pushdown unterstützt FULL OUTER JOIN nicht.

Der Pushdown ist u. U. bei Abfragen mit LIMIT am nützlichsten. Eine Abfrage, wie z. B. SELECT * FROM large_redshift_table LIMIT 10 könnte sehr lange dauern, da die gesamte Tabelle zunächst per UNLOAD als Zwischenergebnis in S3 entladen würde. Mit dem Pushdown wird LIMIT in Redshift ausgeführt. Bei Abfragen mit Aggregationen hilft auch der Pushdown der Aggregation in Redshift bei der Reduzierung der Datenmenge, die übertragen werden muss.

Der Abfrage-Pushdown in Redshift ist standardmäßig aktiviert. Er kann deaktiviert werden, indem spark.databricks.redshift.pushdown auf false festgelegt wird. Selbst wenn er deaktiviert ist, führt Spark bei den Filtern weiterhin einen Pushdown aus und führt die Spaltenlöschung in Redshift durch.

Redshift-Treiberinstallation

Die Redshift-Datenquelle erfordert darüber hinaus einen mit Redshift kompatiblen JDBC-Treiber. Da Redshift auf dem PostgreSQL-Datenbanksystem basiert, können Sie den in Databricks Runtime enthaltenen PostgreSQL-JDBC-Treiber oder den von Amazon empfohlenen Redshift JDBC-Treiber verwenden. Es ist keine Installation erforderlich, um den PostgreSQL JDBC-Treiber zu verwenden. Die Version des in jeder Databricks Runtime-Version enthaltenen PostgreSQL JDBC-Treibers ist in den Databricks Runtime-Versionshinweisen aufgeführt.

So installieren Sie den Redshift JDBC-Treiber manuell:

  1. Download des Treibers von Amazon.
  2. Upload des Treibers in Ihren Azure Databricks-Arbeitsbereich. Weitere Informationen finden Sie unter Bibliotheken.
  3. Installation der Bibliothek auf Ihrem Cluster.

Hinweis

Databricks empfiehlt die Verwendung der neuesten Version des Redshift JDBC-Treibers. Versionen des Redshift JDBC-Treibers unter 1.2.41 haben die folgenden Einschränkungen:

  • Version 1.2.16 des Treibers gibt leere Daten zurück, wenn in einer SQL-Abfrage eine where-Klausel verwendet wird.
  • Versionen des Treibers vor 1.2.41 geben möglicherweise ungültige Ergebnisse zurück, da die NULL-Zulässigkeit einer Spalte fälschlicherweise als „Lässt keine Nullwerte zu“ anstelle von „Unbekannt“ angegeben wird.

Transaktionsgarantien

In diesem Abschnitt werden die Transaktionsgarantien der Redshift-Datenquelle für Spark beschrieben.

Allgemeiner Hintergrund der Redshift- und S3-Eigenschaften

Allgemeine Informationen zu Redshift-Transaktionsgarantien finden Sie in der Redshift-Dokumentation im Kapitel zum Verwalten gleichzeitiger Schreibvorgänge. Kurz zusammengefasst stellt Redshift eine serialisierbare Isolation gemäß der Dokumentation für den Redshift-Befehl BEGIN bereit:

[obwohl] Sie jede der vier Transaktionsisolationsstufen verwenden können, verarbeitet Amazon Redshift alle Isolationsstufen als serialisierbar.

In der Redshift-Dokumentation steht Folgendes darüber:

Amazon Redshift unterstützt ein standardmäßiges automatisches Commitverhalten, bei dem jeder separat ausgeführte SQL-Befehl einzeln ccommittet.

Daher sind einzelne Befehle wie COPY und UNLOAD atomisch und transaktional, während explizite BEGIN- und END-Befehle nur erforderlich sein sollten, um die Unteilbarkeit mehrerer Befehle oder Abfragen zu erzwingen.

Beim Lesen von und Schreiben in Redshift liest und schreibt die Datenquelle Daten in S3. Sowohl Spark als auch Redshift erzeugen eine partitionierte Ausgabe und speichern sie in mehreren Dateien in S3. Gemäß der Dokumentation zum Amazon S3-Datenkonsistenzmodell sind S3-Bucketauflistungsvorgänge letztendlich konsistent, sodass bei den Dateien besondere Maßnahmen ergriffen werden müssen, um fehlende oder unvollständige Daten aufgrund dieser Quelle letztendlicher Konsistenz zu vermeiden.

Garantien der Redshift-Datenquelle für Spark

Anfügen an eine vorhandene Tabelle

Beim Einfügen von Zeilen in Redshift verwendet die Datenquelle den BEFEHL COPY und gibt Manifeste an, die vor bestimmten letztendlich konsistenten S3-Vorgängen schützen. Folglich haben spark-redshift-Anhänge an vorhandenen Tabellen dieselben atomischen und transaktionalen Eigenschaften wie reguläre Redshift-COPY-Befehle.

Erstellen Sie eine neue Tabelle (SaveMode.CreateIfNotExists).

Das Erstellen einer neuen Tabelle ist ein zweistufiger Prozess, der aus einem CREATE TABLE-Befehl gefolgt von einem COPY-Befehl besteht, um die anfängliche Anzahl an Zeilen anzufügen. Beide Vorgänge werden in derselben Transaktion ausgeführt.

Überschreiben einer vorhandenen Tabelle

Standardmäßig verwendet die Datenquelle Transaktionen, um Überschreibungen auszuführen, die implementiert werden, indem die Zieltabelle gelöscht, eine neue leere Tabelle erstellt und Zeilen an diese angefügt werden.

Wenn die veraltete usestagingtable-Einstellung auf false festgelegt ist, committet die Datenquelle den DELETE TABLE-Befehl vor dem Anfügen von Zeilen an die neue Tabelle, indem die Unteilbarkeit des Überschreibungsvorgangs geopfert wird, wobei jedoch gleichzeitig die Menge des Stagingspeicherplatzes reduziert wird, den Redshift während des Überschreibens benötigt.

Abfrage-Redshift-Tabelle

In Abfragen wird der Redshift-Befehl UNLOAD verwendet, um eine Abfrage auszuführen und ihre Ergebnisse in S3 zu speichern und Manifeste zu verwenden, um vor bestimmten letztendlich konsistenten S3-Vorgängen zu schützen. Daher sollten Abfragen aus der Redshift-Datenquelle für Spark dieselben Konsistenzeigenschaften wie normale Redshift-Abfragen aufweisen.

Häufige Probleme und Lösungen

S3-Bucket- und Redshift-Cluster befinden sich in verschiedenen AWS-Regionen

Standardmäßig funktionieren S3- <-> Redshift-Kopien nicht, wenn sich der S3-Bucket- und der Redshift-Cluster in verschiedenen AWS-Regionen befinden.

Wenn Sie versuchen, eine Redshift-Tabelle zu lesen, wenn sich der S3-Bucket in einer anderen Region befindet, wird möglicherweise ein Fehler wie der folgende angezeigt:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Ebenso kann der Versuch, mit einem S3-Bucket in einem anderen Bereich in Redshift zu schreiben, den folgenden Fehler verursachen:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Schreibt: Der Redshift COPY-Befehl unterstützt die explizite Spezifikation der S3-Bucketregion, sodass Sie Schreibvorgänge in Redshift in diesen Fällen ordnungsgemäß vornehmen können, indem Sie der extracopyoptions-Einstellung region 'the-region-name' hinzufügen. Verwenden Sie beispielsweise mit einem Bucket in der Region USA, Osten (Virginia) und der Skala-API:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Alternativ können Sie die awsregion-Einstellung verwenden:

    .option("awsregion", "us-east-1")
    
  • Liest: Der Redshift UNLOAD-Befehl unterstützt auch die explizite Spezifikation der S3-Bucketregion. Sie können Lesevorgänge ordnungsgemäß vornehmen, indem Sie der awsregion-Einstellung die Region hinzufügen:

    .option("awsregion", "us-east-1")
    

Authentifizierungsfehler beim Verwenden eines Kennworts mit Sonderzeichen in der JDBC-URL

Wenn Sie den Benutzernamen und das Kennwort als Teil der JDBC-URL angeben und das Kennwort Sonderzeichen wie ;, ?oder & enthält, wird möglicherweise die folgende Ausnahme angezeigt:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Dies wird durch Sonderzeichen im Benutzernamen oder Kennwort verursacht, die vom JDBC-Treiber nicht ordnungsgemäß entfernt werden. Stellen Sie sicher, dass Sie den Benutzernamen und das Kennwort mithilfe der entsprechenden DataFrame-Optionen user und password angeben. Weitere Informationen finden Sie unter Parameter.

Lange ausgeführte Spark-Abfrage hängt unbegrenzt, obwohl der entsprechende Redshift-Vorgang abgeschlossen ist.

Wenn Sie große Datenmengen aus und in Redshift lesen oder schreiben, hängt Ihre Spark-Abfrage möglicherweise unbegrenzt, auch wenn auf der AWS Redshift Monitoring-Seite angezeigt wird, dass der entsprechende LOAD- oder UNLOAD-Vorgang abgeschlossen ist und dass Cluster im Leerlauf ist. Dies wird durch die Verbindung zwischen Redshift und der Spark-Zeitüberschreitung verursacht. Um dies zu vermeiden, stellen Sie sicher, dass das JDBC-Flag tcpKeepAlive aktiviert ist und dass TCPKeepAliveMinutes auf einen niedrigen Wert festgelegt ist (z. B. 1).

Weitere Informationen finden Sie im Abschnitt zur Amazon Redshift JDBC-Treiberkonfiguration.

Semantik für Zeitstempel mit Zeitzone

Beim Lesen von Daten werden Redshift TIMESTAMP- und TIMESTAMPTZ-Datentypen dem Spark-TimestampType zugeordnet, und ein Wert wird in koordinierte Weltzeit (UTC) konvertiert und als UTC-Zeitstempel gespeichert. Bei einem Redshift TIMESTAMP wird die lokale Zeitzone angenommen, da der Wert keine Zeitzoneninformationen enthält. Beim Schreiben von Daten in eine Redshift-Tabelle wird ein Spark TimestampType dem Redshift-Datentyp TIMESTAMP zugeordnet.

Migrationsleitfaden

Die Datenquelle erfordert jetzt, dass Sie explizit forward_spark_s3_credentials festlegen, bevor Spark S3-Anmeldeinformationen an Redshift weitergeleitet werden. Diese Änderung hat keine Auswirkungen, wenn Sie die Authentifizierungsmechanismen aws_iam_role oder temporary_aws_* verwenden. Wenn Sie sich jedoch auf das alte Standardverhalten verlassen haben, müssen Sie forward_spark_s3_credentials jetzt explizit auf true festlegen, um Ihren vorherigen Redshift-zu-S3-Authentifizierungsmechanismus weiterhin verwenden zu können. Eine Erörterung der drei Authentifizierungsmechanismen und ihrer Sicherheits-Trade-offs finden Sie im Abschnitt Authentifizierung für S3 und Redshift in diesem Dokument.