Inicio rápido: Creación de una factoría de datos y una canalización con Python
SE APLICA A: Azure Data Factory Azure Synapse Analytics
Sugerencia
Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga más información sobre cómo iniciar una nueva evaluación gratuita.
En este inicio rápido, se crea una factoría de datos mediante Python. La canalización de esta factoría de datos copia datos de una carpeta a otra en Azure Blob Storage.
Azure Data Factory es un servicio de integración de datos basado en la nube que le permite crear flujos de trabajo basados en datos, con el fin de coordinar y automatizar el movimiento y la transformación de datos. Mediante Azure Data Factory se pueden crear y programar flujos de trabajo basados en datos, llamados canalizaciones.
Las canalizaciones pueden ingerir datos de distintos almacenes de datos. Las canalizaciones procesan o transforman los datos mediante servicios de proceso como Azure HDInsight Hadoop, Spark, Azure Data Lake Analytics y Azure Machine Learning. Las canalizaciones publican datos de salida en almacenes de datos como Azure Synapse Analytics para las aplicaciones de inteligencia empresarial.
Prerrequisitos
Una cuenta de Azure con una suscripción activa. cree una de forma gratuita.
Explorador de Azure Storage (opcional).
Una aplicación en el identificador de Entra de Microsoft. Cree la aplicación según los pasos descritos en este vínculo con la opción de autenticación 2 (secreto de la aplicación) y asígnela al rol Colaborador según las instrucciones del mismo artículo. Tome nota de los siguientes valores tal como se muestra en el artículo para usarlos en pasos posteriores: identificador de aplicación (cliente), valor del secreto de cliente e identificador de inquilino.
Crear y cargar un archivo de entrada
Inicie el Bloc de notas. Copie el texto siguiente y guárdelo como archivo input.txt en el disco.
John|Doe Jane|Doe
Use herramientas como Explorador de Azure Storage para crear el contenedor adfv2tutorial y la carpeta input en el contenedor. A continuación, cargue el archivo input.txt en la carpeta input.
Instalar el paquete de Python
Abra un terminal o símbolo del sistema con privilegios de administrador.
En primer lugar, instale el paquete de Python para los recursos de administración de Azure:
pip install azure-mgmt-resource
Para instalar el paquete de Python para Data Factory, ejecute el siguiente comando:
pip install azure-mgmt-datafactory
El SDK de Python para Data Factory admite Python 2.7 y 3.6, y posteriores.
Para instalar el paquete de Python para la autenticación de Azure Identity, ejecute el siguiente comando:
pip install azure-identity
Nota
El paquete "azure-identity" puede tener conflictos con "azure-cli" en algunas dependencias comunes. Si encuentra algún problema de autenticación, elimine el paquete "azure-cli" y sus dependencias, o bien use un equipo limpio sin una instalación del paquete "azure-cli" para que funcione. Para las nubes soberanas, debe usar las constantes específicas de la nube adecuadas. Consulte Conexión a todas las regiones con las bibliotecas de Azure para múltiples nubes de Python | Microsoft Docs para instrucciones de conexión con Python en nubes soberanas.
Creación de un cliente de factoría de datos
Cree un archivo denominado datafactory.py. Agregue las siguientes instrucciones para agregar referencias a espacios de nombres.
from azure.identity import ClientSecretCredential from azure.mgmt.resource import ResourceManagementClient from azure.mgmt.datafactory import DataFactoryManagementClient from azure.mgmt.datafactory.models import * from datetime import datetime, timedelta import time
Agregue las siguientes funciones que imprimen información.
def print_item(group): """Print an Azure object instance.""" print("\tName: {}".format(group.name)) print("\tId: {}".format(group.id)) if hasattr(group, 'location'): print("\tLocation: {}".format(group.location)) if hasattr(group, 'tags'): print("\tTags: {}".format(group.tags)) if hasattr(group, 'properties'): print_properties(group.properties) def print_properties(props): """Print a ResourceGroup properties instance.""" if props and hasattr(props, 'provisioning_state') and props.provisioning_state: print("\tProperties:") print("\t\tProvisioning State: {}".format(props.provisioning_state)) print("\n\n") def print_activity_run_details(activity_run): """Print activity run details.""" print("\n\tActivity run details\n") print("\tActivity run status: {}".format(activity_run.status)) if activity_run.status == 'Succeeded': print("\tNumber of bytes read: {}".format(activity_run.output['dataRead'])) print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten'])) print("\tCopy duration: {}".format(activity_run.output['copyDuration'])) else: print("\tErrors: {}".format(activity_run.error['message']))
Agregue el código siguiente al método main que crea una instancia de la clase DataFactoryManagementClient. Este objeto se usa para crear la factoría de datos, el servicio vinculado, los conjuntos de datos y la canalización. También se usa para supervisar los detalles de ejecución de la canalización. Establezca la variable subscription_id en el identificador de su suscripción a Azure. Para una lista de las regiones de Azure en las que Data Factory está disponible actualmente, seleccione las regiones que le interesen en la página siguiente y expanda Análisis para poder encontrar Data Factory: Productos disponibles por región. Los almacenes de datos (Azure Storage, Azure SQL Database, etc.) y los procesos (HDInsight, etc.) que usa la factoría de datos pueden encontrarse en otras regiones.
def main(): # Azure subscription ID subscription_id = '<subscription ID>' # This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group rg_name = '<resource group>' # The data factory name. It must be globally unique. df_name = '<factory name>' # Specify your Active Directory client ID, client secret, and tenant ID credentials = ClientSecretCredential(client_id='<Application (client) ID>', client_secret='<client secret value>', tenant_id='<tenant ID>') # Specify following for Soverign Clouds, import right cloud constant and then use it to connect. # from msrestazure.azure_cloud import AZURE_PUBLIC_CLOUD as CLOUD # credentials = DefaultAzureCredential(authority=CLOUD.endpoints.active_directory, tenant_id=tenant_id) resource_client = ResourceManagementClient(credentials, subscription_id) adf_client = DataFactoryManagementClient(credentials, subscription_id) rg_params = {'location':'westus'} df_params = {'location':'westus'}
Crear una factoría de datos
Agregue el código siguiente al método main que crea una factoría de datos. Si ya existe el grupo de recursos, convierta en comentario la primera instrucción create_or_update
.
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
#Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
Creación de un servicio vinculado
Agregue el código siguiente al método main que crea un servicio vinculado de Azure Storage.
Los servicios vinculados se crean en una factoría de datos para vincular los almacenes de datos y los servicios de proceso con la factoría de datos. En esta guía de inicio rápido, basta con crear un servicio vinculado de Azure Storage como almacén de origen de copia y receptor. En el ejemplo, se denomina "AzureStorageLinkedService". Reemplace <storageaccountname>
y <storageaccountkey>
por el nombre de la cuenta de Azure Storage y su clave.
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
Creación de conjuntos de datos
En esta sección, se crean dos conjuntos de datos: uno para el origen y otro para el receptor.
Creación de un conjunto de datos para el blob de Azure de origen
Agregue el código siguiente al método main que crea un conjunto de datos de blob de Azure. Para obtener más información sobre las propiedades del conjunto de datos de blob, consulte el artículo Conector de blob de Azure.
Se define un conjunto de datos que representa los datos de origen del blob de Azure. Este conjunto de datos de blob hace referencia al servicio vinculado de Azure Storage que creó en el paso anterior.
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
Creación de un conjunto de datos para el blob de Azure receptor
Agregue el código siguiente al método main que crea un conjunto de datos de blob de Azure. Para obtener más información sobre las propiedades del conjunto de datos de blob, consulte el artículo Conector de blob de Azure.
Se define un conjunto de datos que representa los datos de origen del blob de Azure. Este conjunto de datos de blob hace referencia al servicio vinculado de Azure Storage que creó en el paso anterior.
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
Crear una canalización
Agregue el código siguiente al método main que crea una canalización con una actividad de copia.
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name,inputs=[dsin_ref], outputs=[dsOut_ref], source=blob_source, sink=blob_sink)
#Create a pipeline with the copy activity
#Note1: To pass parameters to the pipeline, add them to the json string params_for_pipeline shown below in the format { “ParameterName1” : “ParameterValue1” } for each of the parameters needed in the pipeline.
#Note2: To pass parameters to a dataflow, create a pipeline parameter to hold the parameter name/value, and then consume the pipeline parameter in the dataflow parameter in the format @pipeline().parameters.parametername.
p_name = 'copyPipeline'
params_for_pipeline = {}
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
Creación de una ejecución de canalización
Agregue el código siguiente al método Main que desencadena una ejecución de canalización.
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
Supervisar una ejecución de canalización
Agregue el código siguiente al método Main para supervisar la ejecución de la canalización:
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
Ahora, agregue la siguiente instrucción para invocar el método Main cuando se ejecute el programa:
# Start the main method
main()
Script completo
Este es el código de Python completo:
from azure.identity import ClientSecretCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.models import *
from datetime import datetime, timedelta
import time
def print_item(group):
"""Print an Azure object instance."""
print("\tName: {}".format(group.name))
print("\tId: {}".format(group.id))
if hasattr(group, 'location'):
print("\tLocation: {}".format(group.location))
if hasattr(group, 'tags'):
print("\tTags: {}".format(group.tags))
if hasattr(group, 'properties'):
print_properties(group.properties)
def print_properties(props):
"""Print a ResourceGroup properties instance."""
if props and hasattr(props, 'provisioning_state') and props.provisioning_state:
print("\tProperties:")
print("\t\tProvisioning State: {}".format(props.provisioning_state))
print("\n\n")
def print_activity_run_details(activity_run):
"""Print activity run details."""
print("\n\tActivity run details\n")
print("\tActivity run status: {}".format(activity_run.status))
if activity_run.status == 'Succeeded':
print("\tNumber of bytes read: {}".format(activity_run.output['dataRead']))
print("\tNumber of bytes written: {}".format(activity_run.output['dataWritten']))
print("\tCopy duration: {}".format(activity_run.output['copyDuration']))
else:
print("\tErrors: {}".format(activity_run.error['message']))
def main():
# Azure subscription ID
subscription_id = '<subscription ID>'
# This program creates this resource group. If it's an existing resource group, comment out the code that creates the resource group
rg_name = '<resource group>'
# The data factory name. It must be globally unique.
df_name = '<factory name>'
# Specify your Active Directory client ID, client secret, and tenant ID
credentials = ClientSecretCredential(client_id='<service principal ID>', client_secret='<service principal key>', tenant_id='<tenant ID>')
resource_client = ResourceManagementClient(credentials, subscription_id)
adf_client = DataFactoryManagementClient(credentials, subscription_id)
rg_params = {'location':'westus'}
df_params = {'location':'westus'}
# create the resource group
# comment out if the resource group already exits
resource_client.resource_groups.create_or_update(rg_name, rg_params)
# Create a data factory
df_resource = Factory(location='westus')
df = adf_client.factories.create_or_update(rg_name, df_name, df_resource)
print_item(df)
while df.provisioning_state != 'Succeeded':
df = adf_client.factories.get(rg_name, df_name)
time.sleep(1)
# Create an Azure Storage linked service
ls_name = 'storageLinkedService001'
# IMPORTANT: specify the name and key of your Azure Storage account.
storage_string = SecureString(value='DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>;EndpointSuffix=<suffix>')
ls_azure_storage = LinkedServiceResource(properties=AzureStorageLinkedService(connection_string=storage_string))
ls = adf_client.linked_services.create_or_update(rg_name, df_name, ls_name, ls_azure_storage)
print_item(ls)
# Create an Azure blob dataset (input)
ds_name = 'ds_in'
ds_ls = LinkedServiceReference(type="LinkedServiceReference",reference_name=ls_name)
blob_path = '<container>/<folder path>'
blob_filename = '<file name>'
ds_azure_blob = DatasetResource(properties=AzureBlobDataset(
linked_service_name=ds_ls, folder_path=blob_path, file_name=blob_filename))
ds = adf_client.datasets.create_or_update(
rg_name, df_name, ds_name, ds_azure_blob)
print_item(ds)
# Create an Azure blob dataset (output)
dsOut_name = 'ds_out'
output_blobpath = '<container>/<folder path>'
dsOut_azure_blob = DatasetResource(properties=AzureBlobDataset(linked_service_name=ds_ls, folder_path=output_blobpath))
dsOut = adf_client.datasets.create_or_update(
rg_name, df_name, dsOut_name, dsOut_azure_blob)
print_item(dsOut)
# Create a copy activity
act_name = 'copyBlobtoBlob'
blob_source = BlobSource()
blob_sink = BlobSink()
dsin_ref = DatasetReference(reference_name=ds_name)
dsOut_ref = DatasetReference(reference_name=dsOut_name)
copy_activity = CopyActivity(name=act_name, inputs=[dsin_ref], outputs=[
dsOut_ref], source=blob_source, sink=blob_sink)
# Create a pipeline with the copy activity
p_name = 'copyPipeline'
params_for_pipeline = {}
p_obj = PipelineResource(
activities=[copy_activity], parameters=params_for_pipeline)
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
print_item(p)
# Create a pipeline run
run_response = adf_client.pipelines.create_run(rg_name, df_name, p_name, parameters={})
# Monitor the pipeline run
time.sleep(30)
pipeline_run = adf_client.pipeline_runs.get(
rg_name, df_name, run_response.run_id)
print("\n\tPipeline run status: {}".format(pipeline_run.status))
filter_params = RunFilterParameters(
last_updated_after=datetime.now() - timedelta(1), last_updated_before=datetime.now() + timedelta(1))
query_response = adf_client.activity_runs.query_by_pipeline_run(
rg_name, df_name, pipeline_run.run_id, filter_params)
print_activity_run_details(query_response.value[0])
# Start the main method
main()
Ejecución del código
Compile e inicie la aplicación y, a continuación, compruebe la ejecución de la canalización.
La consola imprime el progreso de la creación de la factoría de datos, el servicio vinculado, los conjuntos de datos, la canalización y la ejecución de canalización. Espere hasta que vea los detalles de ejecución de actividad de copia con el tamaño de los datos leídos/escritos. A continuación, use herramientas como Explorador de Azure Storage para comprobar que los blobs se copian a "outputBlobPath" desde "inputBlobPath", como se especificó en las variables.
Este es la salida de ejemplo:
Name: <data factory name>
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>
Location: eastus
Tags: {}
Name: storageLinkedService
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/linkedservices/storageLinkedService
Name: ds_in
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_in
Name: ds_out
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/datasets/ds_out
Name: copyPipeline
Id: /subscriptions/<subscription ID>/resourceGroups/<resource group name>/providers/Microsoft.DataFactory/factories/<data factory name>/pipelines/copyPipeline
Pipeline run status: Succeeded
Datetime with no tzinfo will be considered UTC.
Datetime with no tzinfo will be considered UTC.
Activity run details
Activity run status: Succeeded
Number of bytes read: 18
Number of bytes written: 18
Copy duration: 4
Limpieza de recursos
Para eliminar la factoría de datos, agregue el siguiente código al programa:
adf_client.factories.delete(rg_name, df_name)
Contenido relacionado
La canalización de este ejemplo copia los datos de una ubicación a otra en una instancia de Azure Blob Storage. Consulte los tutoriales para obtener información acerca del uso de Data Factory en otros escenarios.