Compartir a través de


Patrón para amasar datos incrementalmente con Dataflow Gen2

Importante

Se trata de un patrón para amasar datos incrementalmente con Dataflow Gen2. Esto no es lo mismo que la actualización incremental. La actualización incremental es una característica que está actualmente en desarrollo. Esta característica es una de las ideas más votadas en nuestro sitio web de ideas. Puede votar por esta característica en el sitio de Fabric Ideas.

Este tutorial dura 15 minutos y describe cómo amasar datos de forma incremental en una instancia de LakeHouse mediante Dataflow Gen2.

El amasado incremental de datos en un destino de datos requiere una técnica para cargar solo datos nuevos o actualizados en el destino de datos. Esta técnica se puede realizar mediante una consulta para filtrar los datos en función del destino de datos. En este tutorial se muestra cómo crear un flujo de datos para cargar datos de un origen de OData en una instancia de LakeHouse y cómo agregar una consulta al flujo de datos para filtrar los datos en función del destino de datos.

Los pasos generales de este tutorial son los siguientes:

  • Cree un flujo de datos para cargar datos de un origen de OData en una instancia de LakeHouse.
  • Agregue una consulta al flujo de datos para filtrar los datos en función del destino de datos.
  • (Opcional) volver a cargar datos mediante cuadernos y canalizaciones.

Requisitos previos

Debe tener un área de trabajo habilitada para Microsoft Fabric. Si aún no tiene una, consulte el artículo Crear un área de trabajo. Además, en el tutorial se asume que utilizas la vista de diagrama en Dataflow Gen2. Para comprobar si usa la vista de diagrama, en la cinta superior vaya a Ver y asegúrese de que la vista de Diagrama está seleccionada.

Creación de un flujo de datos para cargar datos de un origen de OData en una instancia de LakeHouse

En esta sección, creará un flujo de datos para cargar datos de un origen de OData en un lago.

  1. Cree una nueva instancia de LakeHouse en el área de trabajo.

    Captura de pantalla que muestra el cuadro de diálogo Crear nueva tabla.

  2. Cree un nuevo flujo de datos Gen2 en el área de trabajo.

    Captura de pantalla que muestra la lista desplegable Crear flujo de datos.

  3. Agregue un nuevo origen al flujo de datos. Seleccione la fuente OData e introduzca la siguiente URL: https://services.OData.org/V4/Northwind/Northwind.svc

    Captura de pantalla que muestra la lista desplegable Obtener datos.

    Captura de pantalla del conector OData.

    Captura de pantalla de la configuración de OData.

  4. Seleccione la tabla Pedidos y seleccione Next.

    Captura de pantalla del cuadro de diálogo de selección de pedidos.

  5. Seleccione las columnas siguientes para mantener:

    • OrderID
    • CustomerID
    • EmployeeID
    • OrderDate
    • RequiredDate
    • ShippedDate
    • ShipVia
    • Freight
    • ShipName
    • ShipAddress
    • ShipCity
    • ShipRegion
    • ShipPostalCode
    • ShipCountry

    Captura de pantalla de la función de selección de columnas.

    Captura de pantalla de la tabla de pedidos de columnas.

  6. Cambie el tipo de datos de OrderDate, RequiredDate y ShippedDate a datetime.

    Captura de pantalla de la función de cambio de tipo de datos.

  7. Configure el destino de datos en lakehouse con la siguiente configuración:

    • destino de los datos: Lakehouse
    • Lakehouse: seleccione el lago que creó en el paso 1.
    • Nombre de la nueva tabla: Orders
    • Método de actualización: Replace

    Captura de pantalla que muestra la cinta de lakehouse de destino de datos.

    Captura de pantalla que muestra la tabla de pedidos de Lakehouse de destino de los datos.

    Captura de pantalla que muestra el reemplazo de la configuración de Lakehouse de destino de datos.

  8. Seleccione Siguiente y publique el flujo de datos.

    Captura de pantalla que muestra el diálogo de publicación de flujo de datos.

Ahora ha creado un flujo de datos para cargar datos de un origen de OData en una instancia de LakeHouse. Este flujo de datos se usa en la sección siguiente para agregar una consulta al flujo de datos para filtrar los datos en función del destino de los datos. Después, puede usar el flujo de datos para volver a cargar datos mediante cuadernos y canalizaciones.

Agregar una consulta al flujo de datos para filtrar los datos en función del destino de datos

En esta sección se agrega una consulta al flujo de datos para filtrar los datos en función de los datos del lago de destino. La consulta obtiene el máximo OrderID en el almacén al principio de la actualización del flujo de datos y utiliza el OrderId máximo para obtener únicamente los pedidos con un OrderId más alto del origen para añadirlos a su destino de datos. Se supone que los pedidos se agregan al origen en orden ascendente de OrderID. Si no es así, puede usar una columna diferente para filtrar los datos. Por ejemplo, puede usar la OrderDate columna para filtrar los datos.

Nota:

