次の方法で共有


SharePoint からファイルを取り込む

Important

標準の SharePoint コネクタはベータ版です。 ワークスペース管理者は、[ プレビュー] ページからこの機能へのアクセスを制御できます。 Azure Databricks プレビューの管理を参照してください。

Microsoft SharePoint から Delta テーブルに構造化ファイル、半構造化ファイル、非構造化ファイルを取り込む方法について説明します。 SharePoint コネクタは、バッチ API とストリーミング API (自動ローダー、 spark.readCOPY INTOなど) を使用した SharePoint ファイルの増分インジェストを、すべて Unity カタログ ガバナンスでサポートします。

SharePoint コネクタを選択する

Lakeflow Connect には、2 つの補完的な SharePoint コネクタが用意されています。 どちらも SharePoint のデータにアクセスしますが、個別の目標をサポートしています。

考慮事項 マネージド SharePoint コネクタ Standard SharePoint コネクタ
管理とカスタマイズ フル マネージド コネクタ。
Delta テーブルにデータを取り込み、ソースとの同期を維持するエンタープライズ アプリケーション向けのシンプルでメンテナンスの少ないコネクタ。 Lakeflow Connect のマネージド コネクタを参照してください。
バッチ API とストリーミング API ( read_filesspark.readCOPY INTO、自動ローダーなど) を使用して、SQL、PySpark、または Lakeflow Spark 宣言パイプラインを使用してカスタム インジェスト パイプラインを構築します。
インジェスト中に複雑な変換を柔軟に実行できる一方で、パイプラインの管理と保守に対する責任が高くなります。
出力形式 均一バイナリ コンテンツ テーブル。 各ファイルをバイナリ形式 (1 行に 1 つのファイル) でインジェストし、ファイルメタデータも含めて、
追加の列。
構造化デルタ テーブル。 構造化ファイル (CSV や Excel など) をデルタ テーブルとして取り込みます。 取り込みにも使用できます
バイナリ形式の非構造化ファイル。
粒度、フィルター処理、および選択 現在、サブフォルダーまたはファイル レベルは選択されていません。 パターン ベースのフィルター処理はありません。
指定した SharePoint ドキュメント ライブラリ内のすべてのファイルを取り込みます。
きめ細かくカスタム。
ドキュメント ライブラリ、サブフォルダー、または個々のファイルから取り込む URL ベースの選択。 また、 pathGlobFilter オプションを使用したパターン ベースのフィルター処理もサポートしています。

主な機能

標準の SharePoint コネクタには、次のものが用意されています。

  • 構造化ファイル、半構造化ファイル、および非構造化ファイルの取り込み
  • 詳細インジェスト: 特定のサイト、サブサイト、ドキュメントライブラリ、フォルダー、または1つのファイルをインジェストする
  • バッチおよびストリーミング取り込みをspark.read、オートローダー、そしてCOPY INTOを使用して行う。
  • CSV や Excel などの構造化形式と半構造化形式の自動スキーマ推論と進化
  • Unity カタログ接続を使用して資格情報ストレージをセキュリティで保護する
  • パターン マッチングを使用したファイルの選択 pathGlobFilter

Requirements

SharePoint からファイルを取り込むには、次のものが必要です。

  • Unity カタログが有効になっているワークスペース
  • CREATE CONNECTION SharePoint 接続を作成するための特権 (または既存の接続を使用する USE CONNECTION )
  • Databricks Runtime バージョン 17.3 LTS 以降を使用するコンピューティング
  • Sites.Read.Allアクセス許可スコープを使用して設定された OAuth 認証
  • SharePoint ベータ機能は 、[プレビュー ] ページから有効になります。 Azure Databricks プレビューの管理を参照してください
  • 省略可能: Excel ファイルを解析するための Excel ベータ機能を有効にします。 Excel ファイルの読み取りを参照してください

接続の作成

SharePoint 資格情報を格納するための Unity カタログ接続を作成します。 接続のセットアップ プロセスは、標準の SharePoint コネクタとマネージド SharePoint コネクタの両方で共有されます。

OAuth 認証オプションを含む完全な接続セットアップ手順については、「 SharePoint インジェストセットアップの概要」を参照してください。

SharePoint からファイルを読み取る

Spark を使用して SharePoint からファイルを読み取る場合は、 databricks.connection データ ソース オプションを使用して前の手順で作成した接続を指定し、アクセスする SharePoint リソースの URL を指定します。 この URL は、特定のファイル、フォルダー、ドキュメント ライブラリ (ドライブ)、またはサイト全体を参照できます。 たとえば、次のようになります。

  • https://mytenant.sharepoint.com/sites/test-site/
  • https://mytenant.sharepoint.com/sites/test-site/test-subsite
  • https://mytenant.sharepoint.com/sites/test-site/test-drive
  • https://mytenant.sharepoint.com/sites/test-site/Shared%20Documents/Forms/AllItems.aspx
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder
  • https://mytenant.sharepoint.com/sites/test-site/test-drive/test-folder/test.csv
  • https://mytenant.sharepoint.com/sites/test-site/another-subsite/another-drive/test.csv

