Importar módulos Python de pastas Git ou arquivos de workspace
Você pode armazenar código Python em Pastas Git do Databricks ou em arquivos de workspace e importar esse código Python para os pipelines do Delta Live Tables. Para obter mais informações sobre como trabalhar com módulos em pastas Git e arquivos de workspace, consulte Trabalhar com módulos Python e R.
Observação
Você não pode importar o código-fonte de um notebook armazenado em uma pasta Git do Databricks ou em um arquivo de workspace. Em vez disso, adicione o notebook diretamente ao criar ou editar um pipeline. Consulte Configurar um pipeline do Delta Live Tables.
Importar um módulo Python para um pipeline do Delta Live Tables
O exemplo a seguir demonstra a importação de consultas de um conjunto de dados como módulos Python de arquivos de workspace. Embora este exemplo descreva o uso de arquivos de workspace para armazenar o código-fonte do pipeline, você pode usá-lo com o código-fonte armazenado em uma pasta Git.
Para executar esse exemplo, use as etapas a seguir:
Clique em Workspace na barra lateral do seu workspace do Azure Databricks para abrir o navegador do workspace.
Use o navegador do workspace para selecionar um diretório para os módulos Python.
Clique em na coluna mais à direita do diretório selecionado e clique em Criar > Arquivo.
Insira um nome para o arquivo, por exemplo,
clickstream_raw_module.py
. O editor de arquivos é aberto. Para criar um módulo para ler dados de origem em uma tabela, insira o seguinte na janela do editor:from dlt import * json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json" def create_clickstream_raw_table(spark): @table def clickstream_raw(): return ( spark.read.json(json_path) )
Para criar um módulo que cria uma nova tabela contendo dados preparados, crie um novo arquivo no mesmo diretório, insira um nome para o arquivo, por exemplo,
clickstream_prepared_module.py
, e insira o seguinte na nova janela do editor:from clickstream_raw_module import * from dlt import read from pyspark.sql.functions import * from pyspark.sql.types import * def create_clickstream_prepared_table(spark): create_clickstream_raw_table(spark) @table @expect("valid_current_page_title", "current_page_title IS NOT NULL") @expect_or_fail("valid_count", "click_count > 0") def clickstream_prepared(): return ( read("clickstream_raw") .withColumn("click_count", expr("CAST(n AS INT)")) .withColumnRenamed("curr_title", "current_page_title") .withColumnRenamed("prev_title", "previous_page_title") .select("current_page_title", "click_count", "previous_page_title") )
Em seguida, crie um notebook de pipeline. Acesse a página de aterrissagem do Azure Databricks e selecione Criar um Notebook ou clique no ícone N Novo na barra lateral e selecione Notebook. Você também pode criar o notebook no navegador do workspace clicando em e clicando em Criar > Notebook.
Nomeie seu notebook e confirme se Python é a linguagem padrão.
Clique em Criar.
Inserir o código de exemplo no notebook.
Observação
Se o notebook importar módulos ou pacotes de um caminho de arquivos de workspace ou um caminho de pastas Git diferente do diretório do notebook, você deverá acrescentar manualmente o caminho aos arquivos usando
sys.path.append()
.Se você estiver importando um arquivo de uma pasta Git, será necessário anexar
/Workspace/
ao caminho. Por exemplo,sys.path.append('/Workspace/...')
. Omitir/Workspace/
do caminho resulta em um erro.Se os módulos ou pacotes forem armazenados no mesmo diretório do notebook, você não precisará acrescentar o caminho manualmente. Você também não precisa acrescentar manualmente o caminho ao importar do diretório raiz de uma pasta Git porque o diretório raiz é acrescentado automaticamente ao caminho.
import sys, os # You can omit the sys.path.append() statement when the imports are from the same directory as the notebook. sys.path.append(os.path.abspath('<module-path>')) import dlt from clickstream_prepared_module import * from pyspark.sql.functions import * from pyspark.sql.types import * create_clickstream_prepared_table(spark) @dlt.table( comment="A table containing the top pages linking to the Apache Spark page." ) def top_spark_referrers(): return ( dlt.read("clickstream_prepared") .filter(expr("current_page_title == 'Apache_Spark'")) .withColumnRenamed("previous_page_title", "referrer") .sort(desc("click_count")) .select("referrer", "click_count") .limit(10) )
Substitua
<module-path>
pelo caminho para o diretório que contém os módulos do Python a serem importados.Criar um pipeline usando o notebook novo.
Para executar o pipeline, na página Detalhes do pipeline, clique em Iniciar.
Você também pode importar o código Python como um pacote. O snippet de código a seguir de um notebook do Delta Live Tables importa o pacote test_utils
do diretório dlt_packages
dentro do mesmo diretório do notebook. O diretório dlt_packages
contém os arquivos test_utils.py
e __init__.py
, e test_utils.py
define a função create_test_table()
:
import dlt
@dlt.table
def my_table():
return dlt.read(...)
# ...
import dlt_packages.test_utils as test_utils
test_utils.create_test_table(spark)