Kompatibilität der Umgebungsversion

Important

Umgebungsversionen für SDP befinden sich in der Betaversion.

Pipelines mit einer environment-Version legen fest, Python Code über Spark Connect auszuführen. Diese Seite behandelt, was inkompatibel ist, was sich anders verhält, wie eine Pipeline nach betroffenen Mustern durchsucht wird und wie eine vorhandene Pipeline migriert wird.

Einschränkungen

Umgebungsversionen sind noch nicht mit allen Pipelinefunktionen kompatibel. Eine Pipelineausführung mit einem Umgebungsversionssatz schlägt fehl, wenn der Python Code der Pipeline einen der folgenden Aktionen ausführt:

  • Mutiert den Spark-Sitzungszustand innerhalb einer Funktion, die mit einem Pipeline-Dekorateur versehen ist. Beispiele sind : spark.conf.set(...), spark.sql("USE CATALOG ...")und createOrReplaceTempView.
  • Verwendet PySpark-APIs, die in Spark Connect nicht verfügbar sind, einschließlich SparkContext, RDD, SQLContextund alle Py4J-APIs. Erfahren Sie , was in Spark Connect unterstützt wird.

Wenn die Aktivierung einer Umgebungsversion für eine Pipeline zu einem Fehler führt, gibt das Deaktivieren der Umgebungsversion die Pipeline in den vorherigen Zustand zurück.

Verhaltensänderungen

Spark Connect weist eine kleine Anzahl von Verhaltensunterschieden von der klassischen PySpark-Laufzeit auf. Die vollständige Referenz finden Sie unter Spark Connect im Vergleich zum klassischen Spark . Die Kompatibilitätsüberprüfung erkennt diese Muster vorab und blockiert die Aktivierung, bis sie behoben werden, sodass Sie sie finden und beheben können, bevor sie sich auf Produktionsdaten auswirken.

In einer Pipeline sind die häufigsten Situationen, in denen sich das Verhalten unterscheiden kann:

Interleaved DataFrame-Konstruktion und Sitzungsmutation

Wenn eine Pipeline einen DataFrame erstellt, ändert sich der Spark-Sitzungszustand (z. B. ändert den Standardkatalog oder das Standardschema, legt eine Konfiguration fest, ersetzt eine temporäre Ansicht oder registriert eine UDF erneut), und verwendet dann den DataFrame:

  • Ohne Eine Umgebungsversion verwendet DataFrame den Sitzungszustand vor der Mutation .
  • Mit einer Umgebungsversion verwendet DataFrame den Sitzungszustand nach der Mutation .

Beispiel:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Enthält ohne eine Umgebungsversion mytable[(1, "Original Row")]. Enthält eine Umgebungsversion mytable[(2, "Replaced Row")].

UDFs, die auf änderbare Python Zustand verweisen

Wenn eine UDF auf eine Python globale Variable verweist, deren Wert sich ändert, nachdem die UDF definiert wurde:

  • Ohne Eine Umgebungsversion verwendet die UDF den neuesten Wert der Variablen.
  • Bei einer Umgebungsversion verwendet die UDF den Wert zum Zeitpunkt der Definition der UDF.

Beispiel:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Enthält ohne eine Umgebungsversion my_mv[("alex_b",)]. Enthält eine Umgebungsversion my_mv[("alex_a",)].

Wenn eine Pipeline von beiden Mustern abhängig ist, überwachen Sie sie, bevor Sie eine Umgebungsversion aktivieren.

Kompatibilitätsscan

Die Kompatibilitätsüberprüfung hilft Ihnen beim Auffinden von Codemustern in Ihrer Pipeline, die unterschiedliche Ergebnisse unter einer Umgebungsversion erzeugen würden, bevor Sie sie aktivieren. Der Scan ist opt-in. Wenn der Scan in einer Pipeline aktiviert ist:

  • Jede Pipelineausführung gibt ein BehaviorChangeInSparkConnectWARN Ereignis im Pipelineereignisprotokoll pro erkannten Muster aus.
  • Sie können eine Umgebungsversion in der Pipeline erst aktivieren, wenn Sie alle Kompatibilitätswarnungen aus dem vorherigen erfolgreichen Update beheben.

