Conector de Spark para Microsoft Fabric Data Warehouse

El conector de Spark para Fabric Data Warehouse permite a los desarrolladores y científicos de datos de Spark acceder a datos y trabajar con datos desde un almacenamiento y el punto de conexión de análisis SQL de un almacén de lago. El conector ofrece las siguientes capacidades:

  • Se puede trabajar con datos de un almacén o de un punto de conexión de análisis SQL dentro del mismo espacio de trabajo o a través de varios espacios de trabajo.
  • El punto de conexión del análisis SQL de un Lakehouse se detecta automáticamente en función del contexto del espacio de trabajo.
  • El conector cuenta con una API de Spark simplificada, limita la complejidad subyacente y funciona con una sola línea de código.
  • Mientras accede a una tabla o una vista, el conector mantiene los modelos de seguridad definidos en el nivel del motor de SQL. Estos modelos incluyen seguridad de nivel de objeto (OLS), seguridad de nivel de fila (RLS) y seguridad de nivel de columna (CLS).
  • El conector viene preinstalado en tiempo de ejecución de Fabric, lo que elimina la necesidad de realizar una instalación independiente.

Autenticación

La autenticación de Microsoft Entra es un enfoque de autenticación integrada. Los usuarios pueden iniciar sesión en el área de trabajo de Microsoft Fabric y sus credenciales se pasan automáticamente al motor de SQL para la autenticación y autorización. Las credenciales se asignan automáticamente y no es necesario que los usuarios proporcionen opciones de configuración específicas.

Nota:

El conector de Spark para Fabric Data Warehouse solo admite la autenticación interactiva de usuarios de Microsoft Entra. No se admite la autenticación de entidad de servicio.

Permisos

Para conectarse al motor SQL, los usuarios necesitan tener, al menos, permiso de lectura (similar al permiso CONNECT en SQL Server) en el almacén de datos o en el punto final de análisis SQL, con permiso a nivel de elemento. Los usuarios también deben tener permisos pormenorizados de nivel de objeto para leer datos de tablas o de vistas específicas. Para obtener más información, consulte Seguridad para el almacenamiento de datos en Microsoft Fabric.

Plantillas de código y ejemplos

Utilice una firma de método

El siguiente comando muestra la firma del synapsesql método para la solicitud Read. El argumento de tres partes tableName es necesario para acceder a tablas o vistas desde un almacén y el punto de conexión de análisis SQL de un almacén de lago. Actualice el argumento con los siguientes nombres, en función de su escenario:

  • Parte 1: Nombre del almacén o del almacén de lago.
  • Parte 2: Nombre del esquema.
  • Parte 3: Nombre de la tabla o vista.
synapsesql(tableName:String="<Part 1.Part 2.Part 3>") => org.apache.spark.sql.DataFrame

Además de leer directamente desde una tabla o vista, este conector también le permite especificar una consulta personalizada o de acceso directo, que se pasa al motor de SQL y el resultado se devuelve a Spark.

spark.read.option(Constants.DatabaseName, "<warehouse/lakeshouse name>").synapsesql("<T-SQL Query>") => org.apache.spark.sql.DataFrame

Aunque este conector descubre automáticamente el punto de conexión para el almacén o repositorio de datos especificado, si desea indicarlo explícitamente, puede hacerlo.

//For warehouse
spark.conf.set("spark.datawarehouse.<warehouse name>.sqlendpoint", "<sql endpoint,port>")
//For lakehouse
spark.conf.set("spark.lakehouse.<lakeshouse name>.sqlendpoint", "<sql endpoint,port>")
//Read from table
spark.read.synapsesql("<warehouse/lakeshouse name>.<schema name>.<table or view name>") 

Lectura de datos dentro de la misma área de trabajo

Importante

Ejecute estas instrucciones de importación al principio del cuaderno o antes de empezar a usar el conector:

import com.microsoft.spark.fabric
from com.microsoft.spark.fabric.Constants import Constants  

El código siguiente es un ejemplo que sirve para leer datos de una tabla o vista en un dataframe de Spark:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")

El código siguiente es un ejemplo que sirve para leer datos de una tabla o vista en un dataframe de Spark con un límite de recuento de filas de 10:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").limit(10)

El código siguiente es un ejemplo que sirve para leer datos de una tabla o vista en un dataframe de Spark después de aplicar un filtro:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").filter("column name == 'value'")

El código siguiente es un ejemplo que sirve para leer datos de una tabla o vista en un dataframe de Spark solo para las columnas seleccionadas:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>").select("column A", "Column B")

Leer datos entre áreas de trabajo

Para acceder y leer datos desde un almacén o almacén de lago entre áreas de trabajo, puede especificar el identificador de área de trabajo donde existe el almacén o el almacén de lago y, después, el identificador de elemento del almacén de lago o almacén. En la línea siguiente se proporciona un ejemplo de lectura de datos de una tabla o vista en un DataFrame de Spark desde el almacén o almacén de lago con el identificador de área de trabajo y el identificador de almacén de lago/almacén especificados:

# For lakehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.LakehouseId, "<lakehouse item id>").synapsesql("<lakehouse name>.<schema name>.<table or view name>")

# For warehouse
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")
df = spark.read.option(Constants.WorkspaceId, "<workspace id>").option(Constants.DatawarehouseId, "<warehouse item id>").synapsesql("<warehouse name>.<schema name>.<table or view name>")

Nota:

Al ejecutar el cuaderno, el conector busca de manera predeterminada el almacén o el almacén de lago especificados en el área de trabajo del almacén de lago adjunto al cuaderno. Para hacer referencia a un almacén o lago de datos desde otra área de trabajo, especifica el identificador del área de trabajo y el identificador del elemento del lago de datos o almacén, como se indica arriba.

Creación de una tabla de almacén de lago basada en datos de un almacenamiento

Estas líneas de código proporcionan un ejemplo para leer datos de una tabla o vista en un DataFrame de Spark y usarlos para crear una tabla de almacén de lago:

df = spark.read.synapsesql("<warehouse/lakehouse name>.<schema name>.<table or view name>")
df.write.format("delta").saveAsTable("<Lakehouse table name>")

Escribir un dataframe de Spark en una tabla del almacén de datos

Este conector emplea un proceso de escritura en dos fases en una tabla de Fabric DW. Inicialmente, coloca los datos de dataframe de Spark en un almacenamiento intermedio, seguido del uso del comando COPY INTO para ingerir los datos en la tabla Fabric DW. Este enfoque garantiza la escalabilidad con el aumento del volumen de datos.

Modos de guardado de DataFrame compatibles

Se admiten los siguientes modos de guardado al escribir datos de origen de un dataframe en una tabla de destino en el almacenamiento:

  • ErrorIfExists (modo de guardado predeterminado): Si existe una tabla de destino, la escritura se anula con una excepción devuelta al destinatario. En caso contrario, se crea una nueva tabla con datos.
  • Omitir: si la tabla de destino existe, la escritura omite la solicitud de escritura sin devolver un error. En caso contrario, se crea una nueva tabla con datos.
  • Sobrescribir: Si la tabla de destino existe, los datos existentes en el destino se reemplazan por los datos. En caso contrario, se crea una nueva tabla con datos.
  • Anexar: si la tabla de destino existe, los nuevos datos se anexan a ella. En caso contrario, se crea una nueva tabla con datos.

En el código siguiente se muestran ejemplos de escritura de los datos de la trama de datos de Spark en una tabla de Fabric DW:

df.write.synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>") # this uses default mode - errorifexists

df.write.mode("errorifexists").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("ignore").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("append").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")
df.write.mode("overwrite").synapsesql("<warehouse/lakehouse name>.<schema name>.<table name>")

Nota:

El conector solo admite la escritura en una tabla de Fabric DW, ya que el punto de conexión de análisis SQL de un almacén de lago es de solo lectura.

Lecturas en paralelo para mejorar el rendimiento

Este conector admite lecturas en paralelo para mejorar el rendimiento de las consultas al cargar tablas grandes. De forma similar a spark.read.jdbc, puede habilitar el paralelismo especificando una columna de partición y su intervalo de valores. A continuación, Spark dividirá la operación de lectura en varias particiones que se procesan simultáneamente.

En el código siguiente se muestra cómo habilitar lecturas paralelas mediante un cuaderno de Spark:

import com.microsoft.spark.fabric.tds.implicits.read.FabricSparkTDSImplicits._
import com.microsoft.spark.fabric.tds.implicits.write.FabricSparkTDSImplicits._
import com.microsoft.spark.fabric.Constants
import org.apache.spark.sql.SaveMode 

val df = spark.read.option("partitionColumn", <SomeColumn>)

        .option("lowerBound",  <ColumnValuesLowerLimit>)

        .option("upperBound", <ColumnValuesUpperLimit>)

        .option("numPartitions", <NumberOfPartitionsDesired>).synapsesql(<Table>)

Solución de problemas

Tras la finalización, el fragmento de código de respuesta de lectura aparece en la salida de la celda. El error en la celda actual también cancela las ejecuciones posteriores de celdas del cuaderno. La información detallada del error está disponible en los registros de la aplicación Spark.

Consideraciones para usar este conector

Actualmente, el conector:

  • Admite la recuperación o lectura de datos de almacenes de Fabric y puntos de conexión de análisis de SQL de elementos de almacén de lago.
  • Admite la escritura de datos en una tabla de almacenamiento mediante diferentes modos de guardado: solo está disponible con el entorno de ejecución de disponibilidad general más reciente, es decir, Runtime 1.3.
  • Con Private Link la operación de escritura habilitada en el nivel de inquilino no se admite y con Private Link habilitado en el nivel de área de trabajo, no se admiten las operaciones de lectura y escritura.
  • Fabric DW ahora admite Time Travel, sin embargo, este conector no funciona para una consulta con sintaxis de viaje en el tiempo.
  • Conserva la firma de uso como la que se incluye con Apache Spark para Azure Synapse Analytics para mantener la coherencia. Sin embargo, no es retrocompatible para conectarse y trabajar con un grupo de SQL dedicado en Azure Synapse Analytics.
  • Los nombres de columna con caracteres especiales se controlarán agregando caracteres de escape antes de la consulta, según el nombre de la tabla o vista de 3 partes, se envía. En una lectura personalizada o basada en consultas de paso a través, los usuarios deben escapar los nombres de columna que contengan caracteres especiales.