例示

標準の SharePoint コネクタを使用してファイルを読み取るコード例を見つけます。

自動ローダーを使用して SharePoint ファイルをストリーム配信する

自動ローダーは、SharePoint から構造化ファイルを増分的に取り込むための最も効率的な方法を提供します。 新しいファイルが自動的に検出され、到着すると処理されます。 また、自動スキーマ推論と進化を使用して、CSV や JSON などの構造化された半構造化ファイルを取り込むこともできます。 自動ローダーの使用方法の詳細については、「 一般的なデータ読み込みパターン」を参照してください。

# Incrementally ingest new PDF files
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("cloudFiles.schemaLocation", <path to a schema location>)
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
)

# Incrementally ingest CSV files with automatic schema inference and evolution
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.csv")
    .option("inferColumnTypes", True)
    .option("header", True)
    .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
)

Spark バッチ読み取りを使用して SharePoint ファイルを読み取る

# Read unstructured data as binary files
df = (spark.read
        .format("binaryFile")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("recursiveFileLookup", True)
        .option("pathGlobFilter", "*.pdf") # optional. Example: only ingest PDFs
        .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"))

# Read a batch of CSV files, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("csv")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("pathGlobFilter", "*.csv")
        .option("recursiveFileLookup", True)
        .option("inferSchema", True)
        .option("header", True)
        .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"))

# Read a specific Excel file from SharePoint, infer the schema, and load the data into a DataFrame
df = (spark.read
        .format("excel")
        .option("databricks.connection", "my_sharepoint_conn")
        .option("headerRows", 1)                   # optional
        .option("dataAddress", "'Sheet1'!A1:M20")  # optional
        .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"))

Spark SQL を使用して SharePoint ファイルを読み取る

次の例は、テーブル値関数を使用して SQL に SharePoint ファイル read_files 取り込む方法を示しています。 read_filesの使用方法の詳細については、テーブル値関数read_files参照してください。

-- Read pdf files
CREATE TABLE my_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  `databricks.connection` => "my_sharepoint_conn",
  format => "binaryFile",
  pathGlobFilter => "*.pdf", -- optional. Example: only ingest PDFs
  schemaEvolutionMode => "none"
);

-- Read a specific Excel sheet and range
CREATE TABLE my_sheet_table AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "'Sheet1'!A2:D10", -- optional
  schemaEvolutionMode => "none"
);

増分インジェストCOPY INTO

COPY INTO は、Delta テーブルへのファイルのべき等増分読み込みを提供します。 COPY INTOの使用方法の詳細については、「COPY INTOを使用した一般的なデータ読み込みパターン」を参照してください。

CREATE TABLE IF NOT EXISTS sharepoint_pdf_table;
CREATE TABLE IF NOT EXISTS sharepoint_csv_table;
CREATE TABLE IF NOT EXISTS sharepoint_excel_table;

# Incrementally ingest new PDF files
COPY INTO sharepoint_pdf_table
  FROM "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents"
  FILEFORMAT = BINARYFILE
  PATTERN = '*.pdf'
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn')
  COPY_OPTIONS ('mergeSchema' = 'true');

# Incrementally ingest CSV files with automatic schema inference and evolution
COPY INTO sharepoint_csv_table
  FROM "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs"
  FILEFORMAT = CSV
  PATTERN = '*.csv'
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'header' = 'true', 'inferSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true');

# Ingest a single Excel file
COPY INTO sharepoint_excel_table
  FROM "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx"
  FILEFORMAT = EXCEL
  FORMAT_OPTIONS ('databricks.connection' = 'my_sharepoint_conn', 'headerRows' = '1')
  COPY_OPTIONS ('mergeSchema' = 'true');

Lakeflow Spark 宣言パイプラインで SharePoint ファイルを取り込む

SharePoint コネクタには、Databricks Runtime 17.3 以降が必要です。 これは、Lakeflow Spark 宣言パイプライン リリースではまだ使用できません。 Lakeflow Spark 宣言パイプライン リリースで使用される Databricks ランタイムバージョンを確認するには、そのリリースの リリース ノート を参照してください。

次の例では、Lakeflow Spark 宣言パイプラインで自動ローダーを使用して SharePoint ファイルを読み取る方法を示します。

Python

from pyspark import pipelines as dp

# Incrementally ingest new PDF files
@dp.table
def sharepoint_pdf_table():
  return (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("pathGlobFilter", "*.pdf")
    .load("https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents")
  )