Wenn der Scan nicht aktiviert ist, werden keine Ereignisse ausgegeben und environment_version die Aktivierung wird nicht blockiert. Databricks empfiehlt, die Überprüfung und Auflösung von erkannten Mustern zu aktivieren, bevor eine Umgebungsversion in der Pipeline aktiviert wird.

Aktivieren des Scans in einer Pipeline

Sie können die Kompatibilitätsüberprüfung aktivieren, indem Sie die pipelines.environmentVersion.enableCompatibilityScan Pipelinekonfiguration hinzufügen. Sie können die Konfiguration über die Pipeline-Editor-Benutzeroberfläche hinzufügen oder einen Eintrag zu der Pipelinekonfigurations-JSON hinzufügen.

Über die Benutzeroberfläche:

  1. Klicken Sie im Pipeline-Editor auf "Einstellungen".
  2. Suchen Sie den Abschnitt "Konfiguration" in den Pipelineeinstellungen.
  3. Klicken Sie auf das Plussymbol.Konfiguration hinzufügen.
  4. Geben Sie pipelines.environmentVersion.enableCompatibilityScan als Schlüssel und true als Wert ein.
  5. Speichern Sie die Pipelineeinstellungen.

In der Pipeline JSON:

Fügen Sie dem Block den folgenden Eintrag hinzu configuration :

"configuration": {
  "pipelines.environmentVersion.enableCompatibilityScan": "true"
}
  1. Aktivieren Sie den Scan in der Pipeline.
  2. Auslösen einer Pipelineausführung
  3. Abfragen des Pipelineereignisprotokolls für BehaviorChangeInSparkConnectWARN Ereignisse. Eine vollständige Liste der Problemcodes, Beispielmuster und vorgeschlagene Korrekturen finden Sie in der Referenz zu Kompatibilitätsereignissen .
  4. Aktualisieren Sie den Pipelinecode, um die erkannten Muster zu entfernen und die Pipeline erneut auszuführen, bis keine weiteren Ereignisse ausgegeben werden.
  5. Fügen Sie environment_version der Pipeline eine der Methoden in "Aktivieren einer Umgebungsversion in einer Pipeline" hinzu.

Wenn Sie glauben, dass eine Kompatibilitätswarnung falsch positiv ist und trotzdem aktiviert environment_version werden soll, entfernen Sie den pipelines.environmentVersion.enableCompatibilityScan Eintrag aus der Pipelinekonfiguration, um die Überprüfung zu umgehen. (Das Festlegen des Werts auf false "Nicht zulässig" – Sie müssen den Eintrag vollständig entfernen.)

Die Preflight-Überprüfung wird nicht für Pipelines ausgeführt, die kein vorheriges Update aufweisen, oder für Pipelines, die bereits eine Umgebungsversion festgelegt haben.

Migrieren einer vorhandenen Pipeline zu Umgebungsversionen

