Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Importante
O conector SharePoint gerido está em Beta. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Visualizações . Consulte Gerenciar visualizações do Azure Databricks.
Agora que você criou seu pipeline do SharePoint, pode analisar os documentos brutos em texto, fragmentar os dados analisados, criar incorporações a partir dos blocos e muito mais. Em seguida, você pode usar readStream na tabela de saída diretamente em seu pipeline downstream.
Analisar documentos não estruturados
Muitas tarefas RAG e de compreensão de documentos subsequentes exigem a conversão de ficheiros não estruturados (como PDFs, PPTX, documentos Word e imagens) em representações estruturadas e interativas. O Databricks fornece ai_parse_document, uma função incorporada que extrai automaticamente texto, tabelas, informação de layout, metadados e outros sinais estruturados do conteúdo binário dos ficheiros.
Pode aplicar ai_parse_document diretamente na coluna inline_content produzida pelo pipeline de ingestão do SharePoint. Esta é a abordagem recomendada para a maioria dos casos de uso não estruturados a jusante, incluindo geração aumentada por recuperação (RAG), classificação, extração de entidades e construção de agentes centrados em documentos.
Para mais informações, consulte ai_parse_document.
Exemplo: Transformar ficheiros SharePoint
Pode transformar incrementalmente os seus ficheiros SharePoint ingeridos em saídas estruturadas e analisadas usando Lakeflow Spark Declarative Pipelines (por exemplo, vistas materializadas ou tabelas de streaming). O exemplo seguinte mostra como criar uma vista materializada que analisa cada documento recém-chegado:
CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT
*,
ai_parse_document(content.inline_content) AS parsed
FROM <your_catalog>.<your_schema>.<your_destination_table>;
Esta vista mantém as representações dos seus documentos analisados atualizadas à medida que novos ficheiros chegam através do pipeline de ingestão do SharePoint. A parsed coluna pode então ser usada para os seus casos de uso posteriores.
Aceder ao conteúdo individual dos ficheiros
Se preferir trabalhar diretamente com ficheiros, por exemplo, ao integrar com bibliotecas ou ferramentas personalizadas, o Databricks fornece UDFs adicionais de acesso a ficheiros que pode executar na tabela de saída a partir do pipeline de ingestão.
| Nome | Descrição |
|---|---|
read_blob_as_file(coluna blob, coluna nome do arquivo) |
Baixa o arquivo para o disco local e retorna o caminho do arquivo. |
read_blob_as_bytes(coluna blob) |
Baixa o arquivo para o disco local e retorna os dados como uma matriz de bytes. |
Configurar UDFs de acesso a ficheiros
Para configurar UDFs de acesso a ficheiros, adicione a seguinte célula ao seu pipeline a jusante:
# DO NOT MODIFY this cell.
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType
# Copy to local disk and get file path.
def copy_to_disk(blob, filename) -> str:
fname = "/local_disk0/tmp/" + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)
# Get bytes directly.
def get_bytes(blob) -> bytes:
return blob.inline_content
read_blob_as_bytes = udf(get_bytes, BinaryType())
Exemplos de acesso a ficheiros
Para retornar o caminho do arquivo:
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def file_bytes_to_text(fname):
with open(fname, "rb") as f:
return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)
# Chain your UDF with the file access UDF for the file path.
df.withColumn("text_content",
file_bytes_to_text_udf(read_blob_as_file("content",
"file_metadata.name"))).collect()
Para retornar os dados como uma matriz de bytes:
# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.
def bytes_to_text(bytes_data):
return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)
# Chain your UDF with the file access UDF for the byte array.
df.withColumn("text_content",
bytes_to_text_udf(read_blob_as_bytes("content"))).collect()
Observação
As UDFs de acesso a arquivos não podem manipular o conteúdo de arquivos maiores que 100 MB. Você deverá filtrar essas linhas antes de usar as funções UDFs de acesso ao arquivo.
Como o caminho do arquivo UDF grava no disco local, ele só funciona em clusters de usuário único. Se quiser executar o pipeline downstream em clusters clássicos ou computação sem servidor, você pode atualizar o UDF para gravar em um volume do Catálogo Unity em vez de no disco local. No entanto, isso diminuirá o desempenho.
Para gravar num volume:
# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct
# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
# UPDATE THIS VALUE
volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"
fname = volume_path + filename
with open(fname, "wb") as f:
f.write(blob.inline_content)
return fname
read_blob_as_file = udf(copy_to_disk)