Verbinden von Azure Databricks und Azure Synapse mit PolyBase (Legacy)
Wichtig
Diese Dokumentation wurde eingestellt und wird unter Umständen nicht aktualisiert. Die in diesem Inhalt erwähnten Produkte, Dienste oder Technologien werden nicht mehr unterstützt. Weitere Informationen finden Sie unter Abfragen von Daten in Azure Synapse Analytics.
Databricks empfiehlt die Verwendung der COPY
-Standardfunktionalität mit Azure Data Lake Storage Gen2 für Verbindungen mit Azure Synapse. Dieser Artikel enthält Legacydokumentation zu PolyBase und Blob Storage.
Azure Synapse Analytics (ehemals SQL Data Warehouse) ist ein cloudbasiertes Enterprise Data Warehouse mit MPP (Massively Parallel Processing) zur schnellen Ausführung komplexer Abfragen für mehrere Petabytes an Daten. Verwenden Sie Azure als Schlüsselkomponente einer Big Data-Lösung. Importieren Sie große Datenmengen mit einfachen PolyBase-T-SQL-Abfragen oder der COPY-Anweisung in Azure, und nutzen Sie dann die Vorteile von MPP für aufwendige Analysen. Im Zuge von Integrationen und Analysen wird das Data Warehouse zur Single Version of Truth, auf die sich Ihr Unternehmen bei der Gewinnung von Erkenntnissen verlassen kann.
Auf Azure Synapse können Sie über Azure Databricks mithilfe des Azure Synapse-Connectors zugreifen, bei dem es sich um eine Datenquellenimplementierung für Apache Spark handelt, die Azure Blob Storage und PolyBase oder die COPY
-Anweisung in Azure Synapse verwendet, um große Datenmengen effizient zwischen einem Azure Databricks-Cluster und einer Azure Synapse-Instanz zu übertragen.
Sowohl der Azure Databricks-Cluster als auch die Azure Synapse-Instanz greifen auf einen gemeinsamen Blob Storage-Container zu, um Daten zwischen diesen beiden Systemen auszutauschen. In Azure Databricks löst der Azure Synapse-Connector Apache Spark-Aufträge aus, um Daten aus dem Blob Storage-Container zu lesen oder in diesen zu schreiben. In Azure Synapse löst der Azure Synapse-Connector Vorgänge zum Laden und Entladen von Daten mit PolyBase über JDBC aus. In Databricks Runtime 7.0 und höheren Versionen wird COPY
vom Azure Synapse-Connector standardmäßig zum Laden von Daten in Azure Synapse über JDBC verwendet.
Hinweis
COPY
ist nur auf Azure Synapse Gen2-Instanzen verfügbar, die eine bessere Leistung bieten. Wenn Ihre Datenbank noch Gen1-Instanzen verwendet, sollten Sie die Datenbank zu Gen2 migrieren.
Der Azure Synapse-Connector eignet sich besser für ETL als für interaktive Abfragen, da bei jeder Abfrageausführung große Datenmengen in Blob Storage extrahiert werden können. Wenn Sie mehrere Abfragen für dieselbe Azure Synapse-Tabelle durchführen möchten, sollten Sie die extrahierten Daten in einem Format wie Parquet speichern.
Anforderungen
Ein Azure Synapse-Datenbankhauptschlüssel.
Authentifizierung
Für den Azure Synapse-Connector werden drei Arten von Netzwerkverbindungen verwendet:
- Spark-Treiber mit Azure Synapse
- Spark-Treiber und -Executors mit Azure-Speicherkonto
- Azure Synapse für das Azure-Speicherkonto
┌─────────┐
┌─────────────────────────>│ STORAGE │<────────────────────────┐
│ Storage acc key / │ ACCOUNT │ Storage acc key / │
│ Managed Service ID / └─────────┘ OAuth 2.0 / │
│ │ │
│ │ │
│ │ Storage acc key / │
│ │ OAuth 2.0 / │
│ │ │
v v ┌──────v────┐
┌──────────┐ ┌──────────┐ │┌──────────┴┐
│ Synapse │ │ Spark │ ││ Spark │
│ Analytics│<────────────────────>│ Driver │<───────────────>│ Executors │
└──────────┘ JDBC with └──────────┘ Configured └───────────┘
username & password / in Spark
In den folgenden Abschnitten werden die Konfigurationsoptionen für die Authentifizierung der einzelnen Verbindungen beschrieben.
Spark-Treiber für Azure Synapse
Der Spark-Treiber kann eine Verbindung mit Azure Synapse herstellen, indem er zur Authentifizierung JDBC mit einem Benutzernamen und Kennwort oder OAuth 2.0 mit einem Dienstprinzipal verwendet.
Benutzername und Kennwort
Es wird empfohlen, die im Azure-Portal bereitgestellten Verbindungszeichenfolgen für beide Authentifizierungstypen zu verwenden, sodass alle zwischen dem Spark-Treiber und der Azure Synapse-Instanz über die JDBC-Verbindung gesendeten Daten mittels SSL (Secure Sockets Layer) verschlüsselt werden können. Ob die SSL-Verschlüsselung aktiviert ist, können Sie überprüfen, indem Sie in der Verbindungszeichenfolge nach encrypt=true
suchen.
Um dem Spark-Treiber Zugriff auf Azure Synapse zu ermöglichen, empfehlen wir, die Option Azure-Diensten und -Ressourcen den Zugriff auf diesen Arbeitsbereich gestatten auf EIN festzulegen. Sie finden diese über das Azure-Portal im Bereich „Netzwerke“ unter „Sicherheit“ des Azure Synapse-Arbeitsbereichs. Diese Einstellung ermöglicht die Kommunikation über alle Azure-IP-Adressen und alle Azure-Subnetze, sodass Spark-Treiber die Azure Synapse-Instanz erreichen können.
OAuth 2.0 mit einem Dienstprinzipal
Sie können sich bei Azure Synapse Analytics mit einem Dienstprinzipal mit Zugriff auf das zugrunde liegende Speicherkonto authentifizieren. Weitere Informationen zur Verwendung von Dienstprinzipal-Anmeldeinformationen für den Zugriff auf ein Azure-Speicherkonto finden Sie unter herstellen einer Verbindung mit Azure Data Lake Storage Gen2 und Blob Storage. Sie müssen die Option enableServicePrincipalAuth
in den Parametern der Verbindungskonfiguration auf true
festlegen, damit sich der Connector bei einem Dienstprinzipal authentifizieren kann.
Sie können optional auch einen anderen Dienstprinzipal für die Azure Synapse Analytics verwenden. Ein Beispiel, bei dem die Anmeldeinformationen des Dienstprinzipals für das Speicherkonto und die Anmeldeinformationen des optionalen Dienstprinzipals für Synapse konfiguriert werden:
ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.microsoftonline.com/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Scala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Python
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Spark-Treiber und -Executors mit Azure-Speicherkonto
Der Azure-Speichercontainer dient als Vermittler zum Speichern von Massendaten beim Lesen oder Schreiben in Azure Synapse. Spark stellt mithilfe des abfss
-Treibers eine Verbindung zu ADLS Gen2 oder Blob Storage her.
Folgende Authentifizierungsoptionen sind verfügbar:
- Zugriffsschlüssel und Geheimnis für das Speicherkonto
- OAuth 2.0-Authentifizierung. Weitere Informationen zu OAuth 2.0 und Dienstprinzipalen finden Sie unter Zugriff auf Speicher mit einem Dienstprinzipal und Microsoft Entra ID (Azure Active Directory).
In den folgenden Beispielen werden diese beiden Möglichkeiten unter Verwendung eines Zugriffsschlüssels veranschaulicht. Dasselbe gilt für die OAuth 2.0-Konfiguration.
Notebook-Sitzungskonfiguration (bevorzugt)
Bei diesem Ansatz wird der Zugriffsschlüssel für das Konto in der Sitzungskonfiguration festgelegt, die dem Notebook zugeordnet ist, das den Befehl ausgeführt. Diese Konfiguration wirkt sich nicht auf andere, ebenfalls diesem Cluster zugeordnete Notebooks aus. spark
ist das im Notebook bereitgestellte SparkSession
-Objekt.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Globale Hadoop-Konfiguration
Bei diesem Ansatz wird die globale Hadoop-Konfiguration aktualisiert, die dem von allen Notebooks gemeinsam genutzten SparkContext
-Objekt zugeordnet ist.
Scala
sc.hadoopConfiguration.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Python
hadoopConfiguration
ist nicht in allen Versionen von PySpark verfügbar. Der folgende Befehl basiert zwar auf einigen internen Spark-Versionen, sollte jedoch mit allen PySpark-Versionen funktionieren. Es ist unwahrscheinlich, dass er nicht funktioniert oder sich in Zukunft ändert:
sc._jsc.hadoopConfiguration().set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
Azure Synapse für das Azure-Speicherkonto
Azure Synapse stellt auch beim Laden und Entladen von temporären Daten eine Verbindung mit einem Speicherkonto her.
Wenn Sie für das Speicherkonto einen Kontoschlüssel und ein Geheimnis eingerichtet haben, können Sie forwardSparkAzureStorageCredentials
auf true
setzen. In diesem Fall erkennt der Azure Synapse-Connector den in der Notebook-Sitzungskonfiguration festgelegten Zugriffsschlüssel des Kontos automatisch und gibt den Zugriffsschlüssel des Speicherkontos an die verbundene Azure Synapse-Instanz weiter, indem er temporäre datenbankweit gültige Anmeldeinformationen für Azure erstellt.
Wenn Sie die ADLS Gen2- mit OAuth 2.0-Authentifizierung verwenden oder die Azure Synapse-Instanz so konfiguriert ist, dass sie über eine verwaltete Dienstidentität (in der Regel zusammen mit einem Setup aus VNet und Dienstendpunkten) verfügt, müssen Sie useAzureMSI
auf true
festlegen. In diesem Fall gibt der Connector IDENTITY = 'Managed Service Identity'
für die Anmeldeinformationen im Datenbankbereich und nicht SECRET
an.
Unterstützung des Streamings
Der Azure Synapse-Connector bietet effiziente und skalierbare Schreibunterstützung für strukturiertes Streaming für Azure Synapse. Dadurch wird eine konsistente Benutzeroberfläche mit Batch-Schreibvorgängen bereitstellt, und für die Übertragung umfangreicher Datenmengen zwischen einem Azure Databricks-Cluster und einer Azure Synapse-Instanz wird PolyBase oder COPY
verwendet. Ähnlich wie bei Batch-Schreibvorgängen ist das Streaming größtenteils für ETL konzipiert und bietet daher eine höhere Latenz, die in einigen Fällen möglicherweise nicht für die Echtzeitdatenverarbeitung geeignet ist.
Fehlertoleranzsemantik
Standardmäßig bietet Azure Synapse Streaming eine End-to-End-Garantie vom Typ Exactly Once für das Schreiben von Daten in eine Azure Synapse-Tabelle, indem der Fortschritt der Abfrage mit einer Kombination aus Prüfpunktspeicherort im DBFS, Prüfpunkttabelle in Azure Synapse und Sperrmechanismus zuverlässig nachverfolgt wird, um sicherzustellen, dass das Streaming alle Arten von Fehlern, Wiederholungen und Abfrageneustarts verarbeiten kann.
Optional können Sie eine weniger restriktive Semantik vom Typ „At-Least-Once“ für Azure Synapse Streaming auswählen, indem Sie die Option spark.databricks.sqldw.streaming.exactlyOnce.enabled
auf false
festlegen. In diesem Fall kann die Datenduplizierung bei zeitweiligen Verbindungsfehlern bei Azure Synapse oder einer unerwarteten Abfragebeendigung auftreten.
Verwendung (Batch)
Sie können diesen Connector über die Datenquellen-API in Scala-, Python-, SQL- und R-Notebooks verwenden.
Scala
// Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.save()
Python
# Otherwise, set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.save()
SQL
-- Otherwise, set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net=<your-storage-account-access-key>;
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
R
# Load SparkR
library(SparkR)
# Otherwise, set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
Verwendung (Streaming)
Sie können Daten mithilfe von strukturiertem Streaming in Scala- und Python-Notebooks schreiben.
Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
Python
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
Konfiguration
In diesem Abschnitt wird beschrieben, wie die Schreibsemantik für den Connector, erforderliche Berechtigungen und verschiedene Konfigurationsparameter konfiguriert werden.
Inhalt dieses Abschnitts:
- Unterstützte Speichermodi für Batchschreibvorgänge
- Unterstützte Ausgabemodi für Streamingschreibvorgänge
- Schreibsemantik
- Erforderliche Azure Synapse-Berechtigungen für PolyBase
- Erforderliche Azure Synapse-Berechtigungen für die
COPY
-Anweisung - Parameter
- Abfragepushdown in Azure Synapse
- Verwaltung temporärer Daten
- Verwaltung temporärer Objekte
- Verwaltung von Streamingprüfpunkttabellen
Unterstützte Speichermodi für Batchschreibvorgänge
Der Azure Synapse-Connector unterstützt die Speichermodi ErrorIfExists
, Ignore
, Append
und Overwrite
mit dem Standardmodus ErrorIfExists
.
Weitere Informationen zu unterstützten Speichermodi in Apache Spark finden Sie in der Spark SQL-Dokumentation zu Speichermodi.
Unterstützte Ausgabemodi für Streamingschreibvorgänge
Der Azure Synapse-Connector unterstützt die Ausgabemodi Append
und Complete
für Datensatzanfügevorgänge und Aggregationen.
Weitere Informationen zu Ausgabemodi und Kompatibilitätsmatrix finden Sie im Leitfaden für strukturiertes Streaming.
Schreibsemantik
Hinweis
COPY
ist in Databricks Runtime 7.0 und höheren Versionen verfügbar.
Zusätzlich zu PolyBase unterstützt der Azure Synapse-Connector die COPY
-Anweisung. Die COPY
-Anweisung bietet eine bequemere Möglichkeit, Daten in Azure Synapse zu laden, ohne eine externe Tabelle erstellen zu müssen, erfordert weniger Berechtigungen zum Laden von Daten und verbessert die Leistung der Datenerfassung in Azure Synapse.
Standardmäßig erkennt der Connector automatisch die beste Schreibsemantik (COPY
bei einer Ausrichtung auf eine Azure Synapse Gen2-Instanz, andernfalls PolyBase). Sie können die Schreibsemantik auch mit der folgenden Konfiguration angeben:
Scala
// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
Python
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
SQL
-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;
R
# Load SparkR
library(SparkR)
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")
wobei <write-semantics>
entweder polybase
entspricht, um PolyBase zu verwenden, oder copy
, um die COPY
-Anweisung zu verwenden.
Erforderliche Azure Synapse-Berechtigungen für PolyBase
Wenn Sie PolyBase verwenden, verlangt der Azure Synapse-Connector, dass der JDBC-Verbindungsbenutzer über die Berechtigung zum Ausführen der folgenden Befehle in der verbundenen Azure Synapse verfügt:
- CREATE DATABASE SCOPED CREDENTIAL
- CREATE EXTERNAL DATA SOURCE
- CREATE EXTERNAL FILE FORMAT
- CREATE EXTERNAL TABLE
Als Voraussetzung für den ersten Befehl erwartet der Connector, dass bereits ein Datenbankhauptschlüssel für die angegebene Azure Synapse vorhanden ist. Ist dies nicht der Fall, können Sie mit dem Befehl CREATE MASTER KEY einen Schlüssel erstellen.
Zudem muss der JDBC-Benutzer über die Berechtigung für den Zugriff auf die erforderlichen Azure Synapse-Tabellen verfügen, um die über dbTable
festgelegten Azure Synapse-Tabellen oder die Tabellen, auf die in query
verwiesen wird, lesen zu können. Zum Zurückschreiben von Daten in eine Azure Synapse-Tabelle, die über dbTable
festgelegt wurde, muss der JDBC-Benutzer über die Berechtigung zum Schreiben in diese Azure Synapse-Tabelle verfügen.
In der folgenden Tabelle sind die erforderlichen Berechtigungen für alle Vorgänge mit PolyBase zusammengefasst:
Vorgang | Berechtigungen | Berechtigungen bei Verwendung einer externen Datenquelle |
---|---|---|
Batchschreibvorgang | CONTROL | Informationen hierzu finden Sie unter Batchschreibvorgang. |
Streamingschreibvorgang | CONTROL | Informationen hierzu finden Sie unter Streamingschreibvorgang. |
Lesen | CONTROL | Informationen hierzu finden Sie unter Lesevorgang. |
Erforderliche Azure Synapse-Berechtigungen für PolyBase mit der Option einer externen Datenquelle
Sie können PolyBase mit einer vorab bereitgestellten externen Datenquelle verwenden. Weitere Informationen zum Parameter externalDataSource
finden Sie unter Parameter.
Wenn Sie PolyBase mit einer vorab bereitgestellten Datenquelle verwenden, verlangt der Azure Synapse-Connector, dass der JDBC-Verbindungsbenutzer über die Berechtigung zum Ausführen der folgenden Befehle in der verbundenen Azure Synapse verfügt:
Zum Erstellen einer externen Datenquelle erstellen Sie zunächst datenbankweit gültige Anmeldeinformationen. Unter den folgenden Links wird beschrieben, wie Sie bereichsbezogene Anmeldeinformationen für Dienstprinzipale und eine externe Datenquelle für einen ABFS-Speicherort erstellen:
Hinweis
Der Speicherort der externen Datenquelle muss auf einen Container verweisen. Der Connector funktioniert nicht, wenn der Speicherort ein Verzeichnis in einem Container ist.
In der folgenden Tabelle sind die Berechtigungen für PolyBase-Schreibvorgänge mit der Option einer externen Datenquelle zusammengefasst:
Vorgang | Berechtigungen (in eine vorhandene Tabelle einfügen) | Berechtigungen (in eine neue Tabelle einfügen) |
---|---|---|
Batchschreibvorgang | ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
Streamingschreibvorgang | ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
In der folgenden Tabelle sind die Berechtigungen für PolyBase-Lesevorgänge mit der Option einer externen Datenquelle zusammengefasst:
Vorgang | Berechtigungen |
---|---|
Lesen | CREATE TABLE ALTER ANY SCHEMA ALTER ANY EXTERNAL DATA SOURCE ALTER ANY EXTERNAL FILE FORMAT |
Sie können diesen Connector zum Lesen über die Datenquellen-API in Scala-, Python-, SQL- und R-Notebooks verwenden.
Scala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
.option("externalDataSource", "<your-pre-provisioned-data-source>")
.option("dbTable", "<your-table-name>")
.load()
Python
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
.option("externalDataSource", "<your-pre-provisioned-data-source>") \
.option("dbTable", "<your-table-name>") \
.load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>',
externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>"
externalDataSource = "<your-pre-provisioned-data-source>")
Erforderliche Azure Synapse-Berechtigungen für die COPY
-Anweisung
Hinweis
Verfügbar in Databricks Runtime 7.0 und höher
Wenn Sie die COPY
-Anweisung verwenden, verlangt der Azure Synapse-Connector, dass der JDBC-Verbindungsbenutzer über die Berechtigung zum Ausführen der folgenden Befehle in der verbundenen Azure Synapse verfügt:
Wenn die Zieltabelle in der Azure Synapse-Instanz nicht vorhanden ist, ist zusätzlich zum obigen Befehl die Berechtigung zum Ausführen des folgenden Befehls erforderlich:
In der folgenden Tabelle sind die Berechtigungen für Batch- und Streamingschreibvorgänge mit COPY
zusammengefasst:
Vorgang | Berechtigungen (in eine vorhandene Tabelle einfügen) | Berechtigungen (in eine neue Tabelle einfügen) |
---|---|---|
Batchschreibvorgang | ADMINISTER DATABASE BULK OPERATIONS INSERT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Streamingschreibvorgang | ADMINISTER DATABASE BULK OPERATIONS INSERT |
ADMINISTER DATABASE BULK OPERATIONS INSERT CREATE TABLE ALTER ON SCHEMA :: dbo |
Parameter
Die Parameterzuordnung oder OPTIONS
in Spark SQL unterstützen die folgenden Einstellungen:
Parameter | Erforderlich | Standard | Notizen |
---|---|---|---|
dbTable |
Ja, es sei denn query ist angegeben |
Kein Standardwert | Die Tabelle, aus der in Azure Synapse erstellt oder gelesen werden soll. Dieser Parameter ist erforderlich, wenn Daten wieder in Azure Synapse gespeichert werden. Sie können auch {SCHEMA NAME}.{TABLE NAME} verwenden, um auf eine Tabelle in einem bestimmten Schema zuzugreifen. Wenn kein Schemaname angegeben wird, wird das Standardschema verwendet, das dem JDBC-Benutzer zugeordnet ist.Die bislang unterstützte dbtable -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
query |
Ja, es sei denn dbTable ist angegeben |
Kein Standardwert | Die Abfrage, aus der in Azure Synapse gelesen werden soll. Für Tabellen, auf die in der Abfrage verwiesen wird, können Sie auch {SCHEMA NAME}.{TABLE NAME} verwenden, um auf eine Tabelle in einem bestimmten Schema zuzugreifen. Wenn kein Schemaname angegeben wird, wird das Standardschema verwendet, das dem JDBC-Benutzer zugeordnet ist. |
user |
Nein | Kein Standardwert | Der Azure Synapse-Benutzername. Muss zusammen mit der Option password verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Andernfalls tritt ein Fehler auf. |
password |
Nein | Kein Standardwert | Das Azure Synapse-Kennwort. Muss zusammen mit der Option user verwendet werden. Kann nur verwendet werden, wenn der Benutzername und das Kennwort nicht in der URL übergeben werden. Andernfalls tritt ein Fehler auf. |
url |
Ja | Kein Standardwert | Eine JDBC-URL, wobei sqlserver als Unterprotokoll festgelegt ist. Es wird empfohlen, die im Azure-Portal bereitgestellte Verbindungszeichenfolge zu verwenden. EinstellungDie Einstellung encrypt=true wird dringend empfohlen, weil damit die SSL-Verschlüsselung der JDBC-Verbindung aktiviert wird. Wenn user und password separat festgelegt werden, müssen Sie sie nicht in die URL einbinden. |
jdbcDriver |
Nein | Wird durch das Unterprotokoll der JDBC-URL bestimmt. | Der Klassenname des zu verwendenden JDBC-Treibers. Diese Klasse muss im Klassenpfad vorhanden sein. In den meisten Fällen sollte es nicht erforderlich sein, diese Option angeben zu müssen, da der entsprechende Treiberklassenname automatisch durch das Unterprotokoll der JDBC-URL bestimmt werden sollte. Die bislang unterstützte jdbc_driver -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
tempDir |
Ja | Kein Standardwert | Ein abfss -URI. Es wird empfohlen, einen dedizierten Blobspeichercontainer für die Azure Synapse-Instanz zu verwenden.Die bislang unterstützte tempdir -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
tempFormat |
Nein | PARQUET |
Das Format, in dem temporäre Dateien beim Schreiben in Azure Synapse im Blobspeicher gespeichert werden sollen. Der Standardwert ist PARQUET . Derzeit sind keine anderen Werte zulässig. |
tempCompression |
Nein | SNAPPY |
Der Komprimierungsalgorithmus, der zum Codieren/Decodieren von temporären Daten sowohl von Spark als auch von Azure Synapse verwendet werden soll. Derzeit werden die Werte UNCOMPRESSED , SNAPPY und GZIP unterstützt. |
forwardSparkAzureStorageCredentials |
Nein | false | Wenn der Wert true ist, ermittelt die Bibliothek die Anmeldeinformationen automatisch, die Spark zum Herstellen einer Verbindung mit dem Blob Storage-Container verwendet, und leitet diese Anmeldeinformationen über JDBC an Azure Synapse weiter. Diese Anmeldeinformationen werden als Teil der JDBC-Abfrage gesendet. Daher wird dringend empfohlen, die SSL-Verschlüsselung der JDBC-Verbindung zu aktivieren, wenn Sie diese Option verwenden.In der aktuellen Version des Azure Synapse-Connectors muss (genau) eine der Optionen forwardSparkAzureStorageCredentials , enableServicePrincipalAuth bzw. useAzureMSI explizit auf true festgelegt werden.Die bislang unterstützte forward_spark_azure_storage_credentials -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
useAzureMSI |
Nein | false | Wenn der Wert true ist, gibt die Bibliothek IDENTITY = 'Managed Service Identity' und kein SECRET für die datenbankweit gültigen Anmeldeinformationen an, die sie erstellt.In der aktuellen Version des Azure Synapse-Connectors muss (genau) eine der Optionen forwardSparkAzureStorageCredentials , enableServicePrincipalAuth bzw. useAzureMSI explizit auf true festgelegt werden. |
enableServicePrincipalAuth |
Nein | false | Wenn der Wert true ist, verwendet die Bibliothek die bereitgestellten Anmeldeinformationen des Dienstprinzipals, um eine Verbindung mit dem Azure-Speicherkonto und Azure Synapse Analytics über JDBC herzustellen.In der aktuellen Version des Azure Synapse-Connectors muss (genau) eine der Optionen forwardSparkAzureStorageCredentials , enableServicePrincipalAuth bzw. useAzureMSI explizit auf true festgelegt werden. |
tableOptions |
Nein | CLUSTERED COLUMNSTORE INDEX , DISTRIBUTION = ROUND_ROBIN |
Eine Zeichenfolge, die zum Angeben von Tabellenoptionen verwendet wird, wenn die Azure Synapse-Tabellen über dbTable erstellt werden. Diese Zeichenfolge wird wortwörtlich an die WITH -Klausel der CREATE TABLE -Anweisung übergeben, die für Azure Synapse ausgegeben wird.Die bislang unterstützte table_options -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
preActions |
Nein | Kein Standardwert (leere Zeichenfolge) | Eine durch ; getrennte Liste mit SQL-Befehlen, die in Azure Synapse ausgeführt werden müssen, bevor Daten in die Azure Synapse-Instanz geschrieben werden können. Diese SQL-Befehle müssen gültige Befehle sein, die von Azure Synapse akzeptiert werden.Wenn einer dieser Befehle fehlschlägt, wird er als Fehler behandelt, und der Schreibvorgang wird nicht ausgeführt. |
postActions |
Nein | Kein Standardwert (leere Zeichenfolge) | Eine durch ; getrennte Liste mit SQL-Befehlen, die in Azure Synapse ausgeführt werden müssen, nachdem der Connector erfolgreich Daten in die Azure Synapse-Instanz geschrieben hat. Diese SQL-Befehle müssen gültige Befehle sein, die von Azure Synapse akzeptiert werden.Wenn einer dieser Befehle fehlschlägt, wird dies als Fehler behandelt, und es wird eine Ausnahme ausgelöst, nachdem die Daten erfolgreich in die Azure Synapse-Instanz geschrieben wurden. |
maxStrLength |
Nein | 256 | StringType in Spark wird dem Typ NVARCHAR(maxStrLength) in Azure Synapse zugeordnet. Sie können maxStrLength verwenden, um die Zeichenfolgenlänge für alle Spalten vom Typ NVARCHAR(maxStrLength) festzulegen, die sich in der Tabelle mit dem NamendbTable in Azure Synapse befinden.Die bislang unterstützte maxstrlength -Variante ist veraltet und wird in zukünftigen Releases ignoriert. Verwenden Sie stattdessen den Namen mit gemischter Groß-/Kleinschreibung. |
checkpointLocation |
Ja | Kein Standardwert | Speicherort in DBFS, der vom strukturierten Streaming zum Schreiben von Metadaten und Prüfpunktinformationen verwendet wird. Weitere Informationen finden Sie im Abschnitt über das Wiederherstellen nach Fehlern mit Prüfpunkten im Programmierleitfaden für strukturiertes Streaming. |
numStreamingTempDirsToKeep |
Nein | 0 | Gibt an, wie viele (aktuelle) temporäre Verzeichnisse für die regelmäßige Bereinigung von Mikrobatches beim Streaming beibehalten werden sollen. Wenn diese Einstellung auf 0 festgelegt ist, wird das Löschen des Verzeichnisses sofort ausgelöst, nachdem der Mikrobatch committet wurde. Andernfalls werden die angegebenen aktuellen Mikrobatches beibehalten und die restlichen Verzeichnisse entfernt. Verwenden Sie -1 , um die regelmäßige Bereinigung zu deaktivieren. |
applicationName |
Nein | Databricks-User-Query |
Das Tag der Verbindung für die einzelnen Abfragen. Wenn kein Wert angegeben wird oder der Wert eine leere Zeichenfolge ist, wird dem Standardwert des Tags die JDBC-URL hinzugefügt. Mit dem Standardwert wird verhindert, dass das Azure DB-Überwachungstool fälschlicherweise Warnungen bei der SQL-Einschleusung für Abfragen ausgibt. |
maxbinlength |
Nein | Kein Standardwert | Steuert die Spaltenlänge von BinaryType -Spalten. Dieser Parameter wird als VARBINARY(maxbinlength) übersetzt. |
identityInsert |
Nein | false | Durch Festlegen auf true wird der IDENTITY_INSERT -Modus aktiviert, durch den ein vom Datenrahmen bereitgestellter Wert in die Identitätsspalte der Azure Synapse-Tabelle eingefügt wird.Weitere Informationen finden Sie unter Explizites Einfügen von Werten in eine IDENTITY-Spalte. |
externalDataSource |
Nein | Kein Standardwert | Eine vorab bereitgestellte externe Datenquelle zum Lesen von Daten aus Azure Synapse. Eine externe Datenquelle kann nur mit PolyBase verwendet werden und entfernt die CONTROL-Berechtigungsanforderung, da der Connector keine bereichsbezogenen Anmeldeinformationen und keine externe Datenquelle zum Laden von Daten erstellen muss. Beispiele für die Verwendung und die Liste der Berechtigungen, die bei Verwendung einer externen Datenquelle erforderlich sind, finden Sie unter Erforderliche Azure Synapse-Berechtigungen für PolyBase mit der Option einer externen Datenquelle. |
maxErrors |
Nein | 0 | Die maximale Anzahl von Zeilen, die während Lese- und Schreibvorgängen abgelehnt werden können, bevor der Ladevorgang (entweder PolyBase oder COPY) abgebrochen wird. Die abgelehnten Zeilen werden ignoriert. Wenn z. B. zwei von zehn Datensätzen Fehler aufweisen, werden nur acht Datensätze verarbeitet. Weitere Informationen finden Sie in REJECT_VALUE der DOKUMENTATION CREATE EXTERNAL TABLEund MAXERRORS in COPY. |
Hinweis
tableOptions
,preActions
,postActions
undmaxStrLength
sind nur relevant, wenn Daten aus Azure Databricks in eine neue Tabelle in Azure Synapse geschrieben werden.externalDataSource
ist nur relevant, wenn Daten aus Azure Synapse gelesen und Daten aus Azure Databricks in eine neue Tabelle in Azure Synapse mit PolyBase-Semantik geschrieben werden. Wenn SieexternalDataSource
wie etwaforwardSparkAzureStorageCredentials
oderuseAzureMSI
verwenden, sollten Sie keine anderen Speicherauthentifizierungstypen angeben.checkpointLocation
undnumStreamingTempDirsToKeep
sind nur beim Streamen von Schreibvorgängen aus Azure Databricks in eine neue Tabelle in Azure Synapse relevant.- Obwohl die Groß-/Kleinschreibung bei Datenquellenoptionsnamen nicht beachtet wird, sollten Sie die Namen aus Gründen der Übersichtlichkeit mit Groß- und Kleinbuchstaben angeben.
Abfrage-Pushdown in Azure Synapse
Der Azure Synapse-Connector implementiert eine Reihe von Optimierungsregeln, um für die folgenden Operatoren in die Azure Synapse einen Pushdown durchzuführen:
Filter
Project
Limit
Die Operatoren Project
und Filter
unterstützen die folgenden Ausdrücke:
- Die meisten booleschen Logikoperatoren
- Vergleiche
- Grundlegende arithmetische Operationen
- Numerische und Zeichenfolgenumwandlungen
Für den Operator Limit
wird Pushdown nur unterstützt, wenn keine Reihenfolge angegeben ist. Beispiele:
SELECT TOP(10) * FROM table
, aber nicht SELECT TOP(10) * FROM table ORDER BY col
.
Hinweis
Der Azure Synapse-Connector führt für Ausdrücke, die für Zeichenfolgen, Datumsangaben oder Zeitstempel verwendet werden, keinen Pushdown aus.
Der im Azure Synapse-Connector integrierte Abfragepushdown ist standardmäßig aktiviert.
Sie können diese Einstellung deaktivieren, indem Sie spark.databricks.sqldw.pushdown
auf false
festlegen.
Verwaltung temporärer Daten
Die vom Azure Synapse-Connector im Blob Storage-Container erstellten temporären Dateien werden vom Connector nicht gelöscht.
Daher sollten Sie temporäre Dateien in dem vom Benutzer angegebenen tempDir
-Verzeichnis regelmäßig löschen.
Zur Vereinfachung der Datenbereinigung speichert der Azure Synapse-Connector Datendateien nicht direkt im Verzeichnis tempDir
, sondern erstellt stattdessen ein Unterverzeichnis im Format <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/
.
Sie können (mithilfe der Azure Databricks-Funktion Aufträge oder auf andere Weise) regelmäßige Aufträge einrichten, mit denen alle Unterverzeichnisse rekursiv gelöscht werden, die älter sind als ein angegebener Schwellenwert (z. B. 2 Tage), wobei davon ausgegangen wird, dass Spark-Aufträge nicht über diesen Schwellenwert hinaus ausgeführt werden dürfen.
Eine einfachere Alternative besteht darin, in regelmäßigen Abständen den gesamten Container zu löschen und einen neuen Container mit demselben Namen zu erstellen. Dies setzt voraus, dass Sie für die vom Azure Synapse-Connector erstellten temporären Daten einen dedizierten Container verwenden und ein Zeitfenster finden, für das Sie garantieren können, dass keine den Connector betreffenden Abfragen ausgeführt werden.
Verwaltung temporärer Objekte
Mit dem Azure Synapse-Connector wird die Datenübertragung zwischen einem Azure Databricks-Cluster und einer Azure Synapse-Instanz automatisiert.
Zum Lesen von Daten aus einer Azure Synapse-Tabelle oder -Abfrage oder zum Schreiben von Daten in eine Azure Synapse-Tabelle erstellt der Azure Synapse-Connector im Hintergrund temporäre Objekte, wie DATABASE SCOPED CREDENTIAL
, EXTERNAL DATA SOURCE
, EXTERNAL FILE FORMAT
und EXTERNAL TABLE
. Diese Objekte sind nur während der Dauer des entsprechenden Spark-Auftrags vorhanden und sollten danach automatisch gelöscht werden.
Wenn ein Cluster eine Abfrage mit dem Azure Synapse-Connector ausführt und der Spark-Treiberprozess abstürzt oder zwangsweise neu gestartet wird, oder wenn der Cluster zwangsweise beendet oder neu gestartet wird, werden temporäre Objekte möglicherweise nicht gelöscht.
Azure Synapse-Connector stellt den Namen aller temporären Zwischenobjekte, die in der Azure Synapse-Instanz erstellt werden, ein Tag der Form tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>
voran, um die Identifizierung und das manuelle Löschen dieser Objekte zu erleichtern.
Es wird empfohlen, in regelmäßigen Abständen mithilfe von Abfragen wie den folgenden nach verloren gegangenen Objekten zu suchen:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
Verwaltung von Streamingprüfpunkttabellen
Die beim Starten einer neuen Streamingabfrage erstellte Streamingprüfpunkttabelle wird vom Azure Synapse-Connector nicht gelöscht.
Dieses Verhalten entspricht dem checkpointLocation
in DBFS. Daher wird empfohlen, Prüfpunkttabellen regelmäßig zu löschen und gleichzeitig Prüfpunktspeicherorte in DBFS für Abfragen zu entfernen, die in Zukunft nicht mehr ausgeführt werden oder deren Prüfpunktspeicherort bereits entfernt wurde.
Standardmäßig haben alle Prüfpunkttabellen den Namen <prefix>_<query-id>
, wobei <prefix>
ein konfigurierbares Präfix mit dem Standardwert databricks_streaming_checkpoint
und query_id
eine Streamingabfrage-ID ist, bei der die Zeichen _
entfernt wurden. Führen Sie die folgende Abfrage aus, um alle Prüfpunkttabellen für veraltete oder gelöschte Streamingabfragen zu suchen:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
Sie können das Präfix mit der Spark SQL-Konfigurationsoption spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix
konfigurieren.
Häufig gestellte Fragen (FAQ)
Bei Verwendung des Azure Synapse-Connectors wurde ein Fehler angezeigt. Wie kann ich feststellen, ob dieser Fehler von Azure Synapse oder Azure Databricks stammt?
Zur Unterstützung bei der Fehlersuche wird jede Ausnahme, die durch Code ausgelöst wird, der für den Azure Synapse-Connector spezifisch ist, von einer Ausnahme umschlossen, mit der das Merkmal SqlDWException
erweitert wird. Des weiteren wird bei Ausnahmen wie folgt unterschieden:
SqlDWConnectorException
stellt einen Fehler dar, der vom Azure Synapse-Connector ausgelöst wird.SqlDWSideException
stellt einen Fehler dar, der von der verbundenen Azure Synapse-Instanz ausgelöst wird.
Was soll ich tun, wenn bei meiner Abfrage die Fehlermeldung „No access key found in the session conf or the global Hadoop conf“ (Kein Zugriffsschlüssel in der Sitzungskonfiguration oder in der globalen Hadoop-Konfiguration gefunden) angezeigt wird?
Diese Fehlermeldung besagt, dass der Zugriffsschlüssel des Speicherkontos in der Notebook-Sitzungskonfiguration oder in der globalen Hadoop-Konfiguration für das in tempDir
angegebene Speicherkonto vom Azure Synapse-Connector nicht gefunden werden konnte.
Beispiele für die ordnungsgemäße Konfiguration des Speicherkontos finden Sie unter Verwendung (Batch). Wenn eine Spark-Tabelle mithilfe des Azure Synapse-Connectors erstellt wird, müssen Sie dennoch die Anmeldeinformationen für den Speicherkontozugriff angeben, damit die Spark-Tabelle gelesen bzw. beschrieben werden kann.
Kann ich für den Zugriff auf den im tempDir
angegebenen Blob Storage-Container Shared Access Signature (SAS) verwenden?
Von Azure Synapse wird die Verwendung von SAS für den Zugriff auf Blob Storage nicht unterstützt. Daher wird vom Azure Synapse-Connector auch SAS nicht für den Zugriff auf den im tempDir
angegebenen Blob Storage-Container unterstützt.
Ich habe mit dem Azure Synapse-Connector eine Spark-Tabelle mit der Option dbTable
erstellt, einige Daten in diese Spark-Tabelle geschrieben und diese Spark-Tabelle danach gelöscht. Wird die auf der Azure Synapse-Instanz erstellte Tabelle gelöscht?
Nein. Azure Synapse wird als externe Datenquelle betrachtet. Die Azure Synapse-Tabelle mit dem durch dbTable
festgelegten Namen wird beim Löschen der Spark-Tabelle nicht gelöscht.
Warum muss ich beim Schreiben eines Datenrahmens in Azure Synapse .option("dbTable", tableName).save()
und nicht einfach .saveAsTable(tableName)
angeben?
Das liegt daran, dass wir den folgenden Unterschied deutlich machen möchten: .option("dbTable", tableName)
bezieht sich auf die Datenbanktabelle (d. h. auf die Azure Synapse-Tabelle), während sich .saveAsTable(tableName)
auf die Spark-Tabelle bezieht. Genau genommen können Sie die beiden sogar kombinieren: Mit df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark)
wird in Azure Synapse eine Tabelle mit dem Namen tableNameDW
und in Spark eine externe Tabelle mit dem Namen tableNameSpark
erstellt, die von der Azure Synapse-Tabelle unterstützt wird.
Warnung
Beachten Sie den folgenden Unterschied zwischen und .save()
.saveAsTable()
:
- Bei
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save()
wirdwriteMode
erwartungsgemäß auf die Azure Synapse-Tabelle angewendet. - Bei
df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark)
wirdwriteMode
auf die Spark-Tabelle angewendet, währendtableNameDW
automatisch überschrieben wird, falls bereits in Azure Synapse vorhanden.
Dieses Verhalten entspricht dem Schreiben in eine andere Datenquelle. Dies ist nur eine Einschränkung der Spark DataFrameWriter-API.
Feedback
https://aka.ms/ContentUserFeedback.
Bald verfügbar: Im Laufe des Jahres 2024 werden wir GitHub-Issues stufenweise als Feedbackmechanismus für Inhalte abbauen und durch ein neues Feedbacksystem ersetzen. Weitere Informationen finden Sie unterFeedback senden und anzeigen für