Compartir a través de


Caso de uso descendente de RAG

Importante

El conector de Microsoft SharePoint está en Beta.

Ahora que ha creado el flujo de trabajo de SharePoint, puede analizar los documentos sin procesar para convertirlos en texto, fragmentar los datos analizados, crear representaciones a partir de los fragmentos y más. Después, puede usar readStream en la tabla de salida directamente en el flujo de trabajo posterior.

Para acceder a los datos de archivo, proporcionamos las siguientes UDF de acceso a archivos. Puede ejecutar estas UDFs en la tabla de salida desde la canalización de ingesta.

Nombre Descripción
read_blob_as_file(columna de blob, columna de nombre de archivo) Descarga el archivo en el disco local y devuelve la ruta de acceso del archivo.
read_blob_as_bytes(columna de bloques) Descarga el archivo en el disco local y devuelve los datos como una matriz de bytes.

Configurar UDFs de acceso a archivos

Agregue la siguiente celda a la canalización de bajada:

# DO NOT MODIFY this cell.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get file path.

def copy_to_disk(blob, filename) -> str:
  fname = "/local_disk0/tmp/" + filename
  with open(fname, "wb") as f:
    f.write(blob.inline_content)
  return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.

def get_bytes(blob) -> bytes:
  return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())

Ejemplos

Para devolver la ruta de acceso del archivo:

# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def file_bytes_to_text(fname):
  with open(fname, "rb") as f:
    return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.

df.withColumn("text_content",
file_bytes_to_text_udf(read_blob_as_file("content",
"file_metadata.name"))).collect()

Para devolver los datos como una matriz de bytes:

# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def bytes_to_text(bytes_data):
  return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.

df.withColumn("text_content",
bytes_to_text_udf(read_blob_as_bytes("content"))).collect()

Nota:

Las UDF de acceso a archivos no pueden controlar el contenido de los archivos de más de 100 MB. Debe filtrar estas filas antes de usar las UDF de acceso a archivos.

Dado que la ruta de acceso de archivo UDF escribe en el disco local, solo funciona en clústeres de usuario único. Si desea ejecutar la canalización de bajada en clústeres clásicos o en computación sin servidor, puede actualizar la UDF para escribir en un volumen del Catálogo Unity en lugar de en el disco local. Sin embargo, esto ralentizará el rendimiento.

Para escribir en un volumen:

# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct


# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
  # UPDATE THIS VALUE
  volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"


  fname = volume_path + filename
  with open(fname, "wb") as f:
    f.write(blob.inline_content)
  return fname


read_blob_as_file = udf(copy_to_disk)