Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile archiviare il codice Python nelle cartelle Git di Databricks o nei file dell'area di lavoro e quindi importare il codice Python nelle pipeline dichiarative di Lakeflow. Per altre informazioni sull'uso dei moduli nelle cartelle Git o nei file dell'area di lavoro, vedere Usare moduli Python e R.
Nota
Non è possibile importare codice sorgente da un notebook archiviato in una cartella Git di Databricks o in un file dell'area di lavoro. Invece, aggiungi direttamente il notebook quando crei o modifichi una pipeline. Vedere Configurare le pipeline dichiarative di Lakeflow.
Importare un modulo Python in un notebook di Pipeline dichiarative di Lakeflow
L'esempio seguente illustra l'importazione di query del set di dati come moduli Python dai file dell'area di lavoro. Anche se questo esempio descrive l'uso dei file dell'area di lavoro per archiviare il codice sorgente della pipeline, è possibile usarlo con il codice sorgente archiviato in una cartella Git.
Per eseguire questo esempio, seguire questa procedura:
Fare clic
Area di lavoro nella barra laterale dell'area di lavoro di Azure Databricks per aprire il browser dell'area di lavoro.
Usare il browser dell'area di lavoro per selezionare una directory per i moduli Python.
Fare clic
Nella colonna più a destra della directory selezionata fare clic su Crea > file.
Immettere un nome per il file, ad esempio
clickstream_raw_module.py
. Verrà aperto l'editor di file. Per creare un modulo per leggere i dati di origine in una tabella, immettere quanto segue nella finestra dell'editor:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Per creare un modulo che crea una nuova tabella contenente dati preparati, creare un nuovo file nella stessa directory, immettere un nome per il file, ad esempio
clickstream_prepared_module.py
e immettere quanto segue nella nuova finestra dell'editor:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Prossimamente, crea un notebook di pipeline. Passare alla pagina di destinazione di Azure Databricks e selezionare Crea un notebookoppure fare clic su
Nuovo nella barra laterale e selezionare Notebook. È anche possibile creare il notebook nel browser dell'area di lavoro facendo clic
Fare clic su Crea > notebook.
Assegna un nome al notebook e conferma che Python è il linguaggio predefinito.
Fare clic su Crea.
Immettere il codice di esempio seguente nel notebook.
Nota
Se il notebook importa moduli o pacchetti da un percorso di file dell'area di lavoro o da un percorso di cartelle Git diverso dalla directory del notebook, è necessario aggiungere manualmente il percorso ai file usando
sys.path.append()
.Se si importa un file da una cartella Git, è necessario anteporre
/Workspace/
al percorso. Ad esempio,sys.path.append('/Workspace/...')
. L'omissione di/Workspace/
dal percorso genera un errore.Se i moduli o i pacchetti vengono archiviati nella stessa directory del notebook, non è necessario aggiungere manualmente il percorso. Non è inoltre necessario aggiungere manualmente il percorso durante l'importazione dalla directory radice di una cartella Git perché la directory radice viene aggiunta automaticamente al percorso.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( spark.read.table("catalog_name.schema_name.clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Sostituire
<module-path>
con il percorso della directory contenente i moduli Python da importare.Creare una pipeline utilizzando il nuovo notebook.
Per eseguire la pipeline, nella pagina dettagli della pipeline, fare clic su Avvia.
È anche possibile importare il codice Python come pacchetto. Il frammento di codice seguente di un notebook Lakeflow Declarative Pipelines importa il pacchetto test_utils
dalla directory dlt_packages
all'interno della stessa directory del notebook. La directory dlt_packages
contiene i file test_utils.py
e __init__.py
e test_utils.py
definisce la funzione create_test_table()
:
import dlt
@dlt.table
def my_table():
return spark.read.table(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)