Um eine vorhandene Pipeline zu migrieren, die noch keine Umgebungsversion verwendet, folgen Sie diesem End-to-End-Workflow. Es führt Sie durch die Suche nach Codemustern, die sich unter Spark Connect möglicherweise anders verhalten, sie reparieren und die Umgebungsversion sicher bereitstellen.

  1. Aktivieren Sie den Kompatibilitätsscan für die Pipeline. Aktivieren Sie den Scan in der Pipeline, wie in der Kompatibilitätsüberprüfung beschrieben. Dies bewirkt, dass erkannte Muster im Ereignisprotokoll angezeigt werden und was die Preflight-Überprüfung ermöglicht, die den Aktivierungsversuch schützt.

  2. Auslösen einer Pipelineausführung und Überprüfen von Kompatibilitätsereignissen. Auslösen eines normalen Pipelineupdates. Nachdem es erfolgreich abgeschlossen wurde, fragen Sie das Pipelineereignisprotokoll nach BehaviorChangeInSparkConnectWARN Ereignissen ab. Jedes Ereignis meldet ein erkanntes Muster. Eine vollständige Liste der Problemcodes, Beispielmuster und vorgeschlagene Korrekturen finden Sie in der Referenz zu Kompatibilitätsereignissen .

  3. Aktualisieren Sie Den Pipelinecode, um erkannte Muster zu beheben. Aktualisieren Sie für jedes erkannte Muster den Pipelinecode nach dem vorgeschlagenen Fix. Lösen Sie nach jeder Änderung ein weiteres Pipelineupdate aus, und überprüfen Sie, ob die entsprechenden Ereignisse nicht mehr angezeigt werden. Wiederholen Sie diesen Vorgang, bis das Ereignisprotokoll keine Kompatibilitätsereignisse mehr für ein erfolgreiches Update enthält.

  4. Aktivieren Sie die Umgebungsversion in der Pipeline. Nachdem das neueste erfolgreiche Update keine Kompatibilitätsereignisse aufweist, fügen Sie environment_version der Pipeline mithilfe der Benutzeroberfläche, API oder des Bundles hinzu, wie unter "Aktivieren einer Umgebungsversion für eine Pipeline" beschrieben. Das nächste Update wird mit Spark Connect und der angehefteten Python Sprachversion und vorinstallierten Bibliotheken ausgeführt.

    Wenn das Update fehlschlägt, da noch Kompatibilitätswarnungen vorhanden sind, legen Sie die environment_versionWarnungen ab, kehren Sie zu Schritt 2 zurück, und beheben Sie die verbleibenden Warnungen, bevor Sie es erneut versuchen.

  5. Überprüfen Sie die Migration. Überprüfen Sie nach Abschluss des ersten Updates mit der Umgebungsversion Folgendes:

    • Das create_update Ereignis im Ereignisprotokoll zeigt environment_version den erwarteten Wert an.
    • Die Pipeline erzeugt die erwarteten Daten, und es werden keine neuen Fehlerereignisse angezeigt.
    • Nachgeschaltete Spotüberprüfungstabellen für subtile Verhaltensunterschiede, die in Verhaltensänderungen beschrieben werden.

Rollback

Wenn die Pipeline nach der Migration falsch funktioniert, entfernen Sie die environment_version Aus den Pipelineeinstellungen. Das nächste Update wird mit der vorherigen Python Laufzeitkonfiguration ausgeführt. Verwenden Sie die Rollback-Ausführung zum Debuggen, und wiederholen Sie dann die Migration aus Schritt 2, nachdem Sie das Problem identifiziert und behoben haben.

Referenz zu Kompatibilitätsereignissen

Wenn die Kompatibilitätsüberprüfung für eine Pipeline aktiviert ist, gibt SDP ein BehaviorChangeInSparkConnectWARN Ereignis im Pipelineereignisprotokoll pro erkanntem Muster aus. Wenn der Scan aktiviert ist und das vorherige erfolgreiche Update Muster erkannt hat, blockiert environment_version SDP auch die Aktivierung, bis die Muster behoben werden.

Jedes Ereignis meldet einen einzelnen Problemcode, der angibt, was erkannt wurde. Um einen Code nachzuschlagen, suchen Sie ihn in der Tabelle "Problemcodes ", wobei jede Zeile mit dem Kategorieabschnitt verknüpft ist, der ein Beispielmuster und die vorgeschlagene Korrektur enthält.

Ereignis-Shape

BehaviorChangeInSparkConnect Ereignisse folgen dem Standardmäßigen Pipelineereignisprotokollschema:

  • event_type ist behavior_change_in_spark_connect.
  • level ist WARN.
  • details enthält das behavior_change_in_spark_connect Objekt, das über ein einzelnes issue Feld verfügt. Der Ausgabewert ist einer der unten aufgeführten Codes.
  • message ist eine lesbare Beschreibung des erkannten Musters.

Problemcodes

