Azure Databricks kullanarak Amazon Redshift'i sorgulama
Azure Databricks ile Amazon Redshift'ten tablo okuyabilir ve yazabilirsiniz.
Önemli
Bu makalede açıklanan yapılandırmalar Deneysel'tir. Deneysel özellikler olduğu gibi sağlanır ve Databricks tarafından müşteri teknik desteği aracılığıyla desteklenmez. Tam sorgu federasyon desteği almak için bunun yerine, Azure Databricks kullanıcılarınızın Unity Kataloğu söz dizimi ve veri idaresi araçlarından yararlanmasını sağlayan Lakehouse Federation kullanmalısınız.
Databricks Redshift veri kaynağı, Redshift'e verimli bir şekilde veri aktarmak için Amazon S3 kullanır ve JDBC kullanarak Redshift'te uygun COPY
ve UNLOAD
komutları otomatik olarak tetikler.
Not
Databricks Runtime 11.3 LTS ve üzerinde Databricks Runtime, biçim seçeneği için anahtar sözcük kullanılarak redshift
erişilebilen Redshift JDBC sürücüsünü içerir. Bkz . Databricks Runtime sürüm notları sürümleri ve her Databricks Runtime'da bulunan sürücü sürümleri için uyumluluk . Kullanıcı tarafından sağlanan sürücüler hala desteklenmektedir ve paketlenmiş JDBC sürücüsünden önceliklidir.
Databricks Runtime 10.4 LTS ve altında Redshift JDBC sürücüsünün el ile yüklenmesi gerekir ve sorgular biçim için sürücüyü (com.databricks.spark.redshift
) kullanmalıdır. Bkz. Redshift sürücü yüklemesi.
Kullanım
Aşağıdaki örneklerde Redshift sürücüsüne bağlanma gösterilmektedir. url
PostgreSQL JDBC sürücüsü kullanıyorsanız parametre değerlerini değiştirin.
AWS kimlik bilgilerinizi yapılandırdıktan sonra Veri kaynağını Python, SQL, R veya Scala'daki Spark veri kaynağı API'siyle kullanabilirsiniz.
Önemli
Unity Kataloğu'nda tanımlanan dış konumlar konum olarak tempdir
desteklenmez.
Python
# Read data from a table using Databricks Runtime 10.4 LTS and below
df = (spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a table using Databricks Runtime 11.3 LTS and above
df = (spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") # Optional - will use default port 5439 if not specified.
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") # if schema-name is not specified, default to "public".
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", True)
.load()
)
# Read data from a query
df = (spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
)
# After you have applied transformations to the data, you can use
# the data source API to write the data back to another table
# Write back to a table
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
)
# Write back to a table using IAM Role based authentication
(df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
)
SQL
Databricks Runtime 10.4 LTS ve altında SQL kullanarak verileri okuyun:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
dbtable '<table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
Databricks Runtime 11.3 LTS ve üzeri üzerinde SQL kullanarak verileri okuyun:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table
USING redshift
OPTIONS (
host '<hostname>',
port '<port>', /* Optional - will use default port 5439 if not specified. *./
user '<username>',
password '<password>',
database '<database-name>'
dbtable '<schema-name>.<table-name>', /* if schema-name not provided, default to "public". */
tempdir 's3a://<bucket>/<directory-path>',
forward_spark_s3_credentials 'true'
);
SELECT * FROM redshift_table;
SQL kullanarak veri yazma:
DROP TABLE IF EXISTS redshift_table;
CREATE TABLE redshift_table_new
USING redshift
OPTIONS (
dbtable '<new-table-name>',
tempdir 's3a://<bucket>/<directory-path>',
url 'jdbc:redshift://<database-host-url>',
user '<username>',
password '<password>',
forward_spark_s3_credentials 'true'
) AS
SELECT * FROM table_name;
SQL API'si yalnızca yeni tabloların oluşturulmasını destekler ve üzerine yazma veya ekleme işlemini desteklemez.
R
Databricks Runtime 10.4 LTS ve altında R kullanarak verileri okuyun:
df <- read.df(
NULL,
"com.databricks.spark.redshift",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
dbtable = "<your-table-name>",
url = "jdbc:redshift://<the-rest-of-the-connection-string>")
Databricks Runtime 11.3 LTS ve üzeri üzerinde R kullanarak verileri okuyun:
df <- read.df(
NULL,
"redshift",
host = "hostname",
port = "port",
user = "username",
password = "password",
database = "database-name",
dbtable = "schema-name.table-name",
tempdir = "s3a://<your-bucket>/<your-directory-path>",
forward_spark_s3_credentials = "true",
dbtable = "<your-table-name>")
Scala
// Read data from a table using Databricks Runtime 10.4 LTS and below
val df = spark.read
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// Read data from a table using Databricks Runtime 11.3 LTS and above
val df = spark.read
.format("redshift")
.option("host", "hostname")
.option("port", "port") /* Optional - will use default port 5439 if not specified. */
.option("user", "username")
.option("password", "password")
.option("database", "database-name")
.option("dbtable", "schema-name.table-name") /* if schema-name is not specified, default to "public". */
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("forward_spark_s3_credentials", true)
.load()
// Read data from a query
val df = spark.read
.format("redshift")
.option("query", "select x, count(*) <your-table-name> group by x")
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("forward_spark_s3_credentials", True)
.load()
// After you have applied transformations to the data, you can use
// the data source API to write the data back to another table
// Write back to a table
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.mode("error")
.save()
// Write back to a table using IAM Role based authentication
df.write
.format("redshift")
.option("dbtable", table_name)
.option("tempdir", "s3a://<bucket>/<directory-path>")
.option("url", "jdbc:redshift://<database-host-url>")
.option("user", username)
.option("password", password)
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.mode("error")
.save()
Redshift ile çalışma önerileri
Sorgu yürütme, S3'e büyük miktarda veri ayıklayabilir. Redshift'te aynı verilere karşı birkaç sorgu gerçekleştirmeyi planlıyorsanız Databricks, delta lake kullanarak ayıklanan verilerin kaydedilmesini önerir.
Yapılandırma
S3 ve Redshift kimlik doğrulaması
Veri kaynağı, aşağıdaki diyagramda gösterildiği gibi birkaç ağ bağlantısı içerir:
┌───────┐
┌───────────────────>│ S3 │<─────────────────┐
│ IAM or keys └───────┘ IAM or keys │
│ ^ │
│ │ IAM or keys │
v v ┌──────v────┐
┌────────────┐ ┌───────────┐ │┌──────────┴┐
│ Redshift │ │ Spark │ ││ Spark │
│ │<──────────>│ Driver │<────────>| Executors │
└────────────┘ └───────────┘ └───────────┘
JDBC with Configured
username / in
password Spark
(SSL enabled by default)
Veri kaynağı, Verileri Redshift'e/Redshift'ten aktarırken S3'e okur ve yazar. Sonuç olarak, S3 demetine okuma ve yazma erişimi olan AWS kimlik bilgileri gerektirir (yapılandırma parametresi kullanılarak tempdir
belirtilir).
Not
Veri kaynağı, S3'te oluşturduğu geçici dosyaları temizlemez. Sonuç olarak, geçici dosyaların belirtilen süre sonundan sonra otomatik olarak silinmesini sağlamak için nesne yaşam döngüsü yapılandırmasına sahip ayrılmış bir geçici S3 demeti kullanmanızı öneririz. Bu dosyaların nasıl şifrelenmesine ilişkin bir tartışma için bu belgenin Şifreleme bölümüne bakın. Unity Kataloğu'nda tanımlanan dış konumu konum olarak tempdir
kullanamazsınız.
Aşağıdaki bölümlerde her bağlantının kimlik doğrulama yapılandırma seçenekleri açıklanmaktadır:
Redshift'e Spark sürücüsü
Spark sürücüsü, kullanıcı adı ve parola kullanarak JDBC aracılığıyla Redshift'e bağlanır. Redshift, bu bağlantının kimliğini doğrulamak için IAM rollerinin kullanımını desteklemez. Varsayılan olarak, bu bağlantı SSL şifrelemesi kullanır; Daha fazla ayrıntı için bkz . Şifreleme.
Spark'ı S3'e
S3, Redshift'ten okurken veya Redshift'e yazarken toplu verileri depolamak için bir aracı görevi görür. Spark, hem Hadoop FileSystem arabirimlerini hem de doğrudan Amazon Java SDK'sının S3 istemcisini kullanarak S3'e bağlanır.
Not
Redshift için S3'e erişimi yapılandırmak için DBFS bağlamalarını kullanamazsınız.
Hadoop konfederasyonunda anahtarları ayarlama: Hadoop yapılandırma özelliklerini kullanarak AWS anahtarlarını belirtebilirsiniz. Yapılandırmanız
tempdir
birs3a://
dosya sistemine işaret ederse, Ve özelliklerini bir Hadoop XML yapılandırma dosyasında ayarlayabilirfs.s3a.access.key
fs.s3a.secret.key
veya Spark'ın genel Hadoop yapılandırmasını yapılandırmak için çağrısısc.hadoopConfiguration.set()
yapabilirsiniz. Birs3n://
dosya sistemi kullanıyorsanız, aşağıdaki örnekte gösterildiği gibi eski yapılandırma anahtarlarını sağlayabilirsiniz.Scala
Örneğin, dosya sistemini kullanıyorsanız
s3a
şunları ekleyin:sc.hadoopConfiguration.set("fs.s3a.access.key", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3a.secret.key", "<your-secret-key>")
Eski
s3n
dosya sistemi için şunları ekleyin:sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<your-access-key-id>") sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<your-secret-key>")
Python
Aşağıdaki komut bazı Spark iç işlevlerine dayanır, ancak tüm PySpark sürümleriyle çalışması gerekir ve gelecekte değişme olasılığı düşüktür:
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "<your-access-key-id>") sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "<your-secret-key>")
Redshift'i S3'e
Spark'ın forward_spark_s3_credentials
JDBC üzerinden S3'e true
bağlanmak için kullandığı AWS anahtar kimlik bilgilerini Redshift'e otomatik olarak iletme seçeneğini ayarlayın. JDBC sorgusu bu kimlik bilgilerini ekler, bu nedenle Databricks, JDBC bağlantısının SSL şifrelemesini etkinleştirmenizi kesinlikle önerir.
Şifreleme
JDBC güvenliğini sağlama: JDBC URL'sinde SSL ile ilgili ayarlar yoksa, veri kaynağı varsayılan olarak SSL şifrelemesini etkinleştirir ve Redshift sunucusunun güvenilir olduğunu da doğrular (yani).
sslmode=verify-full
Bunun için, bir sunucu sertifikası ilk kez gerektiğinde Amazon sunucularından otomatik olarak indirilir. Bu başarısız olursa, geri dönüş olarak önceden paketlenmiş bir sertifika dosyası kullanılır. Bu, hem Redshift hem de PostgreSQL JDBC sürücüleri için geçerlidir.Bu özellikle ilgili herhangi bir sorun olması veya SSL'yi devre dışı bırakmak istemeniz durumunda veya
DataFrameWriter
üzerindenDataFrameReader
arayabilirsiniz.option("autoenablessl", "false")
.SSL ile ilgili özel ayarlar belirtmek istiyorsanız Redshift belgelerindeki yönergeleri izleyebilirsiniz: Java ve JDBC Sürücü Yapılandırma Seçeneklerinde SSL ve Sunucu Sertifikalarını kullanma Veri kaynağıyla kullanılan JDBC'de
url
bulunan SSL ile ilgili seçenekler önceliklidir (yani otomatik yapılandırma tetiklenmez).S3'te depolanan UNLOAD verilerini şifreleme (Redshift'ten okurken depolanan veriler): Verileri S3'e Kaldırma hakkındaki Redshift belgelerine göre, "UNLOAD, Amazon S3 sunucu tarafı şifrelemesini (SSE-S3) kullanarak veri dosyalarını otomatik olarak şifreler."
Redshift ayrıca özel anahtarla istemci tarafı şifrelemeyi de destekler (bkz. Şifrelenmiş Veri Dosyalarını Kaldırma) ancak veri kaynağında gerekli simetrik anahtarı belirtme özelliği yoktur.
S3'te depolanan COPY verilerini şifreleme (Redshift'e yazarken depolanan veriler): Amazon S3'ten Şifrelenmiş Veri Dosyalarını Yükleme hakkındaki Redshift belgelerine göre:
AWS tarafından yönetilen şifreleme anahtarları (SSE-S3 veya SSE-KMS), istemci tarafı şifrelemesi veya her ikisi ile sunucu tarafı şifreleme kullanarak Amazon S3'e yüklenen veri dosyalarını yüklemek için komutunu kullanabilirsiniz COPY
. COPY, müşteri tarafından sağlanan bir anahtarla (SSE-C) Amazon S3 sunucu tarafı şifrelemesini desteklemez.
Parametreler
Spark SQL'de sağlanan parametre haritası veya SEÇENEKLER aşağıdaki ayarları destekler:
Parametre | Zorunlu | ||
---|---|---|---|
dbtable | Evet, sorgu belirtilmediği sürece. | ||
query | Evet, dbtable belirtilmediği sürece. | ||
kullanıcı | Hayır | ||
password | Hayır | ||
url | Yes | ||
search_path | Hayır | ||
aws_iam_role | Yalnızca yetkilendirmek için IAM rollerini kullanıyorsanız. | ||
forward_spark_s3_credentials | Hayır | ||
temporary_aws_access_key_id | Hayır | ||
temporary_aws_secret_access_key | Hayır | ||
temporary_aws_session_token | Hayır | ||
tempdir | Yes | ||
jdbcdriver | Hayır | ||
daiststyle | Hayır | ||
distkey | Hayır, kullanmadığınız sürece DISTSTYLE KEY |
||
sortkeyspec | Hayır | ||
usestagingtable (Kullanım Dışı) | Hayır | ||
açıklama | Hayır | ||
ön işlemler | Hayır | ||
postalar | Hayır | ||
extracopyoptions | Hayır | ||
tempformat | Hayır | ||
csvnullstring | Hayır | ||
csvseparator | Hayır | , |
Geçici biçim veya olarak ayarlanmış CSV geçici dosyalar yazarken kullanılacak ayırıcıCSV GZIP . Bu, "" veya| ", " gibi geçerli bir ASCII karakteri olmalıdır. |
csvignoreleadingwhitespace | Hayır | ||
csvignoretrailingwhitespace | Hayır | ||
infer_timestamp_ntz_type | Hayır |
Ek yapılandırma seçenekleri
Dize sütunlarının en büyük boyutunu yapılandırma
Redshift tabloları oluştururken varsayılan davranış, dize sütunları için sütunlar oluşturmaktır TEXT
. Redshift sütunları olarak depolarTEXT
, bu nedenle bu sütunların boyutu en fazla 256 karakterdir (kaynak).VARCHAR(256)
Daha büyük sütunları desteklemek için sütun meta veri alanını kullanarak maxlength
tek tek dize sütunlarının uzunluk üst sınırını belirtebilirsiniz. Bu, varsayılandan daha küçük en uzunluğa sahip sütunları bildirerek alandan tasarruf sağlayan performans iyileştirmeleri uygulamak için de kullanışlıdır.
Not
Spark'taki sınırlamalar nedeniyle SQL ve R dili API'leri sütun meta verilerinde değişiklik yapmayı desteklemez.
Python
df = ... # the dataframe you'll want to write to Redshift
# Specify the custom width of each column
columnLengthMap = {
"language_code": 2,
"country_code": 2,
"url": 2083,
}
# Apply each column metadata customization
for (colName, length) in columnLengthMap.iteritems():
metadata = {'maxlength': length}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
df.write \
.format("com.databricks.spark.redshift") \
.option("url", jdbcURL) \
.option("tempdir", s3TempDirectory) \
.option("dbtable", sessionTable) \
.save()
Scala
Spark'ın Scala API'sini kullanarak birden çok sütunun meta veri alanlarını güncelleştirme örneği aşağıda verilmiştir:
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom width of each column
val columnLengthMap = Map(
"language_code" -> 2,
"country_code" -> 2,
"url" -> 2083
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnLengthMap.foreach { case (colName, length) =>
val metadata = new MetadataBuilder().putLong("maxlength", length).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
df.write
.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDirectory)
.option("dbtable", sessionTable)
.save()
Özel sütun türü ayarlama
Bir sütun türünü el ile ayarlamanız gerekiyorsa, sütun meta verilerini kullanabilirsiniz redshift_type
. Örneğin, kullanıcı tanımlı bir sütun türü atamak için tür eşleştiricisini geçersiz kılmak Spark SQL Schema -> Redshift SQL
isterseniz aşağıdakileri yapabilirsiniz:
Python
# Specify the custom type of each column
columnTypeMap = {
"language_code": "CHAR(2)",
"country_code": "CHAR(2)",
"url": "BPCHAR(111)",
}
df = ... # the dataframe you'll want to write to Redshift
# Apply each column metadata customization
for colName, colType in columnTypeMap.items():
metadata = {'redshift_type': colType}
df = df.withColumn(colName, df[colName].alias(colName, metadata=metadata))
Scala
import org.apache.spark.sql.types.MetadataBuilder
// Specify the custom type of each column
val columnTypeMap = Map(
"language_code" -> "CHAR(2)",
"country_code" -> "CHAR(2)",
"url" -> "BPCHAR(111)"
)
var df = ... // the dataframe you'll want to write to Redshift
// Apply each column metadata customization
columnTypeMap.foreach { case (colName, colType) =>
val metadata = new MetadataBuilder().putString("redshift_type", colType).build()
df = df.withColumn(colName, df(colName).as(colName, metadata))
}
Sütun kodlamayı yapılandırma
Tablo oluştururken, her sütun için bir sıkıştırma kodlaması belirtmek için sütun meta veri alanını kullanın encoding
(kullanılabilir kodlamalar için Amazon belgelerine bakın).
Sütunlarda açıklama ayarlama
Redshift, sütunların çoğu sorgu aracında (komutunu kullanarak) gösterilmesi gereken açıklamaların eklenmesini COMMENT
sağlar. Tek tek sütunlar için bir açıklama belirtmek üzere sütun meta veri alanını ayarlayabilirsiniz description
.
Redshift'e sorgu gönderme
Spark iyileştiricisi aşağıdaki işleçleri Redshift'e iletir:
Filter
Project
Sort
Limit
Aggregation
Join
ve Filter
içinde Project
aşağıdaki ifadeleri destekler:
- Çoğu Boole mantıksal işleci
- Karşılaştırmalar
- Temel aritmetik işlemler
- Sayısal ve dize atamaları
- Çoğu dize işlevi
- Tamamen Redshift'e gönderilebilen Skaler alt sorgular.
Not
Bu gönderim, tarihlerde ve zaman damgalarında çalışan ifadeleri desteklemez.
içinde Aggregation
aşağıdaki toplama işlevlerini destekler:
AVG
COUNT
MAX
MIN
SUM
STDDEV_SAMP
STDDEV_POP
VAR_SAMP
VAR_POP
varsa yan tümcesiyle DISTINCT
birlikte.
içinde Join
aşağıdaki birleştirme türlerini destekler:
INNER JOIN
LEFT OUTER JOIN
RIGHT OUTER JOIN
LEFT SEMI JOIN
LEFT ANTI JOIN
- İyileştirici tarafından yeniden
Join
yazılan alt sorgular; örneğinWHERE EXISTS
, ,WHERE NOT EXISTS
Not
Birleştirme anında iletme desteğine sahip değildir FULL OUTER JOIN
.
İtme, ile LIMIT
yapılan sorgularda en yararlı olabilir. gibi SELECT * FROM large_redshift_table LIMIT 10
bir sorgu çok uzun sürebilir, bunun nedeni tablonun tamamının S3'e ara sonuç olarak UNLOADed olmasıdır. aşağı itme ile, LIMIT
Redshift'te yürütülür. Toplamaları olan sorgularda toplamayı Redshift'e göndermek, aktarılması gereken veri miktarını azaltmaya da yardımcı olur.
Redshift'e sorgu gönderme varsayılan olarak etkindir. olarak ayarlanarak spark.databricks.redshift.pushdown
false
devre dışı bırakılabilir. Devre dışı bırakıldığında bile Spark yine de filtreleri aşağı iter ve Redshift'e sütun eleme gerçekleştirir.
Redshift sürücüsü yüklemesi
Redshift veri kaynağı ayrıca Redshift uyumlu bir JDBC sürücüsü gerektirir. Redshift, PostgreSQL veritabanı sistemini temel aldığı için Databricks Runtime ile birlikte gelen PostgreSQL JDBC sürücüsünü veya Amazon tarafından önerilen Redshift JDBC sürücüsünü kullanabilirsiniz. PostgreSQL JDBC sürücüsünü kullanmak için yükleme gerekmez. Her Databricks Runtime sürümünde bulunan PostgreSQL JDBC sürücüsünün sürümü Databricks Runtime sürüm notlarında listelenir.
Redshift JDBC sürücüsünü el ile yüklemek için:
- Sürücüyü Amazon'dan indirin .
- Sürücüyü Azure Databricks çalışma alanınıza yükleyin. Bkz. Kitaplıklar.
- Kitaplığı kümenize yükleyin .
Not
Databricks, Redshift JDBC sürücüsünün en son sürümünü kullanmanızı önerir. 1.2.41'in altındaki Redshift JDBC sürücüsünün sürümleri aşağıdaki sınırlamalara sahiptir:
- Sürücünün 1.2.16 sürümü, SQL sorgusunda yan
where
tümcesi kullanılırken boş veriler döndürür. - 1.2.41'in altındaki sürücünün sürümleri geçersiz sonuçlar döndürebilir çünkü bir sütunun null atanabilirliği "Bilinmiyor" yerine yanlışlıkla "Null Atanamaz" olarak bildirilir.
İşlem garantileri
Bu bölümde, Spark için Redshift veri kaynağının işlem garantileri açıklanmaktadır.
Redshift ve S3 özelliklerinde genel arka plan
Redshift işlem garantileri hakkında genel bilgi için Redshift belgelerindeki Eşzamanlı Yazma İşlemlerini Yönetme bölümüne bakın. Özetle Redshift, Redshift BEGIN komutunun belgelerine göre serileştirilebilir yalıtım sağlar:
Dört işlem yalıtım düzeyinden herhangi birini kullanabilirsiniz ancak Amazon Redshift tüm yalıtım düzeylerini serileştirilebilir olarak işler.
Amazon Redshift, ayrı ayrı yürütülen her SQL komutunun ayrı ayrı işlediği varsayılan otomatik işleme davranışını destekler.
Bu nedenle, ve UNLOAD
gibi COPY
tek tek komutlar atomik ve işlemseldir, ancak açıktır BEGIN
ve END
yalnızca birden çok komut veya sorgunun bölünmezliğini zorlamak için gereklidir.
Redshift'ten okurken ve Redshift'e yazarken, veri kaynağı S3'te verileri okur ve yazar. Hem Spark hem de Redshift bölümlenmiş çıkış üretir ve S3'te birden çok dosyada depolar. Amazon S3 Veri Tutarlılığı Modeli belgelerine göre, S3 demet listeleme işlemleri sonunda tutarlıdır, bu nedenle dosyaların bu nihai tutarlılık kaynağı nedeniyle eksik veya eksik verileri önlemek için özel uzunluklara gitmesi gerekir.
Spark için Redshift veri kaynağının garantileri
Var olan bir tablonun sonuna ekleme
Redshift'e satır eklerken, veri kaynağı COPY komutunu kullanır ve nihai olarak tutarlı olan belirli S3 işlemlerine karşı koruma sağlamak için bildirimleri belirtir. Sonuç olarak, spark-redshift
mevcut tablolara eklemeler normal Redshift COPY
komutları ile aynı atomik ve işlem özelliklerine sahiptir.
Yeni tablo oluşturma (SaveMode.CreateIfNotExists
)
Yeni tablo oluşturmak, ilk satır kümesini eklemeye yönelik bir CREATE TABLE
komutun ardından COPY komutundan oluşan iki adımlı bir işlemdir. Her iki işlem de aynı işlemde gerçekleştirilir.
Varolan tablonun üzerine yazma
Varsayılan olarak, veri kaynağı hedef tabloyu silerek, yeni boş bir tablo oluşturarak ve satır ekleyerek uygulanan üzerine yazma işlemleri gerçekleştirmek için işlemleri kullanır.
Kullanım dışı usestagingtable
bırakılan ayar olarak ayarlanırsa false
, veri kaynağı yeni tabloya satır eklemeden önce komutu işler DELETE TABLE
ve üzerine yazma işleminin bölünmezliğinden ödün verir, ancak redshift'in üzerine yazma sırasında ihtiyaç duyduğu hazırlama alanı miktarını azaltır.
Redshift tablosunu sorgulama
Sorgular redshift UNLOAD komutunu kullanarak sorguyu yürütür ve sonuçlarını S3'e kaydeder ve sonunda tutarlı olan belirli S3 işlemlerine karşı koruma sağlamak için bildirimleri kullanır. Sonuç olarak, Spark için Redshift veri kaynağından gelen sorgular, normal Redshift sorguları ile aynı tutarlılık özelliklerine sahip olmalıdır.
Yaygın sorunlar ve çözümler
S3 demeti ve Redshift kümesi farklı AWS bölgelerinde
Varsayılan olarak, S3 <demeti ve Redshift kümesi farklı AWS bölgelerindeyse S3 -> Redshift kopyaları çalışmaz.
S3 demeti farklı bir bölgedeyken Redshift tablosunu okumaya çalışırsanız, aşağıdaki gibi bir hata görebilirsiniz:
ERROR: S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect.
Benzer şekilde, farklı bir bölgedeki S3 demetini kullanarak Redshift'e yazma girişimi aşağıdaki hataya neden olabilir:
error: Problem reading manifest file - S3ServiceException:The S3 bucket addressed by the query is in a different region from this cluster.,Status 301,Error PermanentRedirect
Yazmalar: Redshift COPY komutu S3 demeti bölgesinin açık belirtimini destekler, böylece bu durumlarda ayara ekleyerek
region 'the-region-name'
Redshift'eextracopyoptions
yazma işlemini düzgün bir şekilde yapabilirsiniz. Örneğin, ABD Doğu (Virginia) bölgesinde ve Scala API'sinde bir demetle şunu kullanın:.option("extracopyoptions", "region 'us-east-1'")
Alternatif olarak şu ayarı kullanabilirsiniz
awsregion
:.option("awsregion", "us-east-1")
Okuma: Redshift UNLOAD komutu S3 demeti bölgesinin belirtimini de destekler. Ayarına bölgeyi
awsregion
ekleyerek okumaların düzgün çalışmasını sağlayabilirsiniz:.option("awsregion", "us-east-1")
JDBC URL'sinde özel karakterler içeren bir parola kullanılırken kimlik doğrulama hatası
Kullanıcı adı ve parolayı JDBC URL'sinin bir parçası olarak sağlıyorsanız ve parola , ?
veya &
gibi ;
özel karakterler içeriyorsa aşağıdaki özel durumu görebilirsiniz:
java.sql.SQLException: [Amazon](500310) Invalid operation: password authentication failed for user 'xyz'
Bunun nedeni kullanıcı adı veya paroladaki özel karakterlerin JDBC sürücüsü tarafından doğru şekilde kaçılmamasıdır. İlgili DataFrame seçeneklerini user
password
ve kullanarak kullanıcı adını ve parolayı belirttiğinizden emin olun. Daha fazla bilgi için bkz . Parametreler.
Uzun süre çalışan Spark sorgusu, karşılık gelen Redshift işlemi gerçekleştirilse bile süresiz olarak kilitleniyor
Redshift'ten ve Redshift'e büyük miktarda veri okuyor veya yazıyorsanız, AWS Redshift İzleme sayfası ilgili LOAD
veya UNLOAD
işlemin tamamlandığını ve kümenin boşta olduğunu gösterse bile Spark sorgunuz süresiz olarak yanıt vermemeye başlayabilir. Bunun nedeni Redshift ile Spark arasındaki bağlantının zaman aşımına neden olmasıdır. Bunu önlemek için JDBC bayrağının etkinleştirildiğinden tcpKeepAlive
ve TCPKeepAliveMinutes
düşük bir değere ayarlandığından emin olun (örneğin, 1).
Daha fazla bilgi için bkz . Amazon Redshift JDBC Sürücü Yapılandırması.
Saat dilimi semantiği ile zaman damgası
Veriler okunurken hem Redshift TIMESTAMP
hem TIMESTAMPTZ
de veri türleri Spark TimestampType
ile eşlenir ve bir değer Eşgüdümlü Evrensel Saat'e (UTC) dönüştürülür ve UTC zaman damgası olarak depolanır. Redshift TIMESTAMP
için, değerin saat dilimi bilgisi olmadığından yerel saat dilimi varsayılır. Redshift tablosuna veri yazarken Spark, TimestampType
Redshift TIMESTAMP
veri türüne eşlenir.
Geçiş kılavuzu
Veri kaynağı artık Spark S3 kimlik bilgileri Redshift'e iletilmeden önce açıkça ayarlamanızı forward_spark_s3_credentials
gerektirir. veya temporary_aws_*
kimlik doğrulama mekanizmalarını kullanıyorsanız bu değişikliğin aws_iam_role
hiçbir etkisi olmaz. Ancak, eski varsayılan davranışı kullandıysanız, önceki Redshift to S3 kimlik doğrulama mekanizmanızı kullanmaya devam etmek için true
artık açıkça olarak olarak ayarlamalısınızforward_spark_s3_credentials
. Üç kimlik doğrulama mekanizması ve bunların güvenlikle ilgili dengeleri hakkında bilgi edinmek için bu belgenin S3 ve Redshift kimlik doğrulama bölümüne bakın.