Los filtros de OData se aplican dentro de Fabric después de recibir los datos del origen de datos; sin embargo, para orígenes de base de datos como SQL Server, el filtro se aplica en la consulta enviada al origen de datos back-end y solo se devuelven filas filtradas al servicio.

  1. Una vez que se actualice el flujo de datos, vuelva a abrir el flujo de datos que creó en la sección anterior.

    Captura de pantalla que muestra el cuadro de diálogo de flujo de datos abierto.

  2. Cree una nueva consulta llamada IncrementalOrderID y obtenga los datos de la tabla Pedidos en el lago que creó en la sección anterior.

    Captura de pantalla que muestra la lista desplegable Obtener datos.

    Captura de pantalla en la que se muestra la tarjeta almacén de lago de datos.

    Captura de pantalla de la tabla de pedidos de Lakehouse.

    Captura de pantalla de la función de consulta de cambio de nombre.

    Captura de pantalla de la consulta renombrada.

  3. Deshabilite el almacenamiento provisional de esta consulta.

    Captura de pantalla que muestra la función de desactivación de la puesta en escena.

  4. En la vista previa de datos, haga clic con el botón derecho en la OrderID columna y seleccione Explorar en profundidad.

    Captura de pantalla que muestra la función de exploración en profundidad.

  5. En la cinta de opciones, seleccione Herramientas de lista ->Estadísticas ->Máximo.

    Captura de pantalla que muestra la función estadística orderid máximo.

Ahora tiene una consulta que devuelve el valor máximo de OrderID en lakehouse. Esta consulta se usa para filtrar los datos del origen de OData. En la sección siguiente se agrega una consulta al flujo de datos para filtrar los datos del origen de OData según el valor máximo de OrderID en el lago.

  1. Volver a la consulta Pedidos y agregue un nuevo paso para filtrar los datos. Use la configuración siguiente:

    • Columna: OrderID
    • Operación de Greater than:
    • Valor: parámetro IncrementalOrderID

    Captura de pantalla que muestra la función de filtro orderid greater than.

    Captura de pantalla con la configuración del filtro.

  2. Permite combinar los datos del origen de OData y lakehouse mediante la confirmación del cuadro de diálogo siguiente:

    Captura de pantalla que muestra el cuadro de diálogo para permitir la combinación de datos.

  3. Actualice el destino de datos para usar la siguiente configuración:

    • Método de actualización: Append

    Captura de pantalla de la función de edición de los ajustes de salida.

    Captura de pantalla de la tabla de pedidos existente.

    Captura de pantalla que muestra los datos de destino lakehouse settings append.

  4. Publicar el flujo de datos.

    Captura de pantalla que muestra el diálogo de publicación de flujo de datos.

El flujo de datos ahora contiene una consulta que filtra los datos del origen de OData según el valor máximo de OrderID en la instancia de LakeHouse. Esto significa que solo se cargan datos nuevos o actualizados en el lago. En la sección siguiente se usa el flujo de datos para volver a cargar los datos mediante cuadernos y canalizaciones.

(Opcional) volver a cargar datos mediante cuadernos y canalizaciones

Opcionalmente, puede volver a cargar datos específicos mediante cuadernos y canalizaciones. Con el código de Python personalizado en el cuaderno, quite los datos antiguos de lakehouse. Después, cree una canalización en la que ejecute primero el cuaderno y ejecute secuencialmente el flujo de datos, vuelva a cargar los datos del origen de OData en el lago. Los cuadernos admiten varios lenguajes, pero en este tutorial se usa PySpark. Pyspark es una API de Python para Spark y se usa en este tutorial para ejecutar consultas de Spark SQL.

  1. Creación de un nuevo cuaderno en el área de trabajo.

    Captura de pantalla que muestra el cuadro de diálogo del nuevo cuaderno.

  2. Añade el siguiente código PySpark a tu cuaderno:

    ### Variables
    LakehouseName = "YOURLAKEHOUSE"
    TableName = "Orders"
    ColName = "OrderID"
    NumberOfOrdersToRemove = "10"
    
    
    ### Remove Old Orders
    Reload = spark.sql("SELECT Max({0})-{1} as ReLoadValue FROM {2}.{3}".format(ColName,NumberOfOrdersToRemove,LakehouseName,TableName)).collect()
    Reload = Reload[0].ReLoadValue
    spark.sql("Delete from {0}.{1} where {2} > {3}".format(LakehouseName, TableName, ColName, Reload))
    
  3. Ejecute el cuaderno para comprobar que los datos se quitan de lakehouse.

  4. Cree una canalización en el área de trabajo.

    Captura de pantalla del nuevo cuadro de diálogo.

  5. Agregue una nueva actividad de cuaderno a la canalización y seleccione el cuaderno que creó en el paso anterior.

    Captura de pantalla que muestra el cuadro de diálogo de añadir actividad de cuaderno.

    Captura de pantalla que muestra el diálogo de selección de cuaderno.

  6. Agregue una nueva actividad de flujo de datos a la canalización y seleccione el flujo de datos que creó en la sección anterior.

    Captura de pantalla que muestra el cuadro de diálogo de añadir actividad de flujo de datos.

    Captura de pantalla del cuadro de diálogo de selección de flujo de datos.

  7. Vincule la actividad del cuaderno a la actividad de flujo de datos con un desencadenador correcto.

    Captura de pantalla que muestra el diálogo de conexión de actividades.

  8. Guarde y ejecute la canalización.

    Captura de pantalla del cuadro de diálogo de ejecución.

Ahora tiene una canalización que quita los datos antiguos de lakehouse y vuelve a cargar los datos del origen de OData en lakehouse. Con esta configuración, puede volver a cargar los datos desde el origen de OData al almacén de lago de datos con regularidad.