# Incrementally ingest CSV files with automatic schema inference and evolution
@dp.table
def sharepoint_csv_table():
  return (spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("databricks.connection", "my_sharepoint_conn")
      .option("pathGlobFilter", "*.csv")
      .option("inferColumnTypes", True)
      .option("header", True)
      .load("https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs")
  )

# Read a specific Excel file from SharePoint in a materialized view
@dp.table
def sharepoint_excel_table():
  return (spark.read.format("excel")
    .option("databricks.connection", "my_sharepoint_conn")
    .option("headerRows", 1)                   # optional
    .option("inferColumnTypes", True)            # optional
    .option("dataAddress", "'Sheet1'!A1:M20")  # optional
    .load("https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx")

SQL

-- Incrementally ingest new PDF files
CREATE OR REFRESH STREAMING TABLE sharepoint_pdf_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  format => "binaryFile",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.pdf");

-- Incrementally ingest CSV files with automatic schema inference and evolution
CREATE OR REFRESH STREAMING TABLE sharepoint_csv_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Engineering/Data/IoT_Logs",
  format => "csv",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.csv",
  "header", "true");

-- Read a specific Excel file from SharePoint in a materialized view
CREATE OR REFRESH MATERIALIZED VIEW sharepoint_excel_table
AS SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Finance/Shared%20Documents/Monthly/Report-Oct.xlsx",
  `databricks.connection` => "my_sharepoint_conn",
  format => "excel",
  headerRows => 1,  -- optional
  dataAddress => "'Sheet1'!A2:D10", -- optional
  `cloudFiles.schemaEvolutionMode` => "none"
);

非構造化ファイルを解析する

binaryFile形式の標準 SharePoint コネクタを使用して SharePoint から非構造化ファイル (PDF、Word 文書、PowerPoint ファイルなど) を取り込む場合、ファイルの内容は生のバイナリ データとして格納されます。 RAG、検索、分類、ドキュメントの理解などの AI ワークロード用にこれらのファイルを準備するには、 ai_parse_documentを使用して、バイナリ コンテンツを構造化されたクエリ可能な出力に解析できます。

次の例は、 documents という名前のブロンズ Delta テーブルに格納されている非構造化ドキュメントを解析し、解析されたコンテンツを含む新しい列を追加する方法を示しています。

CREATE TABLE documents AS
SELECT * FROM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  `databricks.connection` => "my_sharepoint_conn",
  format => "binaryFile",
  pathGlobFilter => "*.pdf,*.docx",
  schemaEvolutionMode => "none"
);
SELECT *, ai_parse_document(content) AS parsed_content
FROM documents;

parsed_content列には、抽出されたテキスト、テーブル、レイアウト情報、およびダウンストリーム AI パイプラインに直接使用できるメタデータが含まれています。

Lakeflow Spark 宣言パイプラインを使用した増分解析

また、Lakeflow Spark 宣言パイプライン内の ai_parse_document を使用して、増分解析を有効にすることもできます。 SharePoint から新しいファイルが流れてくると、パイプラインの更新時に自動的に解析されます。

たとえば、新しく取り込まれたドキュメントを継続的に解析する具体化されたビューを定義できます。

CREATE OR REFRESH STREAMING TABLE sharepoint_documents_table
AS SELECT * FROM STREAM read_files(
  "https://mytenant.sharepoint.com/sites/Marketing/Shared%20Documents",
  format => "binaryFile",
  `databricks.connection` => "my_sharepoint_conn",
  pathGlobFilter => "*.pdf,*.docx");

CREATE OR REFRESH MATERIALIZED VIEW documents_parsed
AS
SELECT *, ai_parse_document(content) AS parsed_content
FROM sharepoint_documents_table;

この方法により、次のことが保証されます。

  • 新しく取り込まれた SharePoint ファイルは、具体化されたビューが更新されるたびに自動的に解析されます
  • 解析された出力と受信データの同期が維持される
  • ダウンストリーム AI パイプラインは、常に最新のドキュメントの表現で動作します。

詳細情報: サポートされている形式と詳細オプションについては、 ai_parse_document を参照してください。

制限事項

標準の SharePoint コネクタには、次の制限があります。

  • マルチサイト インジェストなし: 同じクエリを使用して複数のサイトを取り込むことはできません。 2 つのサイトから取り込むには、2 つの個別のクエリを記述する必要があります。
  • フィルター処理: pathGlobFilter オプションを使用して、名前でファイルをフィルター処理できます。 フォルダー パスベースのフィルター処理はサポートされていません。
  • サポートされていない形式: SharePoint リストと .aspx サイト ページはサポートされていません。 ドキュメント ライブラリ内のファイルのみがサポートされています。
  • SharePoint サーバーへの書き戻しはサポートされていません。
  • 自動ローダー cleanSource (インジェスト後のソースでのファイルの削除またはアーカイブ) はサポートされていません。

次のステップ