Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
SE APLICA A: Azure Data Factory
Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. ¡Obtenga más información sobre cómo iniciar una nueva evaluación gratuita!
Nota:
Esta característica está en versión preliminar pública. El Administrador de orquestación de flujo de trabajo cuenta con la tecnología de Apache Airflow.
Un paquete de Python es una manera de organizar los módulos de Python relacionados en una sola jerarquía de directorios. Normalmente, un paquete se representa como un directorio que contiene un archivo especial denominado __init__.py
. Dentro de un directorio de paquetes, se pueden tener varios archivos de módulos de Python (archivos.py) que definen funciones, clases y variables.
En el contexto de Administrador de orquestación de flujo de trabajo puede crear paquetes para agregar el código personalizado.
En esta guía se proporcionan instrucciones paso a paso sobre cómo instalar el archivo (Wheel) .whl
, que sirve como formato de distribución binaria para el paquete de Python en el Administrador de orquestaciones de flujos de trabajo.
Para fines ilustrativos, creo un operador personalizado simple como paquete de Python que se puede importar como un módulo dentro del archivo dags.
Paso 1: Desarrollar un operador personalizado y un archivo para probarlo.
- Cree un archivo
sample_operator.py
.
from airflow.models.baseoperator import BaseOperator
class SampleOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
return message
Para crear un paquete de Python para este archivo, consulte la guía: Creación de un paquete en Python.
Cree un archivo dag
sample_dag.py
para probar el operador definido en el paso 1.
from datetime import datetime
from airflow import DAG
from airflow_operator.sample_operator import SampleOperator
with DAG(
"test-custom-package",
tags=["example"]
description="A simple tutorial DAG",
schedule_interval=None,
start_date=datetime(2021, 1, 1),
) as dag:
task = SampleOperator(task_id="sample-task", name="foo_bar")
task
Paso 2: Crear un contenedor de almacenamiento
Siga los pasos descritos en Administración de contenedores de blobs mediante Azure Portal para crear una cuenta de almacenamiento para cargar el archivo DAG y el archivo de paquete.
Paso 3: Cargar el paquete privado en la cuenta de almacenamiento
- Vaya al contenedor designado donde va a almacenar los archivos DAG y de complementos de Airflow.
- Cargue el archivo de paquete privado en el contenedor. Los formatos de archivo más comunes son
zip
,.whl
ytar.gz
. Coloque el archivo en la carpeta "Dags" o "Plugins", según corresponda.
Paso 4: Agregar el paquete privado como requisito
Agregue el paquete privado como requisito en el archivo requirements.txt. Agregue este archivo si aún no existe. Para la sincronización de Git, debe agregar todos los requisitos en la propia interfaz de usuario.
Blob Storage: Asegúrese de anteponer el prefijo "/opt/airflow/" a la ruta de acceso del paquete. Por ejemplo, si el paquete privado reside en "/dags/test/private.whl", el archivo requirements.txt debe tener el requisito "/opt/airflow/dags/test/private.whl".
Sincronización de Git: para todos los servicios de Git, anteponga "/opt/airflow/git/
<repoName>
.git/" a la ruta de acceso del paquete. Por ejemplo, si el paquete privado está en "/dags/test/private.whl" en un repositorio de GitHub, debe agregar el requisito "/opt/airflow/git/<repoName>
.git/dags/test/private.whl" al entorno de Airflow.ADO: para ADO, anteponga "/opt/airflow/git/
<repoName>
/" a la ruta de acceso del paquete.
Paso 5: Importar la carpeta a un entorno de tiempo de ejecución integrado (IR) de Airflow
Al realizar la importación de la carpeta en un entorno IR de Airflow, asegúrese de marcar la casilla requisitos de importación para cargar los requisitos en el entorno de Airflow.