Compartir vía


Carga incremental de datos de Data Warehouse a almacenes de lago

En este tutorial, aprenderá a cargar datos de forma incremental desde Data Warehouse a almacenes de lago.

Información general

Este es el diagrama de solución de alto nivel:

Diagram showing incrementally load data logic.

Estos son los pasos importantes para crear esta solución:

  1. Seleccione la columna de marca de agua. Seleccione una columna en la tabla de datos de origen que pueda usarse para segmentar registros nuevos o actualizados para cada ejecución. Normalmente, los datos de esta columna seleccionada (por ejemplo, last_modify_time o id.) siguen aumentando cuando se crean o se actualizan las filas. El valor máximo de esta columna se utiliza como una marca de agua.

  2. Prepare una tabla para almacenar el último valor de referencia en el almacenamiento de datos.

  3. Cree una canalización con el siguiente flujo de trabajo:

    La canalización de esta solución consta de las siguientes actividades:

    • Cree dos actividades de búsqueda. Use la primera actividad de búsqueda para recuperar el último valor de marca de agua y, la segunda actividad, para recuperar el nuevo valor de marca de agua. Estos valores de marca de agua se pasan a la actividad de copia.
    • Cree una actividad de copia que copie filas de la tabla de datos de origen con el valor de la columna de referencia que sea mayor que el valor anterior y menor que el nuevo. A continuación, copiará los datos de almacenamiento de datos al almacén de lago como un nuevo archivo.
    • Cree una actividad de procedimiento almacenado que actualice el valor de la última referencia de la siguiente ejecución de canalización.

Requisitos previos

  • Almacenamiento de datos. El Almacenamiento de datos se usa como almacén de datos de origen. Si no lo tuviera, consulte Crear un almacén de datos para conocer los pasos para crear uno.
  • Almacén de lago. Use el almacén de lago como almacén de datos de destino. Si no lo tuviera, consulte Creación de un almacén de lago para conocer los pasos para crear uno. Cree una carpeta denominada IncrementalCopy para almacenar los datos copiados.

Preparando el origen

Estas son algunas tablas y procedimientos almacenados que debe preparar en el almacenamiento de datos de origen antes de configurar la canalización de copia incremental.

1. Cree una tabla de origen de datos en el almacenamiento de datos

Ejecute el siguiente comando SQL en su almacén de datos para crear una tabla denominada data_source_table como tabla de origen de datos. En este tutorial, la usará como datos de ejemplo para realizar la copia incremental.

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

Los datos en la tabla de origen de datos se muestran a continuación:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

En este tutorial, se usará LastModifytime como columna de referencia.

2. Cree otra tabla en el almacenamiento de datos para almacenar el último valor de referencia

  1. Ejecute el siguiente comando SQL en el almacenamiento de datos para crear una tabla denominada watermarktable para almacenar el último valor de referencia:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. Establezca el valor predeterminado de la última referencia con el nombre de la tabla de la tabla de datos de origen. En este tutorial, el nombre de la tabla es data_source_table y el valor predeterminado es 1/1/2010 12:00:00 AM.

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. Revise los datos de la tabla watermarktable.

    Select * from watermarktable
    

    Salida:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. Cree un procedimiento almacenado en el almacenamiento de datos

Ejecute el siguiente comando para crear un procedimiento almacenado en el almacenamiento de datos. Este procedimiento almacenado se usa para ayudar a actualizar el último valor de referencia después de la última ejecución de canalización.

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

Configuración de una canalización para copia incremental

Paso 1: crear una canalización

  1. Vaya a Power BI.

  2. Seleccione el icono de Power BI en la parte inferior izquierda de la pantalla y, después, seleccione Data Factory para abrir la página principal de Data Factory.

    Screenshot with the data factory experience emphasized.

  3. Vaya al área de trabajo de Microsoft Fabric.

  4. Seleccione Canalización de datos y, a continuación, escriba un nombre de canalización para crear una nueva canalización.

    Screenshot showing the new data pipeline button in the newly created workspace.

    Screenshot showing the name of creating a new pipeline.

Paso 2: agregar una actividad de búsqueda para la última referencia

En este paso, creará una actividad de búsqueda para obtener el último valor de referencia. Valor predeterminado 1/1/2010 12:00:00 AM establecido antes de su obtención.

  1. Seleccione Agregar actividad de canalización y seleccione Búsqueda en la lista desplegable.

  2. En la pestaña General, cambie el nombre de esta actividad a LookupOldWaterMarkActivity.

  3. En la pestaña Configuración, realice la siguiente configuración:

    • Tipo de banco de datos : seleccione Espacio de trabajo.
    • Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.
    • Almacenamiento de datos: seleccione su almacenamiento de datos.
    • Usar consulta: elija Tabla.
    • Tabla: elija dbo.watermarktable.
    • Solo primera fila: seleccionado.

    Screenshot showing lookup old watermark.

Paso 3: agregar una actividad de búsqueda para la referencia nueva

En este paso, creará una actividad de búsqueda para obtener el valor de referencia nuevo. Usará una consulta para obtener la nueva referencia de la tabla de datos de origen. Se obtendrá el valor máximo de la columna LastModifytime en data_source_table.

  1. En la barra superior, seleccione Búsqueda en la pestaña Actividades para agregar la segunda actividad de búsqueda.

  2. En la pestaña General, cambie el nombre de esta actividad a LookupNewWaterMarkActivity.

  3. En la pestaña Configuración, realice la siguiente configuración:

    • Tipo de banco de datos : seleccione Espacio de trabajo.

    • Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.

    • Almacenamiento de datos: seleccione su almacenamiento de datos.

    • Usar consulta: elija Consulta.

    • Consulta: escriba la siguiente consulta para elegir la hora máxima de la última modificación como nueva referencia:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • Solo primera fila: seleccionado.

    Screenshot showing lookup new watermark.

