Aracılığıyla paylaş


Azure Databricks kullanarak Amazon Redshift'i sorgulama

Azure Databricks ile Amazon Redshift'ten tablo okuyabilir ve yazabilirsiniz.

Önemli

Bu makalede açıklanan yapılandırmalar Deneysel'tir. Deneysel özellikler olduğu gibi sağlanır ve Databricks tarafından müşteri teknik desteği aracılığıyla desteklenmez. Tam sorgu federasyon desteği almak için bunun yerine, Azure Databricks kullanıcılarınızın Unity Kataloğu söz dizimi ve veri idaresi araçlarından yararlanmasını sağlayan Lakehouse Federation kullanmalısınız.

Databricks Redshift veri kaynağı, Redshift'e verimli bir şekilde veri aktarmak için Amazon S3 kullanır ve JDBC kullanarak Redshift'te uygun COPY ve UNLOAD komutları otomatik olarak tetikler.

Not

Databricks Runtime 11.3 LTS ve üzerinde Databricks Runtime, biçim seçeneği için anahtar sözcük kullanılarak redshift erişilebilen Redshift JDBC sürücüsünü içerir. Bkz . Databricks Runtime sürüm notları sürümleri ve her Databricks Runtime'da bulunan sürücü sürümleri için uyumluluk . Kullanıcı tarafından sağlanan sürücüler hala desteklenmektedir ve paketlenmiş JDBC sürücüsünden önceliklidir.

Databricks Runtime 10.4 LTS ve altında Redshift JDBC sürücüsünün el ile yüklenmesi gerekir ve sorgular biçim için sürücüyü (com.databricks.spark.redshift) kullanmalıdır. Bkz. Redshift sürücü yüklemesi.

Kullanım

Aşağıdaki örneklerde Redshift sürücüsüne bağlanma gösterilmektedir. url PostgreSQL JDBC sürücüsü kullanıyorsanız parametre değerlerini değiştirin.

AWS kimlik bilgilerinizi yapılandırdıktan sonra Veri kaynağını Python, SQL, R veya Scala'daki Spark veri kaynağı API'siyle kullanabilirsiniz.

Önemli

Unity Kataloğu'nda tanımlanan dış konumlar konum olarak tempdir desteklenmez.

Python

# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 5439 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", True)
  .load()
)

# Read data from a query
df = (spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()
)

# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table

# Write back to a table
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()
)

# Write back to a table using IAM Role based authentication
(df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()
)

SQL

Databricks Runtime 10.4 LTS ve altında SQL kullanarak verileri okuyun:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  dbtable '<table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

Databricks Runtime 11.3 LTS ve üzeri üzerinde SQL kullanarak verileri okuyun:


DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
  host '<hostname>',
  port '<port>', /* Optional - will use default port 5439 if not specified. *./
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
  tempdir 's3a://<bucket>/<directory-path>',
  forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;

SQL kullanarak veri yazma:

DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
  dbtable '<new-table-name>',
  tempdir 's3a://<bucket>/<directory-path>',
  url 'jdbc:redshift://<database-host-url>',
  user '<username>',
  password '<password>',
  forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;

SQL API'si yalnızca yeni tabloların oluşturulmasını destekler ve üzerine yazma veya ekleme işlemini desteklemez.

R

Databricks Runtime 10.4 LTS ve altında R kullanarak verileri okuyun:

df <- read.df(
   NULL,
   "com.databricks.spark.redshift",
   tempdir = "s3a://<your-bucket>/<your-directory-path>",
   dbtable = "<your-table-name>",
   url = "jdbc:redshift://<the-rest-of-the-connection-string>")

Databricks Runtime 11.3 LTS ve üzeri üzerinde R kullanarak verileri okuyun:

df <- read.df(
  NULL,
  "redshift",
  host = "hostname",
  port = "port",
  user = "username",
  password = "password",
  database = "database-name",
  dbtable = "schema-name.table-name",
  tempdir = "s3a://<your-bucket>/<your-directory-path>",
  forward_spark_s3_credentials = "true",
  dbtable = "<your-table-name>")

Scala

// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
  .format("redshift")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 5439 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("forward_spark_s3_credentials", true)
  .load()

// Read data from a query
val df = spark.read
  .format("redshift")
  .option("query", "select x, count(*) <your-table-name> group by x")
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("forward_spark_s3_credentials", True)
  .load()

// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table

// Write back to a table
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .mode("error")
  .save()

// Write back to a table using IAM Role based authentication
df.write
  .format("redshift")
  .option("dbtable", table_name)
  .option("tempdir", "s3a://<bucket>/<directory-path>")
  .option("url", "jdbc:redshift://<database-host-url>")
  .option("user", username)
  .option("password", password)
  .option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
  .mode("error")
  .save()

Redshift ile çalışma önerileri

Sorgu yürütme, S3'e büyük miktarda veri ayıklayabilir. Redshift'te aynı verilere karşı birkaç sorgu gerçekleştirmeyi planlıyorsanız Databricks, delta lake kullanarak ayıklanan verilerin kaydedilmesini önerir.

Yapılandırma

S3 ve Redshift kimlik doğrulaması

Veri kaynağı, aşağıdaki diyagramda gösterildiği gibi birkaç ağ bağlantısı içerir:

                            ┌───────┐
       ┌───────────────────>│  S3   │<─────────────────┐
       │    IAM or keys     └───────┘    IAM or keys   │
       │                        ^                      │
       │                        │ IAM or keys          │
       v                        v               ┌──────v────┐
┌────────────┐            ┌───────────┐         │┌──────────┴┐
│  Redshift  │            │  Spark    │         ││   Spark   │
│            │<──────────>│  Driver   │<────────>| Executors │
└────────────┘            └───────────┘          └───────────┘
               JDBC with                  Configured
               username /                     in
               password                     Spark
        (SSL enabled by default)

Veri kaynağı, Verileri Redshift'e/Redshift'ten aktarırken S3'e okur ve yazar. Sonuç olarak, S3 demetine okuma ve yazma erişimi olan AWS kimlik bilgileri gerektirir (yapılandırma parametresi kullanılarak tempdir belirtilir).

Not

Veri kaynağı, S3'te oluşturduğu geçici dosyaları temizlemez. Sonuç olarak, geçici dosyaların belirtilen süre sonundan sonra otomatik olarak silinmesini sağlamak için nesne yaşam döngüsü yapılandırmasına sahip ayrılmış bir geçici S3 demeti kullanmanızı öneririz. Bu dosyaların nasıl şifrelenmesine ilişkin bir tartışma için bu belgenin Şifreleme bölümüne bakın. Unity Kataloğu'nda tanımlanan dış konumu konum olarak tempdir kullanamazsınız.

Aşağıdaki bölümlerde her bağlantının kimlik doğrulama yapılandırma seçenekleri açıklanmaktadır:

Redshift'e Spark sürücüsü

Spark sürücüsü, kullanıcı adı ve parola kullanarak JDBC aracılığıyla Redshift'e bağlanır. Redshift, bu bağlantının kimliğini doğrulamak için IAM rollerinin kullanımını desteklemez. Varsayılan olarak, bu bağlantı SSL şifrelemesi kullanır; Daha fazla ayrıntı için bkz . Şifreleme.

Spark'ı S3'e

S3, Redshift'ten okurken veya Redshift'e yazarken toplu verileri depolamak için bir aracı görevi görür. Spark, hem Hadoop FileSystem arabirimlerini hem de doğrudan Amazon Java SDK'sının S3 istemcisini kullanarak S3'e bağlanır.

Not

Redshift için S3'e erişimi yapılandırmak için DBFS bağlamalarını kullanamazsınız.

  • Hadoop konfederasyonunda anahtarları ayarlama: Hadoop yapılandırma özelliklerini kullanarak AWS anahtarlarını belirtebilirsiniz. Yapılandırmanız tempdir bir s3a:// dosya sistemine işaret ederse, Ve özelliklerini bir Hadoop XML yapılandırma dosyasında ayarlayabilir fs.s3a.access.keyfs.s3a.secret.key veya Spark'ın genel Hadoop yapılandırmasını yapılandırmak için çağrısı sc.hadoopConfiguration.set() yapabilirsiniz. Bir s3n:// dosya sistemi kullanıyorsanız, aşağıdaki örnekte gösterildiği gibi eski yapılandırma anahtarlarını sağlayabilirsiniz.

    Scala

    Örneğin, dosya sistemini kullanıyorsanız s3a şunları ekleyin:

    sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
    

    Eski s3n dosya sistemi için şunları ekleyin:

    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
    
    Python

    Aşağıdaki komut bazı Spark iç işlevlerine dayanır, ancak tüm PySpark sürümleriyle çalışması gerekir ve gelecekte değişme olasılığı düşüktür:

      sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>")
      sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
    

Redshift'i S3'e

Spark'ın forward_spark_s3_credentials JDBC üzerinden S3'e true bağlanmak için kullandığı AWS anahtar kimlik bilgilerini Redshift'e otomatik olarak iletme seçeneğini ayarlayın. JDBC sorgusu bu kimlik bilgilerini ekler, bu nedenle Databricks, JDBC bağlantısının SSL şifrelemesini etkinleştirmenizi kesinlikle önerir.

Şifreleme

  • JDBC güvenliğini sağlama: JDBC URL'sinde SSL ile ilgili ayarlar yoksa, veri kaynağı varsayılan olarak SSL şifrelemesini etkinleştirir ve Redshift sunucusunun güvenilir olduğunu da doğrular (yani). sslmode=verify-full Bunun için, bir sunucu sertifikası ilk kez gerektiğinde Amazon sunucularından otomatik olarak indirilir. Bu başarısız olursa, geri dönüş olarak önceden paketlenmiş bir sertifika dosyası kullanılır. Bu, hem Redshift hem de PostgreSQL JDBC sürücüleri için geçerlidir.

    Bu özellikle ilgili herhangi bir sorun olması veya SSL'yi devre dışı bırakmak istemeniz durumunda veya DataFrameWriterüzerinden DataFrameReader arayabilirsiniz.option("autoenablessl", "false").

    SSL ile ilgili özel ayarlar belirtmek istiyorsanız Redshift belgelerindeki yönergeleri izleyebilirsiniz: Java ve JDBC Sürücü Yapılandırma Seçeneklerinde SSL ve Sunucu Sertifikalarını kullanma Veri kaynağıyla kullanılan JDBC'de url bulunan SSL ile ilgili seçenekler önceliklidir (yani otomatik yapılandırma tetiklenmez).

  • S3'te depolanan UNLOAD verilerini şifreleme (Redshift'ten okurken depolanan veriler): Verileri S3'e Kaldırma hakkındaki Redshift belgelerine göre, "UNLOAD, Amazon S3 sunucu tarafı şifrelemesini (SSE-S3) kullanarak veri dosyalarını otomatik olarak şifreler."

    Redshift ayrıca özel anahtarla istemci tarafı şifrelemeyi de destekler (bkz. Şifrelenmiş Veri Dosyalarını Kaldırma) ancak veri kaynağında gerekli simetrik anahtarı belirtme özelliği yoktur.

  • S3'te depolanan COPY verilerini şifreleme (Redshift'e yazarken depolanan veriler): Amazon S3'ten Şifrelenmiş Veri Dosyalarını Yükleme hakkındaki Redshift belgelerine göre:

AWS tarafından yönetilen şifreleme anahtarları (SSE-S3 veya SSE-KMS), istemci tarafı şifrelemesi veya her ikisi ile sunucu tarafı şifreleme kullanarak Amazon S3'e yüklenen veri dosyalarını yüklemek için komutunu kullanabilirsiniz COPY . COPY, müşteri tarafından sağlanan bir anahtarla (SSE-C) Amazon S3 sunucu tarafı şifrelemesini desteklemez.

Parametreler

Spark SQL'de sağlanan parametre haritası veya SEÇENEKLER aşağıdaki ayarları destekler:

Parametre Zorunlu
dbtable Evet, sorgu belirtilmediği sürece.
query Evet, dbtable belirtilmediği sürece.
kullanıcı Hayır
password Hayır
url Yes
search_path Hayır
aws_iam_role Yalnızca yetkilendirmek için IAM rollerini kullanıyorsanız.
forward_spark_s3_credentials Hayır
temporary_aws_access_key_id Hayır
temporary_aws_secret_access_key Hayır
temporary_aws_session_token Hayır
tempdir Yes
jdbcdriver Hayır
daiststyle Hayır
distkey Hayır, kullanmadığınız sürece DISTSTYLE KEY
sortkeyspec Hayır
usestagingtable (Kullanım Dışı) Hayır
açıklama Hayır
ön işlemler Hayır
postalar Hayır
extracopyoptions Hayır
tempformat Hayır
csvnullstring Hayır
csvseparator Hayır , Geçici biçim veya olarak ayarlanmış CSV geçici dosyalar yazarken kullanılacak ayırıcı
CSV GZIP. Bu, "" veya| "," gibi geçerli bir ASCII karakteri olmalıdır.
csvignoreleadingwhitespace Hayır
csvignoretrailingwhitespace Hayır
infer_timestamp_ntz_type Hayır

Ek yapılandırma seçenekleri

Dize sütunlarının en büyük boyutunu yapılandırma

Redshift tabloları oluştururken varsayılan davranış, dize sütunları için sütunlar oluşturmaktır TEXT . Redshift sütunları olarak depolarTEXT, bu nedenle bu sütunların boyutu en fazla 256 karakterdir (kaynak).VARCHAR(256)

Daha büyük sütunları desteklemek için sütun meta veri alanını kullanarak maxlength tek tek dize sütunlarının uzunluk üst sınırını belirtebilirsiniz. Bu, varsayılandan daha küçük en uzunluğa sahip sütunları bildirerek alandan tasarruf sağlayan performans iyileştirmeleri uygulamak için de kullanışlıdır.

Not

Spark'taki sınırlamalar nedeniyle SQL ve R dili API'leri sütun meta verilerinde değişiklik yapmayı desteklemez.

Python

df = ... # the dataframe you'll want to write to Redshift

# Specify the custom width of each column
columnLengthMap = {
  "language_code": 2,
  "country_code": 2,
  "url": 2083,
}

# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
  metadata = {'maxlength': length}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

df.write \
  .format("com.databricks.spark.redshift") \
  .option("url", jdbcURL) \
  .option("tempdir", s3TempDirectory) \
  .option("dbtable", sessionTable) \
  .save()

Scala

Spark'ın Scala API'sini kullanarak birden çok sütunun meta veri alanlarını güncelleştirme örneği aşağıda verilmiştir:

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom width of each column
val columnLengthMap = Map(
  "language_code" -> 2,
  "country_code" -> 2,
  "url" -> 2083
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
  val metadata = new MetadataBuilder().putLong("maxlength", length).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

df.write
  .format("com.databricks.spark.redshift")
  .option("url", jdbcURL)
  .option("tempdir", s3TempDirectory)
  .option("dbtable", sessionTable)
.save()

Özel sütun türü ayarlama

Bir sütun türünü el ile ayarlamanız gerekiyorsa, sütun meta verilerini kullanabilirsiniz redshift_type . Örneğin, kullanıcı tanımlı bir sütun türü atamak için tür eşleştiricisini geçersiz kılmak Spark SQL Schema -> Redshift SQL isterseniz aşağıdakileri yapabilirsiniz:

Python

# Specify the custom type of each column
columnTypeMap = {
  "language_code": "CHAR(2)",
  "country_code": "CHAR(2)",
  "url": "BPCHAR(111)",
}

df = ... # the dataframe you'll want to write to Redshift

# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
  metadata = {'redshift_type': colType}
  df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))

Scala

import org.apache.spark.sql.types.MetadataBuilder

// Specify the custom type of each column
val columnTypeMap = Map(
  "language_code" -> "CHAR(2)",
  "country_code" -> "CHAR(2)",
  "url" -> "BPCHAR(111)"
)

var df = ... // the dataframe you'll want to write to Redshift

// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
  val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
  df = df.withColumn(colName, df(colName).as(colName, metadata))
}

Sütun kodlamayı yapılandırma

Tablo oluştururken, her sütun için bir sıkıştırma kodlaması belirtmek için sütun meta veri alanını kullanın encoding (kullanılabilir kodlamalar için Amazon belgelerine bakın).

Sütunlarda açıklama ayarlama

Redshift, sütunların çoğu sorgu aracında (komutunu kullanarak) gösterilmesi gereken açıklamaların eklenmesini COMMENT sağlar. Tek tek sütunlar için bir açıklama belirtmek üzere sütun meta veri alanını ayarlayabilirsiniz description .

Redshift'e sorgu gönderme

Spark iyileştiricisi aşağıdaki işleçleri Redshift'e iletir:

  • Filter
  • Project
  • Sort
  • Limit
  • Aggregation
  • Join

ve Filteriçinde Project aşağıdaki ifadeleri destekler:

  • Çoğu Boole mantıksal işleci
  • Karşılaştırmalar
  • Temel aritmetik işlemler
  • Sayısal ve dize atamaları
  • Çoğu dize işlevi
  • Tamamen Redshift'e gönderilebilen Skaler alt sorgular.

Not

Bu gönderim, tarihlerde ve zaman damgalarında çalışan ifadeleri desteklemez.

içinde Aggregationaşağıdaki toplama işlevlerini destekler:

  • AVG
  • COUNT
  • MAX
  • MIN
  • SUM
  • STDDEV_SAMP
  • STDDEV_POP
  • VAR_SAMP
  • VAR_POP

varsa yan tümcesiyle DISTINCT birlikte.

içinde Joinaşağıdaki birleştirme türlerini destekler:

  • INNER JOIN
  • LEFT OUTER JOIN
  • RIGHT OUTER JOIN
  • LEFT SEMI JOIN
  • LEFT ANTI JOIN
  • İyileştirici tarafından yeniden Join yazılan alt sorgular; örneğin WHERE EXISTS, , WHERE NOT EXISTS

Not

Birleştirme anında iletme desteğine sahip değildir FULL OUTER JOIN.

İtme, ile LIMITyapılan sorgularda en yararlı olabilir. gibi SELECT * FROM large_redshift_table LIMIT 10 bir sorgu çok uzun sürebilir, bunun nedeni tablonun tamamının S3'e ara sonuç olarak UNLOADed olmasıdır. aşağı itme ile, LIMIT Redshift'te yürütülür. Toplamaları olan sorgularda toplamayı Redshift'e göndermek, aktarılması gereken veri miktarını azaltmaya da yardımcı olur.

Redshift'e sorgu gönderme varsayılan olarak etkindir. olarak ayarlanarak spark.databricks.redshift.pushdownfalsedevre dışı bırakılabilir. Devre dışı bırakıldığında bile Spark yine de filtreleri aşağı iter ve Redshift'e sütun eleme gerçekleştirir.

Redshift sürücüsü yüklemesi

Redshift veri kaynağı ayrıca Redshift uyumlu bir JDBC sürücüsü gerektirir. Redshift, PostgreSQL veritabanı sistemini temel aldığı için Databricks Runtime ile birlikte gelen PostgreSQL JDBC sürücüsünü veya Amazon tarafından önerilen Redshift JDBC sürücüsünü kullanabilirsiniz. PostgreSQL JDBC sürücüsünü kullanmak için yükleme gerekmez. Her Databricks Runtime sürümünde bulunan PostgreSQL JDBC sürücüsünün sürümü Databricks Runtime sürüm notlarında listelenir.

Redshift JDBC sürücüsünü el ile yüklemek için:

  1. Sürücüyü Amazon'dan indirin .
  2. Sürücüyü Azure Databricks çalışma alanınıza yükleyin. Bkz. Kitaplıklar.
  3. Kitaplığı kümenize yükleyin .

Not

Databricks, Redshift JDBC sürücüsünün en son sürümünü kullanmanızı önerir. 1.2.41'in altındaki Redshift JDBC sürücüsünün sürümleri aşağıdaki sınırlamalara sahiptir:

  • Sürücünün 1.2.16 sürümü, SQL sorgusunda yan where tümcesi kullanılırken boş veriler döndürür.
  • 1.2.41'in altındaki sürücünün sürümleri geçersiz sonuçlar döndürebilir çünkü bir sütunun null atanabilirliği "Bilinmiyor" yerine yanlışlıkla "Null Atanamaz" olarak bildirilir.

İşlem garantileri

Bu bölümde, Spark için Redshift veri kaynağının işlem garantileri açıklanmaktadır.

Redshift ve S3 özelliklerinde genel arka plan

Redshift işlem garantileri hakkında genel bilgi için Redshift belgelerindeki Eşzamanlı Yazma İşlemlerini Yönetme bölümüne bakın. Özetle Redshift, Redshift BEGIN komutunun belgelerine göre serileştirilebilir yalıtım sağlar:

Dört işlem yalıtım düzeyinden herhangi birini kullanabilirsiniz ancak Amazon Redshift tüm yalıtım düzeylerini serileştirilebilir olarak işler.

Redshift belgelerine göre:

Amazon Redshift, ayrı ayrı yürütülen her SQL komutunun ayrı ayrı işlediği varsayılan otomatik işleme davranışını destekler.

Bu nedenle, ve UNLOAD gibi COPY tek tek komutlar atomik ve işlemseldir, ancak açıktır BEGIN ve END yalnızca birden çok komut veya sorgunun bölünmezliğini zorlamak için gereklidir.

Redshift'ten okurken ve Redshift'e yazarken, veri kaynağı S3'te verileri okur ve yazar. Hem Spark hem de Redshift bölümlenmiş çıkış üretir ve S3'te birden çok dosyada depolar. Amazon S3 Veri Tutarlılığı Modeli belgelerine göre, S3 demet listeleme işlemleri sonunda tutarlıdır, bu nedenle dosyaların bu nihai tutarlılık kaynağı nedeniyle eksik veya eksik verileri önlemek için özel uzunluklara gitmesi gerekir.

Spark için Redshift veri kaynağının garantileri

Var olan bir tablonun sonuna ekleme

Redshift'e satır eklerken, veri kaynağı COPY komutunu kullanır ve nihai olarak tutarlı olan belirli S3 işlemlerine karşı koruma sağlamak için bildirimleri belirtir. Sonuç olarak, spark-redshift mevcut tablolara eklemeler normal Redshift COPY komutları ile aynı atomik ve işlem özelliklerine sahiptir.

Yeni tablo oluşturma (SaveMode.CreateIfNotExists)

Yeni tablo oluşturmak, ilk satır kümesini eklemeye yönelik bir CREATE TABLE komutun ardından COPY komutundan oluşan iki adımlı bir işlemdir. Her iki işlem de aynı işlemde gerçekleştirilir.

Varolan tablonun üzerine yazma

Varsayılan olarak, veri kaynağı hedef tabloyu silerek, yeni boş bir tablo oluşturarak ve satır ekleyerek uygulanan üzerine yazma işlemleri gerçekleştirmek için işlemleri kullanır.

Kullanım dışı usestagingtable bırakılan ayar olarak ayarlanırsa false, veri kaynağı yeni tabloya satır eklemeden önce komutu işler DELETE TABLE ve üzerine yazma işleminin bölünmezliğinden ödün verir, ancak redshift'in üzerine yazma sırasında ihtiyaç duyduğu hazırlama alanı miktarını azaltır.

Redshift tablosunu sorgulama

Sorgular redshift UNLOAD komutunu kullanarak sorguyu yürütür ve sonuçlarını S3'e kaydeder ve sonunda tutarlı olan belirli S3 işlemlerine karşı koruma sağlamak için bildirimleri kullanır. Sonuç olarak, Spark için Redshift veri kaynağından gelen sorgular, normal Redshift sorguları ile aynı tutarlılık özelliklerine sahip olmalıdır.

Yaygın sorunlar ve çözümler

S3 demeti ve Redshift kümesi farklı AWS bölgelerinde

Varsayılan olarak, S3 <demeti ve Redshift kümesi farklı AWS bölgelerindeyse S3 -> Redshift kopyaları çalışmaz.

S3 demeti farklı bir bölgedeyken Redshift tablosunu okumaya çalışırsanız, aşağıdaki gibi bir hata görebilirsiniz:

ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.

Benzer şekilde, farklı bir bölgedeki S3 demetini kullanarak Redshift'e yazma girişimi aşağıdaki hataya neden olabilir:

error:  Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
  • Yazmalar: Redshift COPY komutu S3 demeti bölgesinin açık belirtimini destekler, böylece bu durumlarda ayara ekleyerek region 'the-region-name' Redshift'e extracopyoptions yazma işlemini düzgün bir şekilde yapabilirsiniz. Örneğin, ABD Doğu (Virginia) bölgesinde ve Scala API'sinde bir demetle şunu kullanın:

    .option("extracopyoptions", "region 'us-east-1'")
    

    Alternatif olarak şu ayarı kullanabilirsiniz awsregion :

    .option("awsregion", "us-east-1")
    
  • Okuma: Redshift UNLOAD komutu S3 demeti bölgesinin belirtimini de destekler. Ayarına bölgeyi awsregion ekleyerek okumaların düzgün çalışmasını sağlayabilirsiniz:

    .option("awsregion", "us-east-1")
    

JDBC URL'sinde özel karakterler içeren bir parola kullanılırken kimlik doğrulama hatası

Kullanıcı adı ve parolayı JDBC URL'sinin bir parçası olarak sağlıyorsanız ve parola , ?veya &gibi ;özel karakterler içeriyorsa aşağıdaki özel durumu görebilirsiniz:

java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'

Bunun nedeni kullanıcı adı veya paroladaki özel karakterlerin JDBC sürücüsü tarafından doğru şekilde kaçılmamasıdır. İlgili DataFrame seçeneklerini userpasswordve kullanarak kullanıcı adını ve parolayı belirttiğinizden emin olun. Daha fazla bilgi için bkz . Parametreler.

Uzun süre çalışan Spark sorgusu, karşılık gelen Redshift işlemi gerçekleştirilse bile süresiz olarak kilitleniyor

Redshift'ten ve Redshift'e büyük miktarda veri okuyor veya yazıyorsanız, AWS Redshift İzleme sayfası ilgili LOAD veya UNLOAD işlemin tamamlandığını ve kümenin boşta olduğunu gösterse bile Spark sorgunuz süresiz olarak yanıt vermemeye başlayabilir. Bunun nedeni Redshift ile Spark arasındaki bağlantının zaman aşımına neden olmasıdır. Bunu önlemek için JDBC bayrağının etkinleştirildiğinden tcpKeepAlive ve TCPKeepAliveMinutes düşük bir değere ayarlandığından emin olun (örneğin, 1).

Daha fazla bilgi için bkz . Amazon Redshift JDBC Sürücü Yapılandırması.

Saat dilimi semantiği ile zaman damgası

Veriler okunurken hem Redshift TIMESTAMP hem TIMESTAMPTZ de veri türleri Spark TimestampTypeile eşlenir ve bir değer Eşgüdümlü Evrensel Saat'e (UTC) dönüştürülür ve UTC zaman damgası olarak depolanır. Redshift TIMESTAMPiçin, değerin saat dilimi bilgisi olmadığından yerel saat dilimi varsayılır. Redshift tablosuna veri yazarken Spark, TimestampType Redshift TIMESTAMP veri türüne eşlenir.

Geçiş kılavuzu

Veri kaynağı artık Spark S3 kimlik bilgileri Redshift'e iletilmeden önce açıkça ayarlamanızı forward_spark_s3_credentials gerektirir. veya temporary_aws_* kimlik doğrulama mekanizmalarını kullanıyorsanız bu değişikliğin aws_iam_role hiçbir etkisi olmaz. Ancak, eski varsayılan davranışı kullandıysanız, önceki Redshift to S3 kimlik doğrulama mekanizmanızı kullanmaya devam etmek için true artık açıkça olarak olarak ayarlamalısınızforward_spark_s3_credentials. Üç kimlik doğrulama mekanizması ve bunların güvenlikle ilgili dengeleri hakkında bilgi edinmek için bu belgenin S3 ve Redshift kimlik doğrulama bölümüne bakın.