Nota
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
SE APLICA A:
Azure Data Factory
Azure Synapse Analytics
Sugerencia
Data Factory en Microsoft Fabric es la próxima generación de Azure Data Factory, con una arquitectura más sencilla, inteligencia artificial integrada y nuevas características. Si no está familiarizado con la integración de datos, comience con Fabric Data Factory. Las cargas de trabajo de ADF existentes pueden actualizarse a Fabric para acceder a nuevas funcionalidades en ciencia de datos, análisis en tiempo real e informes.
En este tutorial, creará una instancia de Azure Data Factory con una canalización que carga los datos diferenciales de una tabla de Azure SQL Database a una instancia de Azure Blob Storage.
En este tutorial, realizará los siguientes pasos:
- Preparación del almacén de datos para almacenar el valor de marca de agua
- Creación de una factoría de datos.
- Cree servicios vinculados.
- Crear conjuntos de datos de origen, destino y marca de agua.
- Creación de una canalización
- Ejecución de la canalización
- Supervisión de la ejecución de la canalización
- Revisión de los resultados
- Añadir más datos al origen.
- Repetición de la ejecución de la canalización
- Supervisión de la segunda ejecución de la canalización
- Revisión de los resultados de la segunda ejecución
Información general
Este es el diagrama de solución de alto nivel:
Estos son los pasos importantes para crear esta solución:
Seleccione la columna de marca de agua. Seleccione una columna en el almacén de datos de origen, que pueda usarse para segmentar los registros nuevos o actualizados para cada ejecución. Normalmente, los datos de esta columna seleccionada (por ejemplo, last_modify_time o id.) siguen aumentando cuando se crean o se actualizan las filas. El valor máximo en esta columna se utiliza como indicador de referencia.
Prepare el almacén de datos para almacenar el valor de marca de agua. En este tutorial, el valor de marca de agua se almacena en una base de datos SQL.
Cree una canalización con el siguiente flujo de trabajo:
La canalización de esta solución consta de las siguientes actividades:
- Cree dos actividades de búsqueda. Use la primera actividad de búsqueda para recuperar el último valor de marca de agua. Utilice la segunda actividad de búsqueda para recuperar el nuevo valor de watermark. Estos valores de marca de agua se pasan a la actividad de copia.
- Cree una actividad de copia que copie las filas del almacén de datos de origen con un valor en la columna de marca de agua mayor que el valor de marca de agua anterior y menor que el nuevo valor de marca de agua. A continuación, copia los datos diferenciales del almacén de datos de origen en Blob Storage como un archivo nuevo.
- Cree un procedimiento almacenado que actualice el valor de marca de agua de la canalización que se ejecute la próxima vez.
Si no tiene una suscripción de Azure, cree una cuenta free antes de comenzar.
Requisitos previos
- Azure SQL Database. La base de datos se usa como almacén de datos de origen. Si no tiene una base de datos en Azure SQL Database, consulte Crear una base de datos en Azure SQL Database para ver los pasos para crear una.
- Azure Storage. Blob Storage se usa como almacén de datos receptor. Si no tiene una cuenta de almacenamiento, consulte la sección Crear una cuenta de almacenamiento para ver los pasos para su creación. Cree un contenedor llamado adftutorial.
Creación de una tabla de origen de datos en la base de datos SQL
Abrir SQL Server Management Studio. En el Explorador de servidores, haga clic con el botón derecho en la base de datos y elija Nueva consulta.
Ejecute el siguiente comando SQL en la base de datos SQL para crear una tabla denominada
data_source_tablecomo el almacén de origen de datos:create table data_source_table ( PersonID int, Name varchar(255), LastModifytime datetime ); INSERT INTO data_source_table (PersonID, Name, LastModifytime) VALUES (1, 'aaaa','9/1/2017 12:56:00 AM'), (2, 'bbbb','9/2/2017 5:23:00 AM'), (3, 'cccc','9/3/2017 2:36:00 AM'), (4, 'dddd','9/4/2017 3:21:00 AM'), (5, 'eeee','9/5/2017 8:06:00 AM');En este tutorial, utilizará LastModifytime como columna de marca de agua. Los datos del almacén de origen de datos se muestran en la tabla siguiente:
PersonID | Name | LastModifytime -------- | ---- | -------------- 1 | aaaa | 2017-09-01 00:56:00.000 2 | bbbb | 2017-09-02 05:23:00.000 3 | cccc | 2017-09-03 02:36:00.000 4 | dddd | 2017-09-04 03:21:00.000 5 | eeee | 2017-09-05 08:06:00.000
Crea otra tabla en tu base de datos SQL para almacenar el valor de high watermark
Ejecute el siguiente comando SQL en la base de datos SQL para crear una tabla denominada
watermarktabley almacenar el valor de marca de agua:create table watermarktable ( TableName varchar(255), WatermarkValue datetime, );Establezca el valor predeterminado de límite máximo con el nombre de la tabla del almacén de datos de origen. En este tutorial, el nombre de tabla es data_source_table.
INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')Revise los datos de la tabla
watermarktable.Select * from watermarktableSalida:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
Creación de un procedimiento almacenado en la base de datos SQL
Ejecute el siguiente comando para crear un procedimiento almacenado en la base de datos SQL:
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Crear una factoría de datos
Inicie Microsoft Edge o Google Chrome explorador web. Actualmente, la interfaz de usuario de Data Factory solo se admite en exploradores web de Microsoft Edge y Google Chrome.
En el menú superior, seleccione Crear un recurso>Analytics>Data Factory :
En la página Nueva factoría de datos, escriba ADFIncCopyTutorialDF para el nombre.
El nombre del Azure Data Factory debe ser globalmente único. Si ve un signo de exclamación rojo con el siguiente error, cambie el nombre de la factoría de datos (por ejemplo, yournameADFIncCopyTutorialDF) e intente crearla de nuevo. Consulte el artículo Azure Data Factory: reglas de nomenclatura para conocer las reglas de nomenclatura de los artefactos de Data Factory.
El nombre "ADFIncCopyTutorialDF" de factoría de datos no está disponible
Seleccione el Azure subscription en el que desea crear la factoría de datos.
Para el grupo de recursos, realice uno de los siguientes pasos:
Seleccione en primer lugar Usar existente y después un grupo de recursos de la lista desplegable.
Seleccione Crear nuevoy escriba el nombre de un grupo de recursos.
Para obtener información sobre los grupos de recursos, consulte Uso de grupos de recursos para administrar los recursos de Azure.
Seleccione V2 para la versión.
Seleccione la ubicación de Data Factory. En la lista desplegable solo se muestran las ubicaciones que se admiten. Los almacenes de datos (Azure Storage, Azure SQL Database, Azure SQL Managed Instance, etc.) y los procesos (HDInsight, etc.) usados por la factoría de datos pueden estar en otras regiones.
Haga clic en Crear.
Una vez completada la creación, verá la página Data Factory tal como se muestra en la imagen.
Página de inicio del Azure Data Factory, con el recuadro de Open Azure Data Factory Studio.
Seleccione Open en el icono Abrir Azure Data Factory Studio para iniciar la interfaz de usuario (UI) de Azure Data Factory en una pestaña independiente.
Crear una canalización
En este tutorial, crearás una canalización con dos actividades de búsqueda, una actividad de copia y una actividad de procedimiento almacenado, todas encadenadas en una canalización.
En la página principal de la interfaz de usuario de Data Factory, haga clic en el icono Orquestar.
En el panel General, en Propiedades, especifique IncrementalCopyPipeline en Nombre. A continuación, contraiga el panel; para ello, haga clic en el icono Propiedades en la esquina superior derecha.
Agreguemos la primera actividad de búsqueda para recuperar el valor de marca de agua anterior. En el cuadro de herramientas Activities (Actividades), expanda General (General), arrastre la actividad Lookup (Búsqueda) y colóquela en la superficie del diseñador de canalizaciones. Cambie el nombre de la actividad a LookupOldWaterMarkActivity.
Cambie a la pestaña Settings (Configuración) y haga clic en + New (+ Nuevo) para Source Dataset (Conjunto de datos de origen). En este paso, creará conjuntos de datos que representarán los datos de watermarktable. Esta tabla contiene la antigua marca de agua que se utilizó en la operación de copia anterior.
En la ventana Nuevo conjunto de datos, seleccione Azure SQL Database y haga clic en Continue. Verá que se abre una nueva ventana para el conjunto de datos.
En la ventana Set properties (Establecer propiedades) para el conjunto de datos, escriba WatermarkDataset en Name (Nombre).
En Linked Service (Servicio vinculado), seleccione New (Nuevo) y siga estos pasos:
Escriba AzureSqlDatabaseLinkedService en Nombre.
Seleccione su servidor en Server name (Nombre del servidor).
Seleccione el nombre de la base de datos en el menú desplegable.
Escriba su nombre de usuario y contraseña.
Para probar la conexión a la base de datos de SQL, haga clic en Test connection (Prueba de conexión).
Haga clic en Finalizar
Confirme que AzureSqlDatabaseLinkedService está seleccionado en Linked service (Servicio vinculado).
Seleccione Finalizar.
En la pestaña Connection (Conexión), seleccione [dbo].[watermarktable] en Table (Tabla). Si desea una vista previa de los datos de la tabla, haga clic en Preview data (Vista previa de los datos).
Cambie al editor de canalización; para ello, haga clic en la pestaña de la canalización de la parte superior o en el nombre de esta de la vista de árbol de la izquierda. En la ventana de propiedades de la actividad de búsqueda, confirme que WatermarkDataset está seleccionado en el campo Source Dataset (Conjunto de datos de origen).
En el cuadro de herramientas Activities (Actividades), expanda General (General), y arrastre otra actividad Lookup (Búsqueda) y colóquela en la superficie del diseñador de canalizaciones; después, establezca el nombre en LookupNewWaterMarkActivity en la pestaña General (General) de la ventana de propiedades. Esta actividad de búsqueda obtiene el nuevo valor de marca de agua de la tabla y copia los datos de origen en el destino.
En la ventana de propiedades de la segunda actividad de búsqueda, cambie a la pestaña Settings (Configuración) y haga clic en New (Nuevo). Creará un conjunto de datos que apuntará a la tabla de origen con el nuevo valor de marca de agua (valor máximo de LastModifyTime).
En la ventana Nuevo conjunto de datos, seleccione Azure SQL Database y haga clic en Continue.
En la ventana Set Properties (Establecer propiedades), escriba SourceDataset (Conjunto de datos de origen) en Name (Nombre). Seleccione AzureSqlDatabaseLinkedService como Linked service (Servicio vinculado).
Seleccione la tabla [dbo].[data_source_table]. Más adelante en este tutorial especificará una consulta en este conjunto de datos. La consulta tiene prioridad sobre la tabla que se especifica en este paso.
Seleccione Finalizar.
Cambie al editor de canalización; para ello, haga clic en la pestaña de la canalización de la parte superior o en el nombre de esta de la vista de árbol de la izquierda. En la ventana de propiedades de la actividad de búsqueda, confirme que SourceDataset está seleccionado en el campo Source Dataset (Conjunto de datos de origen).
Seleccione Query (Consulta) en el campo Use Query (Usar consulta) y escriba la siguiente consulta: solo se selecciona el valor máximo de LastModifytime de data_source_table. Asegúrese de haber activado también First row only (Solo la primera fila).
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
En el cuadro de herramientas Activities (Actividades), expanda Move & Transform (Mover y transformar) y arrastre la actividad Copy (Copiar) del cuadro de herramientas Activities (Actividades); después, establezca el nombre en IncrementalCopyActivity.
Conecte ambas actividades de búsqueda a la actividad de copia arrastrando el botón verde adjunto a las actividades de búsqueda hacia la actividad de copia. Suelte el botón del ratón cuando vea que el color del borde de la actividad de Copia cambia a azul.
Seleccione el actividad de copia y confirme que ve las propiedades de la actividad en la ventana Properties.
Cambie a la pestaña Source (Origen) de la ventana Properties (Propiedades) y realice los pasos siguientes:
Seleccione SourceDataset en el campo Source Dataset (Conjunto de datos de origen).
Seleccione Query (Consulta) en el campo Use Query (Usar consulta).
Escriba la siguiente consulta SQL en el campo Query (Consulta).
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
Cambie a la pestaña Sink (Receptor) y haga clic en + New (+ Nuevo) en el campo Sink Dataset (Conjunto de datos receptor).
En este tutorial, el almacén de datos receptor es de tipo Azure Blob Storage. Por lo tanto, seleccione Azure Blob Storage y haga clic en Continue en la ventana Nuevo conjunto de datos.
En la página Select Format (Seleccionar formato), elija el tipo de formato de los datos y, después, seleccione Continue (Continuar).
En la ventana Set Properties (Establecer propiedades), escriba SinkDataset (Conjunto de datos de receptor) en Name (Nombre). En Linked Service (Servicio vinculado), seleccione + New (+ Nuevo). En este paso, creas una conexión (servicio vinculado) a tu Azure Blob Storage.
En la ventana Nuevo servicio vinculado (Azure Blob Storage), siga estos pasos:
- Escriba AzureStorageLinkedService en Nombre.
- Seleccione su cuenta de almacenamiento de Azure para nombre de la cuenta de almacenamiento.
- Pruebe la conexión y, a continuación, haga clic en Finalizar.
En la ventana Set Properties (Establecer propiedades), confirme que AzureStorageLinkedService está seleccionado para Linked service (Servicio vinculado). Después, seleccione Finalizar.
Vaya a la pestaña Connection (Conexión) de SinkDataset y realice los pasos siguientes:
- En el campo File path (Ruta de acceso de archivo), escriba adftutorial/incrementalcopy. adftutorial es el nombre del contenedor de blobs; incrementalcopy, el de la carpeta. Con este fragmento de código se da por supuesto que tiene un contenedor de blob denominado adftutorial en Blob Storage. Cree el contenedor si no existe o asígnele el nombre de uno existente. Azure Data Factory crea automáticamente la carpeta de salida incrementalcopy si no existe. También puede usar el botón Browse (Examinar) para ir a una carpeta del contenedor de blobs mediante la ruta de acceso del archivo.
- En la parte File (Archivo) del campo File path (Ruta de acceso de archivo), seleccione Add dynamic content [ALT+P] y, después, escriba
@CONCAT('Incremental-', pipeline().RunId, '.txt')en la ventana abierta. Después, seleccione Finalizar. El nombre de archivo se genera dinámicamente mediante la expresión. Cada ejecución de canalización tiene un identificador único. El actividad de copia usa el identificador de ejecución para generar el nombre de archivo.
Cambie al editor de canalización; para ello, haga clic en la pestaña de la canalización de la parte superior o en el nombre de esta de la vista de árbol de la izquierda.
En el cuadro de herramientas Actividades, despliegue General, y arrastre la actividad Stored Procedure (Procedimiento almacenado) del cuadro de herramientas Actividades y colóquela en la superficie del diseñador de canalizaciones. Conecte el resultado verde (correcto) de la actividad Copy (Copiar) con la actividad Stored Procedure (Procedimiento almacenado).
Seleccione Actividad de Procedimiento Almacenado en el diseñador de canalizaciones y cambie su nombre a StoredProceduretoWriteWatermarkActivity.
Cambie a la pestaña SQL Account (Cuenta de SQL) y seleccione AzureSqlDatabaseLinkedService en Linked service (Servicio vinculado).
Cambie a la pestaña Stored Procedure (Procedimiento almacenado) y realice los pasos siguientes:
En Stored procedure name (Nombre del procedimiento almacenado), seleccione usp_write_watermark.
Para especificar valores para los parámetros del procedimiento almacenado, haga clic en Import parameter (Importar parámetro) y escriba los valores siguientes:
Nombre Tipo Importancia Hora de última modificación Fecha y hora @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName (Nombre de la tabla) Cuerda @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Para validar la configuración de canalización, haga clic en Validate (Validar) en la barra de herramientas. Confirme que no haya errores de comprobación. Para cerrar la ventana Pipeline Validation Report (Informe de validación de la canalización), haga clic en >>.
Publique entidades (servicios vinculados, conjuntos de datos y canalizaciones) en el servicio Azure Data Factory seleccionando el botón Publish All. Espere hasta que vea un mensaje de que la publicación se completó exitosamente.
Desencadenamiento de una ejecución de la canalización
Haga clic en Add Trigger (Agregar disparador) en la barra de herramientas y luego en Trigger Now (Disparar ahora).
En la ventana Pipeline Run (Ejecución de canalización), seleccione Finish (Finalizar).
Supervisión de la ejecución de la canalización
Cambie a la pestaña Monitor a la izquierda. Verá el estado de ejecución de la canalización desencadenada por un desencadenador manual. Puede usar los vínculos de la columna NOMBRE DE CANALIZACIÓN para ver los detalles de la ejecución y volver a ejecutar la canalización.
Para ver las ejecuciones de actividad asociadas a la ejecución de la canalización, seleccione el vínculo en la columna NOMBRE DE CANALIZACIÓN. Para más información sobre las ejecuciones de actividad, seleccione el vínculo Detalles (icono de gafas) en la columna NOMBRE DE ACTIVIDAD. Para volver a la vista Ejecuciones de canalización, seleccione All pipeline runs (Todas las ejecuciones de canalización) en la parte superior. Para actualizar la vista, seleccione Refresh (Actualizar).
Revisión del resultado
Conéctese a la cuenta de Azure Storage mediante herramientas como Explorador de Azure Storage. Compruebe que se crea un archivo de salida en la carpeta incrementalcopy del contenedor adftutorial.
Abra el archivo de salida y observe que todos los datos se hayan copiado de data_source_table en el archivo de blobs.
1,aaaa,2017-09-01 00:56:00.0000000 2,bbbb,2017-09-02 05:23:00.0000000 3,cccc,2017-09-03 02:36:00.0000000 4,dddd,2017-09-04 03:21:00.0000000 5,eeee,2017-09-05 08:06:00.0000000Compruebe el valor más reciente de
watermarktable. Verá que se ha actualizado el valor de marca de agua.Select * from watermarktableEste es el resultado:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-05 8:06:00.000 |
Incorporación de más datos al origen
Inserte nuevos datos en la base de datos (almacén de origen de datos).
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Los datos actualizados de la base de datos son los siguientes:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Desencadenamiento de otra ejecución de canalización
Cambie a la pestaña Edit (Editar). Si no está abierta en el diseñador, haga clic en la canalización en la vista de árbol.
Haga clic en Add Trigger (Agregar disparador) en la barra de herramientas y luego en Trigger Now (Disparar ahora).
Supervisión de la segunda ejecución de la canalización
Cambie a la pestaña Monitor a la izquierda. Verá el estado de ejecución de la canalización desencadenada por un desencadenador manual. Puede usar los vínculos de la columna PIPELINE NAME (Nombre de la canalización) para ver los detalles de la actividad y volver a ejecutar la canalización.
Para ver las ejecuciones de actividad asociadas a la ejecución de la canalización, seleccione el vínculo en la columna NOMBRE DE CANALIZACIÓN. Para más información sobre las ejecuciones de actividad, seleccione el vínculo Detalles (icono de gafas) en la columna NOMBRE DE ACTIVIDAD. Para volver a la vista Ejecuciones de canalización, seleccione All pipeline runs (Todas las ejecuciones de canalización) en la parte superior. Para actualizar la vista, seleccione Refresh (Actualizar).
Comprobación de la segunda salida
En la instancia de Blob Storage, verá que se ha creado otro archivo. En este tutorial, el nombre de archivo nuevo es
Incremental-<GUID>.txt. Abra ese archivo y verá que contiene dos filas de registros.6,newdata,2017-09-06 02:23:00.0000000 7,newdata,2017-09-07 09:01:00.0000000Compruebe el valor más reciente de
watermarktable. Verá que se ha vuelto a actualizar el valor de marca de agua.Select * from watermarktableSalida del ejemplo:
| TableName | WatermarkValue | | --------- | -------------- | | data_source_table | 2017-09-07 09:01:00.000 |
Contenido relacionado
En este tutorial, realizó los pasos siguientes:
- Preparación del almacén de datos para almacenar el valor de marca de agua
- Creación de una factoría de datos.
- Cree servicios vinculados.
- Crear conjuntos de datos de origen, destino y marca de agua.
- Creación de una canalización
- Ejecución de la canalización
- Supervisión de la ejecución de la canalización
- Revisión de los resultados
- Añadir más datos al origen.
- Repetición de la ejecución de la canalización
- Supervisión de la segunda ejecución de la canalización
- Revisión de los resultados de la segunda ejecución
En este tutorial, la canalización copió datos de una única tabla de SQL Database a Blob Storage. Pase al siguiente tutorial para aprender a copiar datos de varias tablas de una base de datos de SQL Server a SQL Database.