Carga incremental de datos de Data Warehouse a almacenes de lago
En este tutorial, aprenderá a cargar datos de forma incremental desde Data Warehouse a almacenes de lago.
Información general
Este es el diagrama de solución de alto nivel:
Estos son los pasos importantes para crear esta solución:
Seleccione la columna de marca de agua. Seleccione una columna en la tabla de datos de origen que pueda usarse para segmentar registros nuevos o actualizados para cada ejecución. Normalmente, los datos de esta columna seleccionada (por ejemplo, last_modify_time o id.) siguen aumentando cuando se crean o se actualizan las filas. El valor máximo de esta columna se utiliza como una marca de agua.
Prepare una tabla para almacenar el último valor de referencia en el almacenamiento de datos.
Cree una canalización con el siguiente flujo de trabajo:
La canalización de esta solución consta de las siguientes actividades:
- Cree dos actividades de búsqueda. Use la primera actividad de búsqueda para recuperar el último valor de marca de agua y, la segunda actividad, para recuperar el nuevo valor de marca de agua. Estos valores de marca de agua se pasan a la actividad de copia.
- Cree una actividad de copia que copie filas de la tabla de datos de origen con el valor de la columna de referencia que sea mayor que el valor anterior y menor que el nuevo. A continuación, copiará los datos de almacenamiento de datos al almacén de lago como un nuevo archivo.
- Cree una actividad de procedimiento almacenado que actualice el valor de la última referencia de la siguiente ejecución de canalización.
Requisitos previos
- Almacenamiento de datos. El Almacenamiento de datos se usa como almacén de datos de origen. Si no lo tuviera, consulte Crear un almacén de datos para conocer los pasos para crear uno.
- Almacén de lago. Use el almacén de lago como almacén de datos de destino. Si no lo tuviera, consulte Creación de un almacén de lago para conocer los pasos para crear uno. Cree una carpeta denominada IncrementalCopy para almacenar los datos copiados.
Preparando el origen
Estas son algunas tablas y procedimientos almacenados que debe preparar en el almacenamiento de datos de origen antes de configurar la canalización de copia incremental.
1. Cree una tabla de origen de datos en el almacenamiento de datos
Ejecute el siguiente comando SQL en su almacén de datos para crear una tabla denominada data_source_table como tabla de origen de datos. En este tutorial, la usará como datos de ejemplo para realizar la copia incremental.
create table data_source_table
(
PersonID int,
Name varchar(255),
LastModifytime DATETIME2(6)
);
INSERT INTO data_source_table
(PersonID, Name, LastModifytime)
VALUES
(1, 'aaaa','9/1/2017 12:56:00 AM'),
(2, 'bbbb','9/2/2017 5:23:00 AM'),
(3, 'cccc','9/3/2017 2:36:00 AM'),
(4, 'dddd','9/4/2017 3:21:00 AM'),
(5, 'eeee','9/5/2017 8:06:00 AM');
Los datos en la tabla de origen de datos se muestran a continuación:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
En este tutorial, se usará LastModifytime como columna de referencia.
2. Cree otra tabla en el almacenamiento de datos para almacenar el último valor de referencia
Ejecute el siguiente comando SQL en el almacenamiento de datos para crear una tabla denominada watermarktable para almacenar el último valor de referencia:
create table watermarktable ( TableName varchar(255), WatermarkValue DATETIME2(6), );
Establezca el valor predeterminado de la última referencia con el nombre de la tabla de la tabla de datos de origen. En este tutorial, el nombre de la tabla es data_source_table y el valor predeterminado es
1/1/2010 12:00:00 AM
.INSERT INTO watermarktable VALUES ('data_source_table','1/1/2010 12:00:00 AM')
Revise los datos de la tabla watermarktable.
Select * from watermarktable
Salida:
TableName | WatermarkValue ---------- | -------------- data_source_table | 2010-01-01 00:00:00.000
3. Cree un procedimiento almacenado en el almacenamiento de datos
Ejecute el siguiente comando para crear un procedimiento almacenado en el almacenamiento de datos. Este procedimiento almacenado se usa para ayudar a actualizar el último valor de referencia después de la última ejecución de canalización.
CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS
BEGIN
UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName
END
Configuración de una canalización para copia incremental
Paso 1: crear una canalización
Vaya a Power BI.
Seleccione el icono de Power BI en la parte inferior izquierda de la pantalla y, después, seleccione Data Factory para abrir la página principal de Data Factory.
Vaya al área de trabajo de Microsoft Fabric.
Seleccione Canalización de datos y, a continuación, escriba un nombre de canalización para crear una nueva canalización.
Paso 2: agregar una actividad de búsqueda para la última referencia
En este paso, creará una actividad de búsqueda para obtener el último valor de referencia. Valor predeterminado 1/1/2010 12:00:00 AM
establecido antes de su obtención.
Seleccione Agregar actividad de canalización y seleccione Búsqueda en la lista desplegable.
En la pestaña General, cambie el nombre de esta actividad a LookupOldWaterMarkActivity.
En la pestaña Configuración, realice la siguiente configuración:
- Tipo de banco de datos : seleccione Espacio de trabajo.
- Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.
- Almacenamiento de datos: seleccione su almacenamiento de datos.
- Usar consulta: elija Tabla.
- Tabla: elija dbo.watermarktable.
- Solo primera fila: seleccionado.
Paso 3: agregar una actividad de búsqueda para la referencia nueva
En este paso, creará una actividad de búsqueda para obtener el valor de referencia nuevo. Usará una consulta para obtener la nueva referencia de la tabla de datos de origen. Se obtendrá el valor máximo de la columna LastModifytime en data_source_table.
En la barra superior, seleccione Búsqueda en la pestaña Actividades para agregar la segunda actividad de búsqueda.
En la pestaña General, cambie el nombre de esta actividad a LookupNewWaterMarkActivity.
En la pestaña Configuración, realice la siguiente configuración:
Tipo de banco de datos : seleccione Espacio de trabajo.
Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.
Almacenamiento de datos: seleccione su almacenamiento de datos.
Usar consulta: elija Consulta.
Consulta: escriba la siguiente consulta para elegir la hora máxima de la última modificación como nueva referencia:
select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
Solo primera fila: seleccionado.
Paso 4: agregar la actividad de copia para copiar datos incrementales
En este paso, agregará una actividad de copia para copiar los datos incrementales entre la última y la nueva referencia del almacenamiento de datos al almacén de lago.
Seleccione Actividades en la barra superior y seleccione Copiar datos ->Agregar al lienzo para obtener la actividad de copia.
En la pestaña General, cambie el nombre de esta actividad a IncrementalCopyActivity.
Conecte ambas actividades de búsqueda con la actividad de copia. Para ello, arrastre el botón verde (en caso de éxito) de las actividades de búsqueda a la actividad de copia. Suelte el botón del mouse cuando vea el color del borde de la actividad de copia cambiar a verde.
En la pestaña Origen, realice la siguiente configuración:
Tipo de banco de datos : seleccione Espacio de trabajo.
Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.
Almacenamiento de datos: seleccione su almacenamiento de datos.
Usar consulta: elija Consulta.
Consulta: escriba la siguiente consulta para copiar los datos incrementales entre la última y la nueva referencia.
select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
En la pestaña Destino, realice la siguiente configuración:
- Tipo de banco de datos : seleccione Espacio de trabajo.
- Tipo de almacén de datos del área de trabajo: seleccione Almacén de lago.
- Almacén de lago: seleccione su almacén de lago.
- Carpeta raíz: elija Archivos.
- Ruta de acceso del archivo: especifique la carpeta en la que desee almacenar los datos copiados. Seleccione Examinar para seleccionar la carpeta. Para el nombre de archivo, abra Agregar contenido dinámico y escriba
@CONCAT('Incremental-', pipeline().RunId, '.txt')
en la ventana abierta para crear nombres de archivo para el archivo de datos copiado en el almacén de lago. - Formato de archivo: seleccione el tipo de formato de los datos.
Paso 5: añadir una actividad de procedimiento almacenado
En este paso, agregará una actividad de procedimiento almacenado para actualizar el último valor de referencia para la siguiente ejecución de canalización.
Seleccione Actividades en la barra superior y seleccione Procedimiento almacenado para agregar una actividad de procedimiento almacenado.
En la pestaña General, cambie el nombre de esta actividad por StoredProceduretoWriteWatermarkActivity.
Conecte la salida verde (en caso de éxito) de la actividad de copia con la actividad de procedimiento almacenado.
En la pestaña Configuración, realice la siguiente configuración:
Tipo de banco de datos : seleccione Espacio de trabajo.
Almacenamiento de datos: seleccione su almacenamiento de datos.
Nombre del procedimiento almacenado: especifique el procedimiento almacenado que creó en el almacenamiento de datos: [dbo].[usp_write_watermark].
Expanda Parámetros de procedimiento almacenado. Para especificar valores para los parámetros del procedimiento almacenado, seleccione Importar y escriba los valores siguientes para los parámetros:
Nombre Tipo Value LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue} TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}
Paso 6: ejecución de la canalización y supervisión del resultado
En la barra superior, seleccione Ejecutar, en la pestaña Inicio. A continuación, seleccione Guardar y ejecutar. La canalización iniciará la ejecución y podrá supervisar la canalización en la pestaña Salida.
Vaya al almacén de lago, donde encontrará que el archivo de datos está en la carpeta que especificó, y podrá seleccionar el archivo para obtener una vista previa de los datos copiados.
Agregar más datos para ver los resultados de la copia incremental
Después de finalizar la primera ejecución de canalización, intentaremos agregar más datos en la tabla de origen del almacenamiento de datos para ver si esta canalización puede los copiar datos incrementales.
Paso 1: incorporación de más datos al origen
Para insertar nuevos datos en el almacenamiento de datos, ejecute la consulta siguiente:
INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')
INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')
Los datos actualizados para data_source_table son:
PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000
Paso 2: desencadenar otra ejecución de canalización y supervisar el resultado
Volver a la página de la canalización. En la barra superior, seleccione Ejecutar de nuevo en la pestaña Inicio. La canalización iniciará la ejecución y podrá supervisar la canalización en Salida.
Vaya al almacén de lago, donde encontrará que el archivo de datos copiado nuevo está en la carpeta que especificó, y podrá seleccionar el archivo para obtener una vista previa de los datos copiados. Verá que los datos incrementales se muestran en este archivo.
Contenido relacionado
A continuación, avance para obtener más información sobre la copia de Azure Blob Storage al almacén de lago.