Uwaga
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważne
Ta dokumentacja została wycofana i może nie zostać zaktualizowana. Produkty, usługi lub technologie wymienione w tej zawartości nie są już obsługiwane. Zobacz Wykonywanie zapytań o dane w usłudze Azure Synapse Analytics.
Usługa Databricks zaleca korzystanie z domyślnej funkcji COPY
w Azure Data Lake Storage do połączeń z Azure Synapse. Ten artykuł zawiera starszą dokumentację dotyczącą technologii PolyBase i magazynu obiektów blob.
Azure Synapse Analytics (dawniej SQL Data Warehouse) to oparty na chmurze magazyn danych przedsiębiorstwa, który wykorzystuje masowe przetwarzanie równoległe (MPP), aby szybko uruchamiać złożone zapytania w petabajtach danych. Użyj platformy Azure jako kluczowego składnika rozwiązania do obsługi danych big data. Zaimportuj dane big data na platformę Azure przy użyciu prostych zapytań T-SQL technologii PolyBase lub instrukcji COPY , a następnie użyj możliwości MPP do uruchamiania analizy o wysokiej wydajności. Podczas integracji i analizy hurtownia danych stanie się jedynym źródłem prawdy, na którym Twoja firma może polegać przy szukaniu szczegółowych informacji.
Dostęp do usługi Azure Synapse można uzyskać z usługi Azure Databricks przy użyciu łącznika usługi Azure Synapse, implementacji źródła danych na platformie Apache Spark, która korzysta z usługi Azure Blob Storage oraz technologii PolyBase lub polecenia COPY
w usłudze Azure Synapse, aby wydajnie przesyłać duże ilości danych między klastrem Azure Databricks a wystąpieniem Azure Synapse.
Zarówno klaster usługi Azure Databricks, jak i wystąpienie usługi Azure Synapse uzyskują dostęp do wspólnego kontenera usługi Blob Storage w celu wymiany danych między tymi dwoma systemami. W usłudze Azure Databricks zadania platformy Apache Spark są wyzwalane przez łącznik usługi Azure Synapse w celu odczytywania danych z kontenera usługi Blob Storage i zapisywania ich w kontenerze usługi Blob Storage. Po stronie usługi Azure Synapse operacje ładowania i zwalniania danych wykonywane przez program PolyBase są wyzwalane przez łącznik usługi Azure Synapse za pośrednictwem JDBC. W środowisku Databricks Runtime 7.0 lub nowszym COPY
domyślnie jest używane do ładowania danych do usługi Azure Synapse przez łącznik usługi Azure Synapse za pośrednictwem JDBC.
Uwaga
COPY
jest dostępna tylko w wystąpieniach usługi Azure Synapse Gen2, które zapewniają lepszą wydajność. Jeśli Twoja baza danych nadal używa instancji Gen1, zalecamy migrację bazy danych do Gen2.
Łącznik usługi Azure Synapse jest bardziej odpowiedni dla procesu ETL niż do zapytań interakcyjnych, ponieważ każde wykonywanie zapytań może wyodrębniać duże ilości danych do usługi Blob Storage. Jeśli planujesz wykonać kilka zapytań względem tej samej tabeli usługi Azure Synapse, zalecamy zapisanie wyodrębnionych danych w formacie takim jak Parquet.
Wymagania
Klucz główny bazy danych usługi Azure Synapse.
Uwierzytelnianie
Łącznik usługi Azure Synapse używa trzech typów połączeń sieciowych:
- Sterownik platformy Spark do usługi Azure Synapse
- Sterownik i egzekutory Spark do konta usługi Azure Storage
- Usługa Azure Synapse do konta usługi Azure Storage
┌─────────┐
┌─────────────────────────>│ STORAGE │<────────────────────────┐
│ Storage acc key / │ ACCOUNT │ Storage acc key / │
│ Managed Service ID / └─────────┘ OAuth 2.0 / │
│ │ │
│ │ │
│ │ Storage acc key / │
│ │ OAuth 2.0 / │
│ │ │
v v ┌──────v────┐
┌──────────┐ ┌──────────┐ │┌──────────┴┐
│ Synapse │ │ Spark │ ││ Spark │
│ Analytics│<────────────────────>│ Driver │<───────────────>│ Executors │
└──────────┘ JDBC with └──────────┘ Configured └───────────┘
username & password / in Spark
W poniższych sekcjach opisano opcje konfiguracji uwierzytelniania każdego połączenia.
Sterownik platformy Spark do usługi Azure Synapse
Sterownik Platformy Spark może nawiązać połączenie z usługą Azure Synapse przy użyciu JDBC z nazwą użytkownika i hasłem lub protokołem OAuth 2.0 z jednostką usługi na potrzeby uwierzytelniania.
Nazwa użytkownika i hasło
Zalecamy użycie ciągów połączenia udostępnianych przez witrynę Azure portal dla obu typów uwierzytelniania, które umożliwiają szyfrowanie Secure Sockets Layer (SSL) dla wszystkich danych wysyłanych między sterownikiem Spark a wystąpieniem Azure Synapse przez połączenie JDBC. Aby sprawdzić, czy szyfrowanie SSL jest włączone, możesz wyszukać encrypt=true
w parametry połączenia.
Aby umożliwić sterownikowi platformy Spark dotarcie do usługi Azure Synapse, zalecamy ustawienie Zezwalaj usługom i zasobom platformy Azure na dostęp do tego obszaru roboczego na wartość WŁ . w okienku Sieć w obszarze Zabezpieczenia obszaru roboczego usługi Azure Synapse za pośrednictwem witryny Azure Portal. To ustawienie umożliwia komunikację ze wszystkich adresów IP platformy Azure i wszystkich podsieci platformy Azure, co umożliwia sterownikom platformy Spark dotarcie do wystąpienia usługi Azure Synapse.
Uwierzytelnianie OAuth 2.0 z jednostką usługi
Możesz uwierzytelnić się w usłudze Azure Synapse Analytics przy użyciu jednostki usługi z dostępem do bazowego konta magazynu. Aby uzyskać więcej informacji na temat używania poświadczeń tożsamości usługi głównej dla uzyskania dostępu do konta usługi Azure Storage, zobacz Connect to Azure Data Lake Storage and Blob Storage. Należy ustawić enableServicePrincipalAuth
opcję na true
w parametrach konfiguracji połączenia, aby umożliwić łącznikowi uwierzytelnianie za pomocą jednostki usługi.
Opcjonalnie możesz użyć innej jednostki usługi dla połączenia usługi Azure Synapse Analytics. Przykład konfigurowania poświadczeń podmiotu usługi dla konta magazynu i opcjonalnych poświadczeń podmiotu usługi dla usługi Synapse.
ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Skala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Pyton
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Sterownik i egzekutory Spark do konta usługi Azure Storage
Kontener usługi Azure Storage działa jako pośrednik do przechowywania danych zbiorczych podczas odczytywania danych z usługi Azure Synapse lub zapisywania ich w usłudze Azure Synapse. Platforma Spark łączy się z usługą ADLS lub usługąabfss
Blob Storage przy użyciu sterownika.
Dostępne są następujące opcje uwierzytelnienia:
- Klucz dostępu konta magazynowego i tajny sekret
- Uwierzytelnianie OAuth 2.0. Aby uzyskać więcej informacji na temat protokołu OAuth 2.0 i jednostek usługi, zobacz Uzyskiwanie dostępu do magazynu przy użyciu jednostki usługi i identyfikatora entra firmy Microsoft (Azure Active Directory).
W poniższych przykładach przedstawiono te dwa sposoby wykorzystania podejścia z kluczem dostępu do konta magazynu. To samo dotyczy konfiguracji protokołu OAuth 2.0.
Konfiguracja sesji notebooka (preferowana)
Korzystając z tego podejścia, klucz dostępu do konta jest ustawiany w konfiguracji sesji skojarzonej z notesem, który uruchamia polecenie. Ta konfiguracja nie ma wpływu na inne notatniki przypięte do tego samego klastra.
spark
jest obiektem SparkSession
podanym w notesie.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Globalna konfiguracja usługi Hadoop
Takie podejście aktualizuje globalną konfigurację Hadoop skojarzoną z obiektem SparkContext
współdzielonym przez wszystkie notesy.
Skala
sc.hadoopConfiguration.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Pyton
hadoopConfiguration
nie jest uwidoczniony we wszystkich wersjach programu PySpark. Chociaż następujące polecenie opiera się na niektórych wewnętrznych mechanizmach Spark, powinno działać ze wszystkimi wersjami PySpark i jest mało prawdopodobne, aby przestało działać lub zostało zmienione w przyszłości.
sc._jsc.hadoopConfiguration().set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Usługa Azure Synapse do konta usługi Azure Storage
Usługa Azure Synapse również nawiązuje połączenie z kontem magazynu podczas ładowania i zwalniania danych tymczasowych.
W przypadku skonfigurowania klucza konta i tajnego klucza dla konta magazynu można ustawić forwardSparkAzureStorageCredentials
na true
, w takim przypadku łącznik usługi Azure Synapse automatycznie odnajduje klucz dostępu konta ustawiony w konfiguracji sesji notesu lub globalnej konfiguracji usługi Hadoop i przekazuje klucz dostępu konta magazynu do połączonego wystąpienia usługi Azure Synapse przez utworzenie tymczasowego poświadczenia zakresu bazy danych platformy Azure.
Alternatywnie, jeśli używasz usługi ADLS z uwierzytelnianiem OAuth 2.0 lub wystąpienie usługi Azure Synapse jest skonfigurowane tak, aby miało tożsamość usługi zarządzanej (zazwyczaj w połączeniu z konfiguracją sieci wirtualnej i punktów końcowych usługi), musisz ustawić useAzureMSI
na true
. W takim przypadku łącznik określi IDENTITY = 'Managed Service Identity'
dla poświadczeń przypisanych do zakresu bazy danych i nie określi SECRET
.
Obsługa przesyłania strumieniowego
Łącznik usługi Azure Synapse oferuje wydajną i skalowalną obsługę strumieniowego zapisu dla usługi Azure Synapse, co zapewnia spójne doświadczenie użytkownika z zapisami wsadowymi i wykorzystuje technologię PolyBase lub COPY
do dużych transferów danych między klastrem usługi Azure Databricks a instancją usługi Azure Synapse. Podobnie jak w przypadku zapisów wsadowych, przesyłanie strumieniowe jest przeznaczone w dużej mierze dla etL, co zapewnia większe opóźnienie, które może nie być odpowiednie do przetwarzania danych w czasie rzeczywistym w niektórych przypadkach.
Semantyka odporności na uszkodzenia
Domyślnie usługa Azure Synapse Streaming oferuje kompleksową gwarancję dokładnie raz na potrzeby zapisywania danych w tabeli usługi Azure Synapse przez niezawodne śledzenie postępu zapytania przy użyciu kombinacji lokalizacji punktu kontrolnego w systemie DBFS, tabeli punktów kontrolnych w usłudze Azure Synapse i mechanizmu blokowania w celu zapewnienia, że przesyłanie strumieniowe może obsługiwać wszelkie typy błędów, ponawiania prób i ponownego uruchamiania zapytań.
Opcjonalnie możesz wybrać mniej restrykcyjne semantyki co najmniej raz dla usługi Azure Synapse Streaming, ustawiając opcję spark.databricks.sqldw.streaming.exactlyOnce.enabled
na false
, w takim przypadku może wystąpić duplikowanie danych przy sporadycznych niepowodzeniach połączenia z usługą Azure Synapse lub nieoczekiwanym zakończeniem zapytania.
Użycie (partia)
Tego łącznika można używać za pośrednictwem interfejsu API źródła danych w notatnikach Scala, Python, SQL i R.
Skala
// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
Pyton
# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
SQL
-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
R
# Load SparkR
library(SparkR)
# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
Użycie (przesyłanie strumieniowe)
Dane można zapisywać przy użyciu Structured Streaming w notebookach Scala i Pythona.
Skala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
Pyton
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
Konfigurowanie
W tej sekcji opisano sposób konfigurowania semantyki zapisu dla łącznika, wymaganych uprawnień i różnych parametrów konfiguracji.
W tej sekcji:
- Obsługiwane tryby zapisu dla zapisów wsadowych
- Obsługiwane tryby danych wyjściowych dla zapisów przesyłanych strumieniowo
- Zapisywanie semantyki
- Wymagane uprawnienia usługi Azure Synapse dla programu PolyBase
-
Wymagane uprawnienia usługi Azure Synapse dla instrukcji
COPY
- Parametry
- Wypychanie zapytań do usługi Azure Synapse
- Tymczasowe zarządzanie danymi
- Tymczasowe zarządzanie obiektami
- Zarządzanie tabelami punktów kontrolnych przesyłania strumieniowego
Obsługiwane tryby zapisywania dla zapisów wsadowych
Łącznik usługi Azure Synapse obsługuje ErrorIfExists
tryby , Ignore
, Append
i Overwrite
zapisywania z domyślnym trybem to ErrorIfExists
.
Aby uzyskać więcej informacji na temat obsługiwanych trybów zapisywania na platformie Apache Spark, zobacz dokumentację spark SQL dotyczącą trybów zapisywania.
Obsługiwane tryby danych wyjściowych dla zapisów przesyłanych strumieniowo
Łącznik usługi Azure Synapse obsługuje Append
tryby danych wyjściowych i Complete
dołączania i agregacji rekordów.
Aby uzyskać więcej informacji na temat trybów danych wyjściowych i macierzy zgodności, zobacz Przewodnik dotyczący Structured Streaming.
Semantyka zapisu
Uwaga
COPY
jest dostępny w środowisku Databricks Runtime 7.0 lub nowszym.
Oprócz technologii PolyBase łącznik usługi Azure Synapse obsługuje instrukcję COPY
. Instrukcja COPY
oferuje wygodniejszy sposób ładowania danych do usługi Azure Synapse bez konieczności tworzenia tabeli zewnętrznej, wymaga mniejszej liczby uprawnień do ładowania danych i poprawia wydajność pozyskiwania danych w usłudze Azure Synapse.
Domyślnie łącznik automatycznie odnajduje najlepszą semantykę zapisu (COPY
w przypadku określania wartości docelowej wystąpienia usługi Azure Synapse Gen2, a w przeciwnym razie PolyBase). Można również określić semantykę zapisu przy użyciu następującej konfiguracji:
Skala
// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
Pyton
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
SQL
-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;
R
# Load SparkR
library(SparkR)
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")
gdzie <write-semantics>
należy polybase
użyć technologii PolyBase lub copy
użyć instrukcji COPY
.
Wymagane uprawnienia usługi Azure Synapse dla programu PolyBase
W przypadku korzystania z technologii PolyBase łącznik usługi Azure Synapse wymaga od użytkownika połączenia JDBC uprawnienia do uruchamiania następujących poleceń w połączonym wystąpieniu usługi Azure Synapse:
- CREATE DATABASE POŚWIADCZENIE W ZAKRESIE
- TWORZENIE ZEWNĘTRZNEGO ŹRÓDŁA DANYCH
- UTWÓRZ ZEWNĘTRZNY FORMAT PLIKU
- UTWÓRZ ZEWNĘTRZNY TABLE
W ramach wymagań wstępnych dla pierwszego polecenia łącznik oczekuje, że klucz główny bazy danych już istnieje dla określonego wystąpienia usługi Azure Synapse. Jeśli nie, możesz utworzyć klucz przy użyciu polecenia CREATE MASTER KEY .
Ponadto, aby odczytać tabele usługi Azure Synapse ustawione za pomocą dbTable
lub tabele, o których mowa w query
, użytkownik JDBC musi mieć uprawnienia dostępu do potrzebnych tabel usługi Azure Synapse. Aby zapisywać dane do tabeli Azure Synapse ustawionej za pomocą dbTable
, użytkownik JDBC musi mieć uprawnienia do zapisu w tej tabeli usługi Azure Synapse.
Poniższa tabela zawiera podsumowanie wymaganych uprawnień dla wszystkich operacji za pomocą technologii PolyBase:
Operacja | Uprawnienia | Uprawnienia podczas korzystania z zewnętrznego źródła danych |
---|---|---|
Zapis wsadowy | KONTROLA | Zobacz Zapisywanie wsadowe |
Zapis przesyłany strumieniowo | KONTROLA | Zobacz Zapis przesyłany strumieniowo |
Przeczytaj | KONTROLA | Zobacz Czytaj |
Wymagane uprawnienia usługi Azure Synapse dla programu PolyBase z opcją zewnętrznego źródła danych
Można użyć programu PolyBase z wcześniej skonfigurowanym zewnętrznym źródłem danych. Zobacz parametr externalDataSource
w Parametrach, aby uzyskać więcej informacji.
Aby można było używać technologii PolyBase ze wstępnie aprowizowanym zewnętrznym źródłem danych, łącznik usługi Azure Synapse wymaga, aby użytkownik połączenia JDBC miał uprawnienia do uruchamiania następujących poleceń w połączonym wystąpieniu usługi Azure Synapse:
Aby utworzyć zewnętrzne źródło danych, należy najpierw utworzyć poświadczenie o zakresie bazy danych. Poniższe linki opisują sposób tworzenia poświadczeń z określonym zakresem dla podmiotów usługi oraz zewnętrznego źródła danych dla lokalizacji ABFS.
Uwaga
Lokalizacja zewnętrznego źródła danych musi wskazywać kontener. Łącznik nie będzie działać, jeśli lokalizacja jest katalogiem w kontenerze.
Poniższa tabela zawiera podsumowanie uprawnień do operacji zapisu PolyBase w przypadku opcji zewnętrznego źródła danych.
Operacja | Uprawnienia (wstawianie do istniejącej tabeli) | Uprawnienia (wstawianie do nowej tabeli) |
---|---|---|
Zapis wsadowy | ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ZMIENIAJ DOWOLNY SCHEMA ZMIENIANIE DOWOLNEGO ZEWNĘTRZNEGO ŹRÓDŁA DANYCH ZMIENIANIE DOWOLNEGO FORMATU PLIKU ZEWNĘTRZNEGO |
ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ZMIENIAJ DOWOLNY SCHEMA ZMIENIANIE DOWOLNEGO ZEWNĘTRZNEGO ŹRÓDŁA DANYCH ZMIENIANIE DOWOLNEGO FORMATU PLIKU ZEWNĘTRZNEGO |
Zapis przesyłany strumieniowo | ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ZMIENIAJ DOWOLNY SCHEMA ZMIENIANIE DOWOLNEGO ZEWNĘTRZNEGO ŹRÓDŁA DANYCH ZMIENIANIE DOWOLNEGO FORMATU PLIKU ZEWNĘTRZNEGO |
ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ZMIENIAJ DOWOLNY SCHEMA ZMIENIANIE DOWOLNEGO ZEWNĘTRZNEGO ŹRÓDŁA DANYCH ZMIENIANIE DOWOLNEGO FORMATU PLIKU ZEWNĘTRZNEGO |
W poniższej tabeli podsumowano uprawnienia operacji odczytu programu PolyBase z opcją zewnętrznego źródła danych:
Operacja | Uprawnienia |
---|---|
Przeczytaj | CREATE TABLE ZMIENIAJ DOWOLNY SCHEMA ZMIENIANIE DOWOLNEGO ZEWNĘTRZNEGO ŹRÓDŁA DANYCH ZMIENIANIE DOWOLNEGO FORMATU PLIKU ZEWNĘTRZNEGO |
Za pomocą tego łącznika można odczytywać przez interfejs API źródła danych w notesach języka Scala, Python, SQL i R.
Skala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("externalDataSource", "<your-pre-provisioned-data-source>")
.option("dbTable", "<your-table-name>")
.load()
Pyton
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("externalDataSource", "<your-pre-provisioned-data-source>") \
.option("dbTable", "<your-table-name>") \
.load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
externalDataSource = "<your-pre-provisioned-data-source>")
Wymagane uprawnienia usługi Azure Synapse dla instrukcji COPY
Uwaga
Dostępne w środowisku Databricks Runtime 7.0 lub nowszym.
W przypadku korzystania z instrukcji COPY
łącznik usługi Azure Synapse wymaga, aby użytkownik połączenia JDBC miał uprawnienia do uruchamiania następujących poleceń w połączonym wystąpieniu usługi Azure Synapse:
Jeśli tabela docelowa nie istnieje w usłudze Azure Synapse, oprócz powyższego polecenia wymagane jest uprawnienie do uruchomienia następującego polecenia:
Poniższa tabela zawiera podsumowanie uprawnień do zapisów wsadowych i strumieniowych z użyciem COPY
.
Operacja | Uprawnienia (wstawianie do istniejącej tabeli) | Uprawnienia (wstawianie do nowej tabeli) |
---|---|---|
Zapis wsadowy | ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT |
ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Zapis przesyłany strumieniowo | ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT |
ADMINISTROWANIE ZBIORCZYMI OPERACJAMI BAZY DANYCH INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Parametry
Mapa parametrów lub OPTIONS
podana w usłudze Spark SQL obsługuje następujące ustawienia:
Parametr | Wymagane | Wartość domyślna | Uwagi |
---|---|---|---|
dbTable |
Tak, chyba że query określono |
Brak wartości domyślnej | Tabela do utworzenia lub odczytania w usłudze Azure Synapse. Ten parametr jest wymagany podczas zapisywania danych z powrotem do usługi Azure Synapse. Możesz również użyć {SCHEMA NAME}.{TABLE NAME} polecenia , aby uzyskać dostęp do tabeli w danym schemacie. Jeśli nie podano nazwy schematu, zostanie użyty domyślny schemat skojarzony z użytkownikiem JDBC.Wcześniej obsługiwany dbtable wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
query |
Tak, chyba że dbTable określono |
Brak wartości domyślnej | Zapytanie do odczytu w usłudze Azure Synapse. W przypadku tabel, o których mowa w zapytaniu, można również użyć {SCHEMA NAME}.{TABLE NAME} , aby uzyskać dostęp do tabeli w danym schemacie. Jeśli nie podano nazwy schematu, zostanie użyty domyślny schemat skojarzony z użytkownikiem JDBC. |
user |
Nie. | Brak wartości domyślnej | Nazwa użytkownika usługi Azure Synapse. Należy używać razem z opcją password . Można go używać tylko wtedy, gdy użytkownik i hasło nie zostaną przekazane w adresie URL. Przekazanie obu spowoduje wystąpienie błędu. |
password |
Nie. | Brak wartości domyślnej | Hasło usługi Azure Synapse. Należy używać razem z opcją user . Można go używać tylko wtedy, gdy użytkownik i hasło nie zostaną przekazane w adresie URL. Przekazanie obu spowoduje wystąpienie błędu. |
url |
Tak | Brak wartości domyślnej | Adres URL JDBC z sqlserver ustawionym jako podprotocol. Zaleca się używanie parametry połączenia udostępnianych przez witrynę Azure Portal. Ustawienie encrypt=true jest zdecydowanie zalecane, ponieważ umożliwia szyfrowanie SSL połączenia JDBC. Jeśli user i password są ustawione oddzielnie, nie musisz ich uwzględniać w adresie URL. |
jdbcDriver |
Nie. | Określone przez podprotokół adresu URL JDBC | Nazwa klasy sterownika JDBC do użycia. Ta klasa musi znajdować się w classpath. W większości przypadków nie należy określać tej opcji, ponieważ odpowiednia nazwa klasy sterownika powinna być automatycznie określana przez podprotocol adresu URL JDBC. Wcześniej obsługiwany jdbc_driver wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
tempDir |
Tak | Brak wartości domyślnej | URI abfss . Zalecamy użycie dedykowanego kontenera usługi Blob Storage dla usługi Azure Synapse.Wcześniej obsługiwany tempdir wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
tempFormat |
Nie. | PARQUET |
Format, w którym zapisywać pliki tymczasowe w magazynie obiektów blob podczas zapisywania w usłudze Azure Synapse. Wartości domyślne to PARQUET ; żadne inne wartości nie są teraz dozwolone. |
tempCompression |
Nie. | SNAPPY |
Algorytm kompresji, który ma być używany do kodowania/dekodowania tymczasowego zarówno przez platformę Spark, jak i usługę Azure Synapse. Obecnie obsługiwane wartości to: UNCOMPRESSED , SNAPPY i GZIP . |
forwardSparkAzureStorageCredentials |
Nie. | fałsz | Jeśli true , biblioteka automatycznie odnajduje poświadczenia używane przez Spark do nawiązywania połączenia z kontenerem usługi Blob storage i dostarcza te poświadczenia do usługi Azure Synapse za pośrednictwem protokołu JDBC. Te poświadczenia są wysyłane w ramach zapytania JDBC. Dlatego zdecydowanie zaleca się włączenie szyfrowania SSL połączenia JDBC podczas korzystania z tej opcji.Bieżąca wersja łącznika usługi Azure Synapse wymaga, aby dokładnie jeden z forwardSparkAzureStorageCredentials , enableServicePrincipalAuth lub useAzureMSI był jawnie ustawiony na true .Wcześniej obsługiwany forward_spark_azure_storage_credentials wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
useAzureMSI |
Nie. | fałsz | Jeśli true , biblioteka określi IDENTITY = 'Managed Service Identity' , ale nie określi SECRET dla tworzonych przez nią poświadczeń o określonym zakresie bazy danych.Bieżąca wersja łącznika usługi Azure Synapse wymaga, aby dokładnie jeden z forwardSparkAzureStorageCredentials , enableServicePrincipalAuth lub useAzureMSI był jawnie ustawiony na true . |
enableServicePrincipalAuth |
Nie. | fałsz | Jeśli true biblioteka użyje podanych poświadczeń jednostki usługi, aby nawiązać połączenie z kontem usługi Azure Storage i usługą Azure Synapse Analytics za pośrednictwem protokołu JDBC.Bieżąca wersja łącznika usługi Azure Synapse wymaga, aby dokładnie jeden z forwardSparkAzureStorageCredentials , enableServicePrincipalAuth lub useAzureMSI był jawnie ustawiony na true . |
tableOptions |
Nie. |
CLUSTERED COLUMNSTORE INDEX , DISTRIBUTION = ROUND_ROBIN |
Ciąg służący do określania opcji tabeli podczas tworzenia tabeli usługi Azure Synapse ustawionej za pomocą polecenia dbTable . Ten ciąg jest przekazywany dosłownie do WITH klauzuli instrukcji SQL wydanej CREATE TABLE dla usługi Azure Synapse.Wcześniej obsługiwany table_options wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
preActions |
Nie. | Brak wartości domyślnej (pusty ciąg) | Oddzielona ; lista poleceń SQL do wykonania w usłudze Azure Synapse przed zapisaniem danych w wystąpieniu usługi Azure Synapse. Te polecenia SQL muszą być prawidłowymi poleceniami akceptowanymi przez usługę Azure Synapse.Jeśli którykolwiek z tych poleceń zakończy się niepowodzeniem, jest traktowany jako błąd, a operacja zapisu nie jest wykonywana. |
postActions |
Nie. | Brak wartości domyślnej (pusty ciąg) |
; Oddzielona lista poleceń SQL do wykonania w usłudze Azure Synapse po pomyślnym zapisaniu danych przez łącznik w wystąpieniu usługi Azure Synapse. Te polecenia SQL muszą być prawidłowymi poleceniami akceptowanymi przez usługę Azure Synapse.Jeśli którykolwiek z tych poleceń zakończy się niepowodzeniem, zostanie on potraktowany jako błąd i otrzymasz wyjątek po pomyślnym zapisaniu danych w wystąpieniu usługi Azure Synapse. |
maxStrLength |
Nie. | 256 |
StringType na platformie Spark jest mapowany na NVARCHAR(maxStrLength) typ w usłudze Azure Synapse. Za pomocą maxStrLength można ustawić długość ciągu dla wszystkich kolumn typu NVARCHAR(maxStrLength) , które znajdują się w tabeli o nazwie dbTable w usłudze Azure Synapse.Wcześniej obsługiwany maxstrlength wariant jest przestarzały i zostanie zignorowany w przyszłych wersjach. Zamiast tego użyj nazwy "camel case". |
checkpointLocation |
Tak | Brak wartości domyślnej | Lokalizacja w systemie plików DBFS, która będzie używana przez Strukturalne Przesyłanie Strumieniowe do zapisywania metadanych i informacji o punktach kontrolnych. Zobacz Odzyskiwanie po awariach za pomocą punktów kontrolnych w przewodniku programowania dotyczącego obsługiwania strumieni danych w trybie strukturalnym. |
numStreamingTempDirsToKeep |
Nie. | 0 | Wskazuje, ile (najnowszych) katalogów tymczasowych należy utrzymać do okresowego czyszczenia mikropartii w przetwarzaniu strumieniowym. Po ustawieniu 0 opcji usuwanie katalogu jest wyzwalane natychmiast po zatwierdzeniu mikropartii, inaczej przechowywana jest określona liczba najnowszych mikropartii, a pozostałe katalogi zostają usunięte. Użyj polecenia -1 , aby wyłączyć okresowe czyszczenie. |
applicationName |
Nie. | Databricks-User-Query |
Tag połączenia dla każdego zapytania. Jeśli wartość nie zostanie określona lub jest pustym ciągiem, domyślna wartość tagu zostanie dodana do adresu URL JDBC. Wartość domyślna uniemożliwia narzędziu do monitorowania usługi Azure DB zgłaszanie fałszywych alertów iniekcji SQL względem zapytań. |
maxbinlength |
Nie. | Brak wartości domyślnej | Steruj długością BinaryType kolumn. Ten parametr jest tłumaczony jako VARBINARY(maxbinlength) . |
identityInsert |
Nie. | fałsz | Ustawienie true umożliwia włączenie trybu IDENTITY_INSERT , który wstawia wartość DataFrame w kolumnie tożsamości tabeli Azure Synapse.Zobacz Jawne wstawianie wartości do kolumny IDENTITY. |
externalDataSource |
Nie. | Brak wartości domyślnej | Wstępnie przygotowane zewnętrzne źródło danych do odczytu danych z Azure Synapse. Zewnętrzne źródło danych może być używane tylko z technologią PolyBase i usuwa wymaganie uprawnień CONTROL, ponieważ łącznik nie musi tworzyć poświadczeń o określonym zakresie i zewnętrznego źródła danych w celu załadowania danych. Na przykład użycie i lista uprawnień wymaganych podczas korzystania z zewnętrznego źródła danych można znaleźć w temacie Required Azure Synapse permissions for PolyBase with the external data source option (Wymagane uprawnienia usługi Azure Synapse dla programu PolyBase z opcją zewnętrznego źródła danych). |
maxErrors |
Nie. | 0 | Maksymalna liczba wierszy, które można odrzucić podczas operacji odczytu i zapisu przed anulowaniem operacji ładowania (PolyBase lub COPY). Odrzucone wiersze zostaną zignorowane. Jeśli na przykład dwa z dziesięciu rekordów zawierają błędy, zostaną przetworzone tylko osiem rekordów. Zapoznaj się z dokumentacją REJECT_VALUE w CREATE EXTERNAL TABLE oraz dokumentacją MAXERRORS w COPY. |
Uwaga
-
tableOptions
, ,preActions
postActions
imaxStrLength
są istotne tylko podczas zapisywania danych z usługi Azure Databricks do nowej tabeli w usłudze Azure Synapse. -
externalDataSource
Jest istotne tylko podczas odczytywania danych z usługi Azure Synapse i zapisywania danych z usługi Azure Databricks do nowej tabeli w usłudze Azure Synapse za pomocą semantyki polyBase. Nie należy określać innych typów uwierzytelniania magazynu podczas używaniaexternalDataSource
takich jakforwardSparkAzureStorageCredentials
lubuseAzureMSI
. -
checkpointLocation
inumStreamingTempDirsToKeep
są istotne tylko w przypadku zapisów przesyłanych strumieniowo z usługi Azure Databricks do nowej tabeli w usłudze Azure Synapse. - Mimo że wszystkie nazwy opcji źródła danych są bez uwzględniania wielkości liter, zalecamy określenie ich w przypadku "camel case" w celu uzyskania jasności.
Wypychanie zapytań do usługi Azure Synapse
Łącznik usługi Azure Synapse implementuje zestaw reguł optymalizacji, aby wypchnąć następujące operatory do usługi Azure Synapse:
Filter
Project
Limit
Operatory Project
i Filter
obsługują następujące wyrażenia:
- Większość operatorów logiki boolowskiej
- Porównania
- Podstawowe operacje arytmetyczne
- Rzutowanie liczbowe i ciągowe
Dla operatora Limit
, wypychanie jest obsługiwane tylko wtedy, gdy nie określono żadnego porządku. Na przykład:
SELECT TOP(10) * FROM table
, ale nie SELECT TOP(10) * FROM table ORDER BY col
.
Uwaga
Łącznik usługi Azure Synapse nie wypycha wyrażeń działających na ciągach, datach lub znacznikach czasu.
Wypychanie zapytań utworzone za pomocą łącznika usługi Azure Synapse jest domyślnie włączone.
Można ją wyłączyć, ustawiając wartość spark.databricks.sqldw.pushdown
false
.
Tymczasowe zarządzanie danymi
Łącznik usługi Azure Synapse nie usuwa plików tymczasowych tworzonych w kontenerze usługi Blob Storage.
Dlatego zalecamy okresowe usuwanie plików tymczasowych w lokalizacji podanej tempDir
przez użytkownika.
Aby ułatwić oczyszczanie danych, łącznik usługi Azure Synapse nie przechowuje plików danych bezpośrednio w obszarze tempDir
, ale zamiast tego tworzy podkatalog formularza: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
.
Można skonfigurować zadania okresowe (przy użyciu funkcji zadań lakeflow lub w inny sposób), aby rekursywnie usuwać wszystkie podkatalogi starsze niż określony próg (na przykład 2 dni), przy założeniu, że nie można uruchamiać zadań platformy Spark dłużej niż ten próg.
Prostszą alternatywą jest okresowe usunięcie całego kontenera i utworzenie nowego o tej samej nazwie. Wymaga to użycia dedykowanego kontenera dla danych tymczasowych utworzonych przez łącznik usługi Azure Synapse i że można znaleźć przedział czasu, w którym można zagwarantować, że żadne zapytania z udziałem łącznika nie są uruchomione.
Tymczasowe zarządzanie obiektami
Łącznik usługi Azure Synapse automatyzuje transfer danych pomiędzy klastrem usługi Azure Databricks a wystąpieniem usługi Azure Synapse.
W celu odczytywania danych z tabeli usługi Azure Synapse lub wykonywania zapytań lub zapisywania danych w tabeli usługi Azure Synapse łącznik usługi Azure Synapse tworzy obiekty tymczasowe, w tym DATABASE SCOPED CREDENTIAL
, EXTERNAL DATA SOURCE
, EXTERNAL FILE FORMAT
i EXTERNAL TABLE
w tle. Te obiekty są aktywne tylko przez cały czas trwania odpowiedniego zadania platformy Spark i powinny zostać automatycznie usunięte.
Jeśli klaster uruchamia zapytanie przy użyciu łącznika usługi Azure Synapse, a proces sterownika Spark ulegnie awarii lub zostanie wymuszony do ponownego uruchomienia, albo jeśli klaster zostanie wymuszony do zakończenia lub ponownego uruchomienia, obiekty tymczasowe mogą nie zostać usunięte.
Aby ułatwić identyfikację i ręczne usuwanie tych obiektów, łącznik Azure Synapse poprzedza nazwy wszystkich pośrednich obiektów tymczasowych utworzonych w wystąpieniu Azure Synapse znacznikiem w formacie: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
.
Zalecamy okresowe wyszukiwanie ujawnionych obiektów przy użyciu zapytań, takich jak:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
Zarządzanie tabelami punktów kontrolnych przesyłania strumieniowego
Łącznik usługi Azure Synapse nie usuwa tabeli punktów kontrolnych przesyłania strumieniowego utworzonej podczas uruchamiania nowego zapytania strumieniowego.
To zachowanie jest zgodne z checkpointLocation
w DBFS. Dlatego zalecamy okresowe usuwanie tabel punktów kontrolnych w tym samym czasie co usuwanie lokalizacji punktów kontrolnych w systemie plików DBFS dla zapytań, które nie będą uruchamiane w przyszłości lub mają już usuniętą lokalizację punktu kontrolnego.
Domyślnie wszystkie tabele punktów kontrolnych mają nazwę <prefix>_<query-id>
, gdzie <prefix>
jest konfigurowalnym prefiksem z wartością databricks_streaming_checkpoint
domyślną i query_id
jest identyfikatorem zapytania przesyłania strumieniowego z usuniętymi znakami _
. Aby znaleźć wszystkie tabele punktów kontrolnych dla nieaktualnych lub usuniętych zapytań przesyłanych strumieniowo, uruchom zapytanie:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
Prefiks można skonfigurować przy użyciu opcji spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
konfiguracji Spark SQL .
Często zadawane pytania
Wystąpił błąd podczas korzystania z łącznika usługi Azure Synapse. Jak sprawdzić, czy ten błąd pochodzi z usługi Azure Synapse lub azure Databricks?
Aby ułatwić debugowanie błędów, wszelkie wyjątki zgłaszane przez kod specyficzny dla łącznika usługi Azure Synapse są opakowane w wyjątek rozszerzający cechę SqlDWException
. Wyjątki wprowadzają również następujące rozróżnienie.
-
SqlDWConnectorException
reprezentuje błąd zgłaszany przez łącznik usługi Azure Synapse -
SqlDWSideException
reprezentuje błąd rzucany przez połączone wystąpienie usługi Azure Synapse
Co należy zrobić, jeśli moje zapytanie nie powiodło się z powodu błędu "Nie znaleziono klucza dostępu w konficie sesji lub globalnej conf usługi Hadoop"?
Ten błąd oznacza, że łącznik usługi Azure Synapse nie może odnaleźć klucza dostępu do konta magazynu w konfiguracji sesji notesu lub globalnej konfiguracji usługi Hadoop dla konta magazynu określonego w parametrze tempDir
.
Zobacz Użycie (Batch), aby zapoznać się z przykładami prawidłowego konfigurowania dostępu do konta magazynowego. Jeśli tabela Spark jest tworzona przy użyciu łącznika Azure Synapse, musisz nadal podać poświadczenia dostępu do konta magazynu, aby móc odczytywać lub zapisywać dane w tabeli Spark.
Czy mogę użyć sygnatury dostępu współdzielonego (SAS), aby uzyskać dostęp do kontenera usługi Blob Storage określonego za pomocą parametru tempDir
?
Usługa Azure Synapse nie obsługuje uzyskiwania dostępu do Blob Storage przy użyciu sygnatury dostępu współdzielonego (SAS). W związku z tym łącznik Azure Synapse nie obsługuje SAS do uzyskania dostępu do kontenera w usłudze Blob Storage określonego przez .
Utworzyłem tabelę Spark przy użyciu łącznika Azure Synapse z opcją dbTable
, zapisałem do tej tabeli dane, a następnie usunąłem tę tabelę Spark. Czy tabela utworzona po stronie usługi Azure Synapse zostanie porzucona?
Nr. Usługa Azure Synapse jest uważana za zewnętrzne źródło danych. Tabela w usłudze Azure Synapse, której nazwa została ustawiona przez dbTable
, nie zostanie usunięta, gdy tabela Spark zostanie usunięta.
Dlaczego podczas zapisywania parametru DateFrame w usłudze Azure Synapse muszę użyć ciągu .option("dbTable", tableName).save()
zamiast po prostu .saveAsTable(tableName)
?
Wynika to z faktu, że chcemy wyjaśnić następujące rozróżnienie: .option("dbTable", tableName)
odwołuje się do tabeli bazy danych (czyli usługi Azure Synapse), natomiast .saveAsTable(tableName)
odwołuje się do tabeli Spark. W rzeczywistości można nawet połączyć te dwa elementy: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark)
, która tworzy tabelę w usłudze Azure Synapse o nazwie tableNameDW
, oraz tabelę zewnętrzną na platformie Spark o nazwie tableNameSpark
, która jest wspierana przez tabelę usługi Azure Synapse.
Ostrzeżenie
Należy pamiętać o następującej różnicy między elementami .save()
i .saveAsTable()
:
- W przypadku
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save()
,writeMode
działa na tabeli Azure Synapse zgodnie z oczekiwaniami. - W przypadku
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark)
,writeMode
działa na tabeli Spark, natomiasttableNameDW
jest nadpisywane, jeśli już istnieje w usłudze Azure Synapse.
To zachowanie nie różni się od zapisywania do dowolnego innego źródła danych. Jest to tylko zastrzeżenie interfejsu API Spark DataFrameWriter.