Paso 4: agregar la actividad de copia para copiar datos incrementales

En este paso, agregará una actividad de copia para copiar los datos incrementales entre la última y la nueva referencia del almacenamiento de datos al almacén de lago.

  1. Seleccione Actividades en la barra superior y seleccione Copiar datos ->Agregar al lienzo para obtener la actividad de copia.

  2. En la pestaña General, cambie el nombre de esta actividad a IncrementalCopyActivity.

  3. Conecte ambas actividades de búsqueda con la actividad de copia. Para ello, arrastre el botón verde (en caso de éxito) de las actividades de búsqueda a la actividad de copia. Suelte el botón del mouse cuando vea el color del borde de la actividad de copia cambiar a verde.

    Screenshot showing connecting lookup and copy activities.

  4. En la pestaña Origen, realice la siguiente configuración:

    • Tipo de banco de datos : seleccione Espacio de trabajo.

    • Tipo de almacén de datos del área de trabajo: seleccione Almacenamiento de datos.

    • Almacenamiento de datos: seleccione su almacenamiento de datos.

    • Usar consulta: elija Consulta.

    • Consulta: escriba la siguiente consulta para copiar los datos incrementales entre la última y la nueva referencia.

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    Screenshot showing copy source configuration.

  5. En la pestaña Destino, realice la siguiente configuración:

    • Tipo de banco de datos : seleccione Espacio de trabajo.
    • Tipo de almacén de datos del área de trabajo: seleccione Almacén de lago.
    • Almacén de lago: seleccione su almacén de lago.
    • Carpeta raíz: elija Archivos.
    • Ruta de acceso del archivo: especifique la carpeta en la que desee almacenar los datos copiados. Seleccione Examinar para seleccionar la carpeta. Para el nombre de archivo, abra Agregar contenido dinámico y escriba @CONCAT('Incremental-', pipeline().RunId, '.txt') en la ventana abierta para crear nombres de archivo para el archivo de datos copiado en el almacén de lago.
    • Formato de archivo: seleccione el tipo de formato de los datos.

    Screenshot showing copy destination configuration.

Paso 5: añadir una actividad de procedimiento almacenado

En este paso, agregará una actividad de procedimiento almacenado para actualizar el último valor de referencia para la siguiente ejecución de canalización.

  1. Seleccione Actividades en la barra superior y seleccione Procedimiento almacenado para agregar una actividad de procedimiento almacenado.

  2. En la pestaña General, cambie el nombre de esta actividad por StoredProceduretoWriteWatermarkActivity.

  3. Conecte la salida verde (en caso de éxito) de la actividad de copia con la actividad de procedimiento almacenado.

  4. En la pestaña Configuración, realice la siguiente configuración:

    • Tipo de banco de datos : seleccione Espacio de trabajo.

    • Almacenamiento de datos: seleccione su almacenamiento de datos.

    • Nombre del procedimiento almacenado: especifique el procedimiento almacenado que creó en el almacenamiento de datos: [dbo].[usp_write_watermark].

    • Expanda Parámetros de procedimiento almacenado. Para especificar valores para los parámetros del procedimiento almacenado, seleccione Importar y escriba los valores siguientes para los parámetros:

      Nombre Tipo Value
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    Screenshot showing stored procedure activity configuration.

Paso 6: ejecución de la canalización y supervisión del resultado

En la barra superior, seleccione Ejecutar, en la pestaña Inicio. A continuación, seleccione Guardar y ejecutar. La canalización iniciará la ejecución y podrá supervisar la canalización en la pestaña Salida.

Screenshot showing pipeline run results.

Vaya al almacén de lago, donde encontrará que el archivo de datos está en la carpeta que especificó, y podrá seleccionar el archivo para obtener una vista previa de los datos copiados.

Screenshot showing lakehouse data for the first pipeline run.

Screenshot showing lakehouse data preview for the first pipeline run.

Agregar más datos para ver los resultados de la copia incremental

Después de finalizar la primera ejecución de canalización, intentaremos agregar más datos en la tabla de origen del almacenamiento de datos para ver si esta canalización puede los copiar datos incrementales.

Paso 1: incorporación de más datos al origen

Para insertar nuevos datos en el almacenamiento de datos, ejecute la consulta siguiente:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

Los datos actualizados para data_source_table son:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

Paso 2: desencadenar otra ejecución de canalización y supervisar el resultado

Volver a la página de la canalización. En la barra superior, seleccione Ejecutar de nuevo en la pestaña Inicio. La canalización iniciará la ejecución y podrá supervisar la canalización en Salida.

Vaya al almacén de lago, donde encontrará que el archivo de datos copiado nuevo está en la carpeta que especificó, y podrá seleccionar el archivo para obtener una vista previa de los datos copiados. Verá que los datos incrementales se muestran en este archivo.

Screenshot showing lakehouse data for the second pipeline run.

Screenshot showing lakehouse data preview for the second pipeline run.

A continuación, avance para obtener más información sobre la copia de Azure Blob Storage al almacén de lago.