Kategorie Problemcode Description
Datenbank- und Katalogmutationen USE_CATALOG_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Der Standardkatalog wurde geändert, nachdem ein DataFrame erstellt wurde. Der vorhandene DataFrame kann Tabellen mithilfe des neuen Standardkatalogs auflösen.
Datenbank- und Katalogmutationen USE_CATALOG_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE CATALOG wurde außerhalb einer Funktion aufgerufen, die von einem Rohrleitungsdekoror verziert wurde. Der Standardkatalog kann sich bei nachfolgenden Vorgängen unerwartet ändern.
Datenbank- und Katalogmutationen USE_DATABASE_OUTSIDE_QUERY_FUNCTION_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Die Standarddatenbank wurde geändert, nachdem ein DataFrame erstellt wurde. Der vorhandene DataFrame kann Tabellen mithilfe der neuen Standarddatenbank auflösen.
Datenbank- und Katalogmutationen USE_DATABASE_OUTSIDE_QUERY_FUNCTION_COULD_CHANGE_BEHAVIOR USE DATABASE wurde außerhalb einer Funktion aufgerufen, die von einem Rohrleitungsdekoror verziert wurde. Die Standarddatenbank kann sich bei nachfolgenden Vorgängen unerwartet ändern.
Eiferige Ausführung innerhalb von Flussfunktionen CHECKPOINT_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion ruft einen Prüfpunktbefehl auf.
Eiferige Ausführung innerhalb von Flussfunktionen CREATE_DATAFRAME_VIEW_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion erstellt eine DataFrame-Ansicht (createOrReplaceTempView oder ähnlich).
Eiferige Ausführung innerhalb von Flussfunktionen CREATE_RESOURCE_PROFILE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion erstellt ein Ressourcenprofil.
Eiferige Ausführung innerhalb von Flussfunktionen GET_RESOURCES_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Ablauffunktion ruft oder spark.resources eine zugehörige Ressourcen-API auf.
Eiferige Ausführung innerhalb von Flussfunktionen MERGE_INTO_TABLE_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion führt eine zieltabelle aus MERGE INTO .
Eiferige Ausführung innerhalb von Flussfunktionen ML_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion führt einen eifrigen Spark ML-Vorgang aus.
Eiferige Ausführung innerhalb von Flussfunktionen REGISTER_DATA_SOURCE_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion registriert eine Python Datenquelle.
Eiferige Ausführung innerhalb von Flussfunktionen STREAMING_QUERY_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion arbeitet mit einem aktiven Streamingabfragehandle.
Eiferige Ausführung innerhalb von Flussfunktionen STREAMING_QUERY_LISTENER_BUS_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Ablauffunktion registriert oder entfernt einen Streamingabfragelistener.
Eiferige Ausführung innerhalb von Flussfunktionen STREAMING_QUERY_MANAGER_COMMAND_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Ablauffunktion ruft zum Verwalten von Streamingabfragen auf spark.streams .
Eiferige Ausführung innerhalb von Flussfunktionen WRITE_OPERATION_V2_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion führt einen eifrigen DataFrameWriterV2 Vorgang aus.
Eiferige Ausführung innerhalb von Flussfunktionen WRITE_OPERATION_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion führt einen eifrigen DataFrame.write Vorgang aus.
Eiferige Ausführung innerhalb von Flussfunktionen WRITE_STREAM_OPERATION_START_WITHIN_QUERY_FUNCTION_NOT_SUPPORTED Die Flussfunktion startet eine Streamingabfrage (writeStream.start()).
Spark-Konfigurationsmutationen CHANGE_CONF_INSIDE_QUERY_FUNCTION_NOT_SUPPORTED spark.conf.set() oder spark.conf.unset() wurde in einer Funktion aufgerufen, die von einem Pipelines-Dekorateur verziert wurde. Dies wird mit einer Umgebungsversion nicht unterstützt.
Spark-Konfigurationsmutationen SET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.set() wurde außerhalb einer Funktion aufgerufen, die von einem Pipelines-Dekorateur nach der Erstellung eines DataFrames eingerichtet wurde. Die Konfigurationsänderung kann sich auf den vorhandenen DataFrame zur Ausführungszeit auswirken.
Spark-Konfigurationsmutationen UNSET_CONF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR spark.conf.unset() wurde außerhalb einer Funktion aufgerufen, die von einem Pipelines-Dekorateur nach der Erstellung eines DataFrames eingerichtet wurde. Die Konfigurationsänderung kann sich auf den vorhandenen DataFrame zur Ausführungszeit auswirken.
Temporäre Ansichtsersetzungen REPLACE_GLOBAL_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Eine globale temporäre Ansicht wurde ersetzt, nachdem ein DataFrame darauf verweist. Der Ersatz kann im vorhandenen DataFrame widerspiegelt werden.
Temporäre Ansichtsersetzungen REPLACE_TEMP_VIEW_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Eine temporäre Ansicht wurde ersetzt, nachdem ein DataFrame darauf verweist. Der Ersatz kann im vorhandenen DataFrame widerspiegelt werden.
UDF- und UDTF-Mutationen OVERWRITE_SESSION_UDF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Eine UDF wurde mit demselben Namen erneut registriert, nachdem ein DataFrame darauf verweist, dass es erstellt wurde. Der vorhandene DataFrame kann die neue UDF-Definition verwenden.
UDF- und UDTF-Mutationen OVERWRITE_SESSION_UDTF_AFTER_DATAFRAME_COULD_CHANGE_BEHAVIOR Ein UDTF wurde mit demselben Namen erneut registriert, nachdem ein DataFrame darauf verweist, dass es erstellt wurde. Der vorhandene DataFrame kann die neue UDTF-Definition verwenden.
UDF- und UDTF-Mutationen UDF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Eine UDF verweist auf eine globale veränderbare Python Variable. Bei einer Umgebungsversion verwendet die UDF den Wert der Variablen zum Zeitpunkt der Definition der UDF, nicht zum Aufrufzeitpunkt.
UDF- und UDTF-Mutationen UDTF_REFERENCES_GLOBAL_VARIABLE_COULD_CHANGE_BEHAVIOR Ein UDTF verweist auf eine globale veränderbare Python Variable. Bei einer Umgebungsversion verwendet udTF den Wert der Variablen zum Zeitpunkt der DEFINITION des UDTF, nicht zum Aufrufzeitpunkt.

Datenbank- und Katalogmutationen

Diese Probleme werden ausgegeben, wenn Pipelinecode die Standarddatenbank oder den Standardkatalog stummschaltet. Mit einer Umgebungsversion erstellt DataFrames, bevor die Mutation Tabellen mithilfe der neuen Datenbank oder des neuen Katalogs auflösen kann.

Beispielmuster, das ein Ereignis auslöst:

from pyspark import pipelines as dp

spark.sql("USE CATALOG marketing")
df = spark.read.table("events")

spark.sql("USE CATALOG sales")  # changes the default catalog after df was created

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Ohne Eine Umgebungsversion dfevents wird aus dem marketing Katalog aufgelöst. Bei einer Umgebungsversion df wird events der sales Katalog aufgelöst.

Vorgeschlagene Lösung: Vollqualifizierte Tabellennamen, sodass die Auflösung nicht vom Standardkatalog oder der Standarddatenbank abhängt, und vermeiden Sie das Ändern des Standardkatalogs oder der Datenbank zwischen der Erstellung und Verwendung von DataFrame.

from pyspark import pipelines as dp

df = spark.read.table("marketing.default.events")

@dp.materialized_view
def events_summary():
  return df.groupBy("region").count()

Spark-Konfigurationsmutationen

Diese Probleme werden ausgegeben, wenn der Pipelinecode spark-Konfiguration so stummschaltet, dass das DataFrame-Verhalten unter einer Umgebungsversion geändert werden kann.

Beispielmuster, das ein Ereignis auslöst:

from pyspark import pipelines as dp

df = spark.read.table("events")

spark.conf.set("spark.sql.ansi.enabled", "true")  # changes session conf after df was created

@dp.materialized_view
def events_strict():
  return df.selectExpr("CAST(price AS INT) AS price")

Ohne Eine Umgebungsversion verwendet die Umwandlung den Konf-Wert zur Erstellungszeit von DataFrame. Bei einer Umgebungsversion wird die Umwandlung verwendet spark.sql.ansi.enabled=true und kann bei ungültiger Eingabe fehlschlagen.

Vorgeschlagene Lösung: Legen Sie alle erforderlichen Spark-Konfigurationen oben in der Pipelinedatei fest, bevor DataFrame erstellt wird. Verwenden Sie für die Konfiguration pro Abfrage die Einstellung der Pipeline configuration in der Pipelinespezifikation.

Temporäre Ansichtsersetzungen

Diese Probleme werden ausgegeben, wenn Pipelinecode eine temporäre Ansicht ersetzt, nachdem ein DataFrame darauf verweist, dass er erstellt wurde. Mit einer Umgebungsversion kann der vorhandene DataFrame den neuen Ansichtsinhalt widerspiegeln.

Beispielmuster, das ein Ereignis auslöst:

from pyspark import pipelines as dp

spark.createDataFrame([(1, "Original Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

df = spark.sql("SELECT * FROM my_view")

spark.createDataFrame([(2, "Replaced Row")], ["id", "data"]) \
  .createOrReplaceTempView("my_view")

@dp.materialized_view
def mytable():
  return df

Enthält ohne eine Umgebungsversion mytable[(1, "Original Row")]. Enthält eine Umgebungsversion mytable[(2, "Replaced Row")].

Vorgeschlagene Lösung: Erstellen Sie jede temporäre Ansicht einzeln, und ersetzen Sie sie nicht. Wenn Sie mehrere Ansichten mit verwandten Daten benötigen, geben Sie jedem einen eindeutigen Namen.

UDF- und UDTF-Mutationen

Diese Probleme werden ausgegeben, wenn Pipelinecode eine UDF- oder UDTF-Datei so stummschaltet, dass sich das Verhalten unter einer Umgebungsversion ändert.

Beispielmuster, das ein Ereignis auslöst:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, udf

suffix = "a"

@udf
def my_udf(s):
  return s + suffix

suffix = "b"

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(my_udf(col("name")))

Enthält ohne eine Umgebungsversion my_mv[("alex_b",)]. Enthält eine Umgebungsversion my_mv[("alex_a",)].

Suggested fix: Übergeben Sie Werte als Argumente an die UDF, anstatt sie aus Python Globalen zu erfassen, oder legen Sie die Globale fest, bevor Sie die UDF definieren und sie danach nicht stummschalten.

from pyspark import pipelines as dp
from pyspark.sql.functions import col, lit, udf

@udf
def append_suffix(s, suffix):
  return s + suffix

@dp.materialized_view
def my_mv():
  return spark.createDataFrame([("alex",)], ["name"]).select(append_suffix(col("name"), lit("b")))

Eiferige Ausführung innerhalb von Flussfunktionen

Diese Probleme werden ausgegeben, wenn pipelinecode einen eifrigen Spark-Befehl in einer Funktion ausführt, die von einem Pipelinedekoror (@table, @materialized_viewusw.) eingerichtet wurde. Flussfunktionen werden erwartet, dass ein DataFrame definiert und zurückgegeben wird; Eifrige Befehle, die Daten schreiben, Streamingabfragen verwalten, Ressourcen registrieren oder ML-Vorgänge ausführen, sind in einer Flussfunktion mit einem Umgebungsversionssatz nicht zulässig.

Vorgeschlagene Lösung: Verschieben Sie den eifrigen Vorgang außerhalb der Flussfunktion, und geben Sie stattdessen einen DataFrame aus der Flussfunktion zurück. Nebeneffekte wie das Schreiben in eine Tabelle oder das Starten einer Streamingabfrage gehören außerhalb der Pipelinedefinition; das Pipelinemodul verarbeitet die Materialisierung des DataFrames, der von der Flussfunktion zurückgegeben wird.

Suchen nach Kompatibilitätsereignissen im Ereignisprotokoll

Die folgende Abfrage gibt alle Kompatibilitätsereignisse für eine Pipeline zurück, sortiert zuerst:

SELECT
  timestamp,
  message,
  details:behavior_change_in_spark_connect:issue AS issue
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
ORDER BY timestamp DESC;

So zählen Sie Ereignisse nach Problemcode in den letzten Updates:

SELECT
  details:behavior_change_in_spark_connect:issue AS issue,
  COUNT(*) AS occurrences
FROM event_log(<pipeline-id>)
WHERE event_type = 'behavior_change_in_spark_connect'
  AND level = 'WARN'
GROUP BY 1
ORDER BY occurrences DESC;

Informationen zum Abfragen des Ereignisprotokolls finden Sie unter Abfragen des Ereignisprotokolls.

Siehe auch