共用方式為


下游RAG應用案例

這很重要

受管理的 SharePoint 連接器目前還在測試階段。 工作區管理員可以從 「預覽 」頁面控制對此功能的存取。 請參閱 管理 Azure Databricks 預覽。

現在您已建立 SharePoint 管線,您可以將原始檔剖析為文字、將剖析的數據區塊化、從區塊建立內嵌等等。 然後 readStream ,您可以直接在下游管線的輸出資料表上使用 。

解析非結構化文件

許多下游的 RAG 與文件理解工作負載需要將原始非結構化檔案(如 PDF、PPTX、Word 文件和圖片)轉換成結構化且可查詢的表示。 Databricks 提供 ai_parse_document內建函式,能自動從二進位檔案內容中擷取文字、表格、版面資訊、元資料及其他結構化訊號。

你可以直接套用 ai_parse_documentinline_content SharePoint 匯入管線產生的欄位。 這是大多數非結構化下游使用情境的推薦方法,包括檢索增強生成(RAG)、分類、實體擷取及建置以文件為中心的代理。

欲了解更多資訊,請參見 ai_parse_document

範例:轉換 SharePoint 檔案

你可以利用 Lakeflow Spark 宣告式管線(例如實體化視圖或串流資料表)逐步將擷取的 SharePoint 檔案轉換成解析後的結構化輸出。 以下範例展示了如何建立一個具體化的視圖,解析每份新到的文件:

CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT
  *,
  ai_parse_document(content.inline_content) AS parsed
FROM <your_catalog>.<your_schema>.<your_destination_table>;

此檢視能在新的檔案透過 SharePoint 匯入管道抵達時,不斷更新解析後的文件表示。 parsed該欄位然後可以用於您的後端應用實例。

存取個別檔案內容

如果你偏好直接處理檔案,例如整合自定義函式庫或工具時,Databricks 提供其他檔案存取 UDF,你可以在資料攝取流程的輸出資料表上執行。

名稱 說明
read_blob_as_file(Blob 欄、檔案名欄) 將檔案下載到本機磁碟,並傳回檔案路徑。
read_blob_as_bytesBlob 欄位 將檔案下載到本機磁碟,並以位元組陣列的形式傳回數據。

設定檔案存取 使用者定義函數 (UDF)

要設定檔案存取 UDF,請在下游管線中新增以下儲存格:

# DO NOT MODIFY this cell.

from pyspark.sql.functions import udf, struct
from pyspark.sql.types import BinaryType

# Copy to local disk and get file path.

def copy_to_disk(blob, filename) -> str:
  fname = "/local_disk0/tmp/" + filename
  with open(fname, "wb") as f:
    f.write(blob.inline_content)
  return fname

read_blob_as_file = udf(copy_to_disk)

# Get bytes directly.

def get_bytes(blob) -> bytes:
  return blob.inline_content

read_blob_as_bytes = udf(get_bytes, BinaryType())

檔案存取範例

若要傳回檔案路徑:

# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def file_bytes_to_text(fname):
  with open(fname, "rb") as f:
    return f.read().decode("utf-8")
file_bytes_to_text_udf = udf(file_bytes_to_text)

# Chain your UDF with the file access UDF for the file path.

df.withColumn("text_content",
file_bytes_to_text_udf(read_blob_as_file("content",
"file_metadata.name"))).collect()

若要以位元組陣列的形式傳回資料:

# Suppose you have a simple UDF that converts a file's raw bytes to a UTF-8 string.

def bytes_to_text(bytes_data):
  return bytes_data.decode("utf-8")
bytes_to_text_udf = udf(bytes_to_text)

# Chain your UDF with the file access UDF for the byte array.

df.withColumn("text_content",
bytes_to_text_udf(read_blob_as_bytes("content"))).collect()

備註

檔案存取 UDF 無法處理大於 100 MB 的檔案檔案內容。 您必須先篩選掉這些數據列,才能使用檔案存取 UDF。

因為檔案路徑 UDF 會寫入本機磁碟,所以它只適用於單一使用者叢集。 如果您想要改為在傳統叢集或無伺服器計算上執行下游管線,您可以更新 UDF 以寫入 Unity 目錄磁碟區,而不是寫入本機磁碟。 不過,這會降低效能。

若要寫入磁碟區:

# Update the volume_path in the function below.
from pyspark.sql.functions import udf, struct


# copy to volume_path and get file path
def copy_to_disk(blob, filename) -> str:
  # UPDATE THIS VALUE
  volume_path = "/Volumes/<my_catalog>/<my schema>/<my volume name>/"


  fname = volume_path + filename
  with open(fname, "wb") as f:
    f.write(blob.inline_content)
  return fname


read_blob_as_file = udf(copy_to_disk)