Strukturierte Streaming-Schreibvorgänge in Azure Synapse

Der Azure Synapse-Connector bietet effiziente und skalierbare Unterstützung für strukturierte Streaming-Schreibvorgänge für Azure Synapse, das eine konsistente Benutzeroberfläche mit Batchschreibvorgängen bereitstellt, und für die Übertragung umfangreicher Datenmengen zwischen einem Azure Databricks-Cluster und einer Azure Synapse-Instanz COPY verwendet.

Unterstützung für strukturiertes Streaming zwischen Azure Databricks und Synapse bietet eine einfache Semantik zum Konfigurieren inkrementeller ETL-Aufträge. Das zum Laden von Daten aus Azure Databricks in Synapse verwendete Modell führt zu Wartezeiten, die die SLA-Anforderungen für Workloads in Quasi-Echtzeit möglicherweise nicht erfüllen. Weitere Informationen finden Sie unter Abfragen von Daten in Azure Synapse Analytics.

Unterstützte Ausgabemodi für Streamingschreibvorgänge in Synapse

Der Azure Synapse-Connector unterstützt die Ausgabemodi Append und Complete für Datensatzanfügevorgänge und Aggregationen. Weitere Informationen zu Ausgabemodi und Kompatibilitätsmatrix finden Sie im Leitfaden für strukturiertes Streaming.

Fehlertoleranzsemantik von Synapse

Standardmäßig bietet Azure Synapse Streaming eine End-to-End-Garantie vom Typ Exactly Once für das Schreiben von Daten in eine Azure Synapse-Tabelle, indem der Fortschritt der Abfrage mit einer Kombination aus Prüfpunktspeicherort im DBFS, Prüfpunkttabelle in Azure Synapse und Sperrmechanismus zuverlässig nachverfolgt wird, um sicherzustellen, dass das Streaming alle Arten von Fehlern, Wiederholungen und Abfrageneustarts verarbeiten kann.

Optional können Sie eine weniger restriktive Semantik vom Typ „At-Least-Once“ für Azure Synapse Streaming auswählen, indem Sie die Option spark.databricks.sqldw.streaming.exactlyOnce.enabled auf false festlegen. In diesem Fall kann die Datenduplizierung bei zeitweiligen Verbindungsfehlern bei Azure Synapse oder einer unerwarteten Abfragebeendigung auftreten.

Syntax für strukturiertes Streaming zum Schreiben in Azure Synapse

Die folgenden Codebeispiele veranschaulichen Streamingschreibvorgänge in Synapse mithilfe von strukturiertem Streaming in Scala und Python:

Scala

// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "100000")
  .option("numPartitions", "16")
  .load()

// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("checkpointLocation", "/tmp_checkpoint_location")
  .start()

Python

# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.windows.net",
  "<your-storage-account-access-key>")

# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
  .format("rate") \
  .option("rowsPerSecond", "100000") \
  .option("numPartitions", "16") \
  .load()

# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.

df.writeStream \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.windows.net/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("checkpointLocation", "/tmp_checkpoint_location") \
  .start()

Eine vollständige Liste der Konfigurationen finden Sie unter Abfragen von Daten in Azure Synapse Analytics.

Verwaltung von Synapse-Streamingprüfpunkttabellen

Die beim Starten einer neuen Streamingabfrage erstellte Streamingprüfpunkttabelle wird vom Azure Synapse-Connector nicht gelöscht. Dieses Verhalten ist konsistent mit dem checkpointLocation, der normalerweise für den Objektspeicher angegeben wird. Databricks empfiehlt, Prüfpunkttabellen für Abfragen, die in Zukunft nicht ausgeführt werden, regelmäßig zu löschen.

Standardmäßig haben alle Prüfpunkttabellen den Namen <prefix>_<query-id>, wobei <prefix> ein konfigurierbares Präfix mit dem Standardwert databricks_streaming_checkpoint und query_id eine Streamingabfrage-ID ist, bei der die Zeichen _ entfernt wurden.

Führen Sie die folgende Abfrage aus, um alle Prüfpunkttabellen für veraltete oder gelöschte Streamingabfragen zu suchen:

SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'

Sie können das Präfix mit der Spark SQL-Konfigurationsoption spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix konfigurieren.

Referenz zu Streamingoptionen für Azure Databricks Synapse-Connector

Die in Spark SQL bereitgestellten OPTIONS unterstützen zusätzlich zu den Batchoptionen die folgenden Optionen für Streaming:

Parameter Erforderlich Standard Notizen
checkpointLocation Ja Kein Standardwert Speicherort in DBFS, der vom strukturierten Streaming zum Schreiben von Metadaten und Prüfpunktinformationen verwendet wird. Weitere Informationen finden Sie im Abschnitt über das Wiederherstellen nach Fehlern mit Prüfpunkten im Programmierleitfaden für strukturiertes Streaming.
numStreamingTempDirsToKeep Nein 0 Gibt an, wie viele (aktuelle) temporäre Verzeichnisse für die regelmäßige Bereinigung von Mikrobatches beim Streaming beibehalten werden sollen. Wenn diese Einstellung auf 0 festgelegt ist, wird das Löschen des Verzeichnisses sofort ausgelöst, nachdem der Mikrobatch committet wurde. Andernfalls werden die angegebenen aktuellen Mikrobatches beibehalten und die restlichen Verzeichnisse entfernt. Verwenden Sie -1, um die regelmäßige Bereinigung zu deaktivieren.

Hinweis

checkpointLocation und numStreamingTempDirsToKeep sind nur beim Streamen von Schreibvorgängen aus Azure Databricks in eine neue Tabelle in Azure Synapse relevant.