Direct Lake のパフォーマンスは、セマンティック モデルの設計とクエリの複雑さとは別に、効率的で高速な列読み込み (コード変換) と最適なクエリ実行のために、適切に調整された Delta テーブルに特に依存します。 必ず V オーダー最適化を適用してください。 また、Parquet ファイルの数を小さくし、大きな行グループを使用し、Delta ログに対するデータ更新の影響を最小限に抑えるように努めます。 これらは一般的なベスト プラクティスであり、コールド、セミウォーム、ウォーム、ホットの Direct Lake モードの状態でクエリを高速に実行するのに役立ちます。
この記事では、Direct Lake のパフォーマンスが Delta テーブルの正常性と効率的なデータ更新にどのように依存するかについて説明します。 これらの依存関係を理解することは非常に重要です。 Parquet ファイル内のデータ レイアウトは、優れたセマンティック モデル設計と適切に調整されたデータ分析式 (DAX) メジャーと同様に、クエリのパフォーマンスにとって重要であることを学習します。
知っておくべきこと
この記事では、次の概念を既に理解していることを前提としています。
- OneLake のデルタ テーブル: OneLake の Delta テーブルに関する詳細情報は、 Microsoft Fabric の基礎ドキュメントで入手できます。
- Parquet ファイル、行グループ、およびデルタ ログ: Parquet ファイル形式 (データを行グループと列チャンクに編成する方法、メタデータの処理方法など) については、 Parquet ファイル形式のドキュメントで説明します。 デルタ トランザクション ログ プロトコルのドキュメントも参照してください。
- デルタテーブルの最適化とVオーダー: Deltaテーブルのメンテナンスについては、Delta Lakeテーブルの最適化やVオーダーなど、Fabric Lakehouseのドキュメントを参照してください。
- フレーミングとコード変換: Direct Lake セマンティック モデルの更新 (フレーミング) とオンデマンド列データ読み込み (コード変換) は重要な概念であり、 Direct Lake の概要に関する記事の入門レベルで説明されています。
- 数式およびストレージ エンジン: DAX クエリが実行されると、数式エンジンによってクエリ プランが生成され、ストレージ エンジンから必要なデータと初期集計が取得されます。 この記事で説明する最適化は、ストレージ エンジンに焦点を当てています。 数式エンジンとストレージ エンジンの詳細については、 Analysis Services 開発者向けドキュメントを参照してください。
- VertiPaq と VertiScan: インポート モードと Direct Lake モードでは、ストレージ エンジンは VertiPaq エンジンを使用して、列形式のメモリ内ストアを維持します。 VertiScan を使用すると、数式エンジンは VertiPaq と対話できます。 詳細については、 Analysis Services 開発者向けドキュメントを参照してください。
- 辞書エンコード: Parquet ファイルと VertiPaq の両方でディクショナリ エンコードが使用されます。これは、int、long、date、char などのさまざまなデータ型の個々の列に適用されるデータ圧縮手法です。 これは、実行長エンコード (RLE)/Bit-Packing ハイブリッド エンコードを使用して、メモリ内の各一意の列値を整数として格納することによって機能します。 VertiPaq では常にディクショナリ エンコードが使用されますが、Parquet は、Parquet File-Format ドキュメントのエンコードで説明されているように、状況によってはプレーン エンコードまたはデルタ エンコードに切り替わる場合があります。この場合、Direct Lake では、コード変換のパフォーマンスに対応する効果を持つデータを再エンコードする必要があります。
- 列チャンクと列セグメント: Parquet と VertiPaq が列データを格納して効率的なデータ取得を行う方法を参照してください。 テーブル内の各列は、個別に処理および圧縮できる小さなチャンクに分割されます。 VertiPaq は、これらのチャンク セグメントを呼び出します。 DISCOVER_STORAGE_TABLE_COLUMN_SEGMENTS スキーマ行セットを使用して、Direct Lake セマンティック モデルの列セグメントに関する情報を取得できます。
- Python と Jupyter Notebook: Jupyter Notebook は、Python コードを記述して実行するための対話型環境を提供します。 Python の基本的な知識は、この章の後半のコード スニペットに従う場合に役立ちます。 詳細については、 Python 言語リファレンスを参照してください。 Microsoft Fabric でノートブックを使用する方法については、「ノートブックの 使用方法 - Microsoft Fabric」を参照してください。
Direct Lake クエリのパフォーマンスに影響するもの
このセクションでは、Direct Lake のパフォーマンスに影響を与える主な要因について説明します。 以降のセクションでは、より詳細な説明を提供します。
- V オーダー圧縮: 圧縮の有効性がクエリのパフォーマンスに影響する可能性があります。これは、圧縮が向上すると、データの読み込みが高速化され、クエリ処理が効率的になります。 圧縮されたデータをストリーミングするとコード変換の効率が向上するため、データの読み込みは高速です。 V オーダー圧縮を使用すると、VertiScan は圧縮されたデータの上に直接結果を計算し、展開手順をスキップできるため、クエリのパフォーマンスも最適です。
- データ型: 列に適切なデータ型を使用すると、圧縮とパフォーマンスが向上します。 たとえば、可能な限り文字列の代わりに整数データ型を使用し、整数を文字列として格納しないようにします。
- セグメントのサイズと数: VertiPaq では、列データがセグメントに格納されます。 多数の小さいセグメントがクエリのパフォーマンスに悪影響を与える可能性があります。 大きなテーブルの場合、Direct Lake では、100 万から 1,600 万行の間など、大きなセグメント サイズが優先されます。
- 列のカーディナリティ: カーディナリティの高い列 (一意の値が多い列) では、パフォーマンスが低下する可能性があります。 カーディナリティを可能な限り減らすことが役立ちます。
- インデックスと集計: カーディナリティが低い列は、ディクショナリ エンコードの利点を活用します。これにより、スキャンする必要があるデータの量を減らすことでクエリを高速化できます。
- DirectQuery フォールバック: フォールバック操作では、Fabric データ ソースの SQL 分析エンドポイントからデータをフェッチする必要があるため、クエリのパフォーマンスが低下する可能性があります。 さらに、フォールバックは、Direct Lake がフォールバックする必要がない場合でも、DirectQuery と VertiScan の両方をサポートするためにハイブリッド クエリ プランに依存し、パフォーマンスに関するいくつかのトレードオフがあります。 可能であれば、ハイブリッド クエリ プランを回避するために DirectQuery フォールバックを無効にします。
-
メモリ常駐度: Direct Lake セマンティックモデルは、コールド、ウォーム、ホットの状態があり、コールドからホットに移行するにつれてパフォーマンスが向上します。 コールドからウォームへの移行は、Direct Lake の優れたパフォーマンスの鍵となります。
- コールド: VertiPaq ストアが空です。 DAX クエリに応答するために必要なすべてのデータは、Delta テーブルから読み込む必要があります。
- Semiwarm: Direct Lake は、削除された行グループに属するフレーム中に、それらの列セグメントのみを削除します。 つまり、更新または新しく追加されたデータのみを読み込む必要があります。 Direct Lake セマンティック モデルは、メモリ プレッシャーがかかったときに、セグメントをアンロードし、インデックスを結合する必要がある場合、半メモリー暖状態になる可能性もあります。
- ウォーム: DAX クエリに応答するために必要な列データは、既にメモリに完全に読み込まれています。
- ホット: 列データは既にメモリに完全に読み込まれ、VertiScan キャッシュが設定され、DAX クエリがキャッシュにヒットします。
- メモリ不足: Direct Lake は、DAX クエリに応答するために必要なすべての列データをメモリに読み込む必要があり、使用可能なメモリ リソースを使い果たす可能性があります。 メモリが不足している場合、Direct Lake は以前に読み込まれた列データをアンロードする必要があります。この場合、Direct Lake は後続の DAX クエリのために再度再読み込みする必要があります。 Direct Lake セマンティック モデルのサイズを適切に設定すると、頻繁な再読み込みを回避するのに役立ちます。
メモリの保存場所とクエリのパフォーマンス
Direct Lake は暖かい状態またはホットな状態で最高のパフォーマンスを発揮しますが、コールド状態の場合はパフォーマンスが低下します。 Direct Lake では、インクリメンタル フレームを使用することで、できるだけ寒さに戻らないようにします。
ブートストラップ
最初のセマンティック モデルの読み込み後、列データはまだメモリに存在しません。 Direct Lake は寒いです。 クライアントがコールド状態の Direct Lake セマンティック モデルに DAX クエリを送信する場合、Direct Lake は、DAX クエリを処理して回答できるように、次の主なタスクを実行する必要があります。
VertiPaq 辞書のコード変換。 Direct Lake では、列チャンクごとにローカル Parquet ディクショナリをマージして、列のグローバル VertiPaq ディクショナリを作成する必要があります。 この マージ 操作は、クエリの応答時間に影響します。
Parquet 列チャンクを列セグメントに読み込む。 ほとんどの場合、これは、両方の側で RLE/Bit-Packing ハイブリッド エンコードを使用できる場合に、Parquet データ ID を VertiPaq ID に直接再マップすることです。 Parquet ディクショナリでプレーン エンコードを使用する場合、VertiPaq は値を RLE/Bit-Packing ハイブリッド エンコードに変換する必要があります。これは時間がかかります。
- V オーダーにより RLE 圧縮の品質が向上するため、V オーダー Parquet ファイルでは Direct Lake のパフォーマンスが最適です。 Direct Lake では、圧縮データが少ないよりも高速に、密にパックされた V オーダーデータを読み込むことができます。
結合インデックスの生成。 DAX クエリが複数のテーブルの列にアクセスする場合、VertiScan がテーブルを正しく結合できるように、Direct Lake はテーブルのリレーションシップに従って結合インデックスを作成する必要があります。 結合インデックスを作成するには、リレーションシップに参加しているキー列のディクショナリと主キー列の列セグメント (テーブル リレーションシップの一方の列) を Direct Lake が読み込む必要があります。
デルタ削除ベクトルの適用。 ソース Delta テーブルで削除ベクトルが使用されている場合、削除されたデータがクエリ処理から除外されるように、Direct Lake はこれらの削除ベクターを読み込む必要があります。
手記
コールド状態は、
processClear
の後にprocessFull
XMLA コマンドをモデルに送信することによっても誘発できます。ProcessClear
コマンドは、すべてのデータと、フレーム化された Delta テーブルバージョンとの関連付けを削除します。ProcessFull
XMLA コマンドは、モデルを最新の利用可能なデルタコミットバージョンにバインドするための枠組みを行います。
インクリメンタルフレーミング
フレーム作成中、Direct Lake は各 Delta テーブルのデルタ ログを分析し、基になるデータが変更された場合にのみ、読み込まれた列セグメントと結合インデックスを削除します。 不要なコード変換を回避するためにディクショナリが保持され、新しい値が既存のディクショナリに追加されるだけです。 この増分フレーミングアプローチにより、リロードの負担が軽減され、コールド クエリのパフォーマンスが向上します。
DISCOVER_STORAGE_TABLE_COLUMN_SEGMENTS
スキーマ行セットをラップするINFO.STORAGETABLECOLUMNSEGMENTS()
DAX 関数を使用して、増分フレームの有効性を分析できます。 意味のある結果を得るには、次の手順に従います。
- Direct Lake セマンティック モデルにクエリを実行して、ウォーム状態またはホット状態であることを確認します。
- 調査する Delta テーブルを更新し、Direct Lake セマンティック モデルを更新してフレーミングを実行します。
- 次のスクリーンショットのように、
INFO.STORAGETABLECOLUMNSEGMENTS()
関数を使用して列セグメント情報を取得する DAX クエリを実行します。 このスクリーンショットでは、図の目的で小さなサンプル テーブルを使用しています。 各列には 1 つのセグメントしかありません。 セグメントはメモリ内にありません。 これは、真のコールド状態を示します。 フレームを作成する前にモデルがウォームだった場合は、差分テーブルが破壊的なデータ読み込みパターンを使用して更新され、増分フレームを使用できなくなることを意味します。 差分テーブルの更新パターンについては、この記事の後半で説明します。
手記
Delta テーブルが更新プログラムを受信しない場合、メモリに既に存在する列に再読み込みは必要ありません。 非破壊的更新パターンを使用する場合、インクリメンタル フレーミングでは基本的に Direct Lake が既存のインメモリ データのかなりの部分を更新できるため、フレーム作成後のパフォーマンスの影響はクエリではるかに少なくなります。
フル メモリ常駐
ディクショナリ、列セグメント、結合インデックスが読み込まれると、Direct Lake はインポート モードと同等のクエリ パフォーマンスでウォーム状態に達します。 どちらのモードでも、列セグメントの数とサイズは、クエリのパフォーマンスを最適化する上で重要な役割を果たします。
Deltaテーブルの差異
Parquet ファイルは、行ではなく、列によってデータを整理します。 Direct Lake では、列ごとにデータも整理されます。 配置によりシームレスな統合が容易になりますが、特に行グループとディクショナリに関する重要な違いがあります。
行グループと列セグメント
Parquet ファイル内の行グループは列チャンクで構成され、各チャンクには特定の列のデータが含まれます。 一方、セマンティック モデルの列セグメントには、列データのチャンクも含まれます。
Delta テーブルの行グループの合計数と、対応するセマンティック モデル テーブルの各列のセグメント数の間には、直接的な関係があります。 たとえば、現在のすべての Parquet ファイルの Delta テーブルに合計で 3 つの行グループがある場合、対応するセマンティック モデル テーブルには、次の図に示すように、列ごとに 3 つのセグメントがあります。 つまり、Delta テーブルに多数の小さな行グループがある場合、対応するセマンティック モデル テーブルにも多数の小さな列セグメントが含まれます。 これはクエリのパフォーマンスに悪影響を及ぼします。
手記
Direct Lake では大きな列セグメントが優先されるため、ソース Delta テーブルの行グループは大きい方が理想的です。
ローカル ディクショナリとグローバル ディクショナリ
Parquetファイルがローカルディクショナリを使用するのに対し、Direct Lakeセマンティックモデルは各列にグローバルディクショナリを使用するため、次の図に示すように、Deltaテーブルの行グループの合計数はディクショナリのトランスコーディングのパフォーマンスに直接影響を与えます。 行グループの数が多いほど、Direct Lake がグローバル ディクショナリを作成するためにマージする必要があるローカル ディクショナリの数が多くなり、コード変換の完了にかかる時間が長くなります。
デルタテーブルの更新パターン
Delta テーブルにデータを取り込む方法は、増分フレームの効率に大きく影響する可能性があります。 たとえば、既存のテーブルにデータを読み込むときに [上書き] オプションを使用すると、読み込みごとに差分ログが消去されます。 つまり、Direct Lake では増分フレームを使用できないため、すべてのデータ、ディクショナリ、結合インデックスを再読み込みする必要があります。 このような破壊的な更新パターンは、クエリのパフォーマンスに悪影響を与えます。
このセクションでは、Direct Lake で増分フレーミングを使用し、辞書、列セグメント、結合インデックスなどの VertiPaq 列ストア要素を保持してトランスコード効率を最大化し、コールド クエリのパフォーマンスを向上させる Delta テーブルの更新パターンについて説明します。
パーティション分割なしのバッチ処理
この更新パターンでは、週単位や月単位など、スケジュールされた間隔で大量のバッチでデータを収集して処理します。 新しいデータが到着すると、テーブルのサイズを制御できるように、古いデータがローリング ウィンドウまたはスライディング ウィンドウ形式で削除されることがよくあります。 ただし、データがほとんどの Parquet ファイルに分散している場合は、古いデータを削除することが難しい場合があります。 たとえば、30 日のうち 1 日を削除すると、5%ではなく、Parquet ファイルの 95% に影響する可能性があります。 この場合、Direct Lake では、比較的小さな 削除 操作でも、95% のデータを再読み込みする必要があります。 既存の行の更新にも同じ問題が適用されます。更新プログラムは削除と追加が組み合わされているためです。 この 記事の後半 で説明するように、Delta Analyzer を使用して削除操作と 更新 操作の影響を分析できます。
パーティション分割によるバッチ処理
デルタ テーブルのパーティション分割は、テーブルがパーティション列の個別の値に基づいてフォルダーに格納されている小さな Parquet ファイルに分割されるため、 削除 操作の影響を軽減するのに役立ちます。 一般的に使用されるパーティション列には、日付、領域、またはその他のディメンション カテゴリが含まれます。 30 日のうち 1 日を削除する前の例では、日付でパーティション分割された Delta テーブルは、その日のパーティションの Parquet ファイルのみに削除を制限します。 ただし、広範なパーティション分割により、Parquet ファイルと行グループの数が大幅に増え、Direct Lake セマンティック モデル内の列セグメントが過剰に増加し、クエリのパフォーマンスに悪影響を及ぼす可能性があることに注意してください。 クエリのパフォーマンスには、カーディナリティの低いパーティション列を選択することが重要です。 ベスト プラクティスとして、列の個別の値は 100 から 200 未満にする必要があります。
段階的な読み込み
増分読み込みでは、更新プロセスは、既存の Parquet ファイルと行グループに影響を与えずに、新しいデータのみを Delta テーブルに挿入します。 削除はありません。 Direct Lake は、既存の VertiPaq 列ストア要素を破棄して再読み込みすることなく、新しいデータを増分的に読み込むことができます。 この方法は、Direct Lake の増分フレームに対して適切に機能します。 Delta テーブルのパーティションは必要ないです。
ストリーム処理
データをほぼリアルタイムで処理すると、小さな Parquet ファイルや行グループが急増する可能性があり、Direct Lake のパフォーマンスに悪影響を及ぼす可能性があります。 増分読み込みパターンと同様に、Delta テーブルをパーティション分割する必要はありません。 ただし、テーブルのメンテナンスが頻繁に行われるのは、Parquet ファイルと行グループの数が 、Direct Lake の概要に関する記事で指定されているガードレールの制限内に留まるようにするために不可欠です。 言い換えると、Spark Optimize を定期的に実行することを忘れないでください (毎日、さらに頻繁など)。 Spark Optimize については、次のセクションでもう一度説明します。
手記
実際のリアルタイム分析は、Eventstreams、KQL データベース、Eventhouse を使用して最適に実装されます。 ガイダンスについては、 Microsoft Fabric のReal-Time Intelligence のドキュメント を参照してください。
Delta テーブルのメンテナンス
主なメンテナンス タスクには、Delta テーブルのバキュームと最適化が含まれます。 メンテナンス操作を自動化するには、Microsoft Fabric REST API を使用した Lakehouse の管理に関するドキュメントで説明されているように 、Lakehouse API を使用できます。
掃除 機
バキューム処理により、現在の差分コミットバージョンに含まれていない Parquet ファイルが削除され、設定された保持しきい値を超えて古いファイルが削除されます。 Direct Lake は現在のコミット バージョンの Parquet ファイルのみを読み込むため、これらの Parquet ファイルを削除しても Direct Lake のパフォーマンスには影響しません。 既定値を使用して VACUUM を毎日実行すると、過去 7 日間のデルタコミットバージョンがタイムトラベルのために保持されます。
重要
フレーム化された Direct Lake セマンティック モデルは特定の差分コミット バージョンを参照するため、モデルを現在のバージョンに移動するには、モデルをもう一度更新 (フレーム化) するまで、Delta テーブルがこのバージョンを保持していることを確認する必要があります。 それ以外の場合、Direct Lake セマンティック モデルが存在しなくなった Parquet ファイルにアクセスしようとすると、クエリ エラーが発生します。
Spark Optimize
Deltaテーブルの最適化では、複数の小さな Parquet ファイルが少数の大きなファイルにマージされます。 これがコールド Direct Lake のパフォーマンスに影響を与える可能性があるため、週末や月末といったあまり頻繁でないときに最適化することをお勧めします。 Delta テーブルがガードレールの制限内に収まるように、小さな Parquet ファイルが迅速に蓄積される (頻度の高い小さな更新プログラム) 場合は、より頻繁に最適化します。
パーティション分割は、データを効果的に照合するため、増分フレーミングに対する最適化効果を最小限に抑えるのに役立ちます。 たとえば、カーディナリティの低いdate_key列に基づいて大きな Delta テーブルをパーティション分割すると、毎週のメンテナンスが最大 7 つのパーティションに制限されます。 Delta テーブルでは、既存の Parquet ファイルの大部分が保持されます。 Direct Lake では、7 日間のデータを再読み込みするだけで済みます。
Delta テーブルの更新の分析
Delta Analyzer または同様のツールを使用して、Delta テーブルの更新が Parquet ファイルと行グループに与える影響を調査します。 Delta Analyzer を使用すると、 追加、 更新、 削除 の操作に応答して、Parquet ファイル、行グループ、列チャンク、列の進化を追跡できます。 Delta Analyzer は、 スタンドアロンの Jupyter Notebook として使用できます。
セマンティック リンク ラボ ライブラリでも使用できます。 次のセクションでは、semantic-link-labs を使用します。 このライブラリは、 %pip install semantic-link-labs
コマンドを使用してノートブックに簡単にインストールできます。
行グループのサイズ
Direct Lake セマンティック モデルの理想的な行グループ サイズは 100 万行から 1,600 万行ですが、データが圧縮可能な場合、Fabric では大きなテーブルに大きな行グループ サイズが使用される可能性があります。 一般に、既定の行グループ サイズを変更することはお勧めしません。 Fabric で Delta テーブル のレイアウトを管理することをお勧めします。 ただし、再確認することをお勧めします。
次の Python コードは、Fabric ノートブックに接続された lakehouse 内の Delta テーブルの行グループ のサイズやその他の詳細を分析するための開始点として機能します。 次の表は、10 億行のサンプル テーブルの出力を示しています。
import sempy_labs as labs
from IPython.display import HTML
from IPython.display import clear_output
table_name = "<Provide your table name>"
# Load the Delta table and run Delta Analyzer
df = spark.read.format("delta").load(f"Tables/{table_name}")
da_results = labs.delta_analyzer(table_name)
# Display the table summary in an HTML table.
clear_output()
df1 = da_results['Summary'].iloc[0]
html_table = "<table border='1'><tr><th>Column Name</th><th>{table_name}</th></tr>"
for column in da_results['Summary'].columns:
html_table += f"<tr><td>{column}</td><td>{df1[column]}</td></tr>"
html_table += "</table>"
display(HTML(html_table))
出力:
パラメーター | 価値 |
---|---|
テーブル名 | sales_1 |
[行数] | 1000000000 |
行グループ | 二十四 |
Parquet ファイル | 8 |
行グループあたりの最大行数 | 51210000 |
行グループあたりの最小行数 | 22580000 |
行グループあたりの平均行数 | 41666666.666666664 |
VOrder が有効 | 正しい |
合計サイズ | 7700808430 |
タイムスタンプ | 2025-03-24 03:01:02.794979 |
Delta Analyzer の概要には、約 4,000 万行の平均行グループ サイズが表示されます。 これは、推奨される最大行グループ サイズである 1,600 万行を超えています。 さいわい、行グループのサイズが大きいほど、Direct Lake に大きな問題は発生しません。 行グループを大きくすると、ストレージ エンジンのオーバーヘッドを最小限に抑えながら、継続的セグメント ジョブが容易になります。 逆に、100 万行以下の小さな行グループは、パフォーマンスの問題を引き起こす可能性があります。
前の例でさらに重要なのは、Fabric が行グループを 8 つの Parquet ファイルに分散させることです。 これは、効率的な並列 読み取 り操作をサポートするために、Fabric 容量のコア数に合わせて調整されます。 また、個々の行グループのサイズが平均から大きく逸脱しすぎないようにすることも重要です。 バリエーションが大きいと、一様でない VertiScan の読み込みが発生し、最適なクエリ パフォーマンスが低下する可能性があります。
ローリング ウィンドウの更新
説明のために、次の Python コード サンプルはローリング ウィンドウの更新をシミュレートします。 このコードは、最も古い DateID を持つ行をサンプルの Delta テーブルから削除します。 次に、これらの行の DateID を更新し、最新の行としてサンプル テーブルに再度挿入します。
from pyspark.sql.functions import lit
table_name = "<Provide your table name>"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")
# Get the rows of the oldest DateID.
rows_df = table_df[table_df["DateID"] == 20200101]
rows_df = spark.createDataFrame(rows_df.rdd, schema=rows_df.schema)
# Delete these rows from the table
table_df.createOrReplaceTempView(f"{table_name}_view")
spark.sql(f"DELETE From {table_name}_view WHERE DateID = '20200101'")
# Update the DateID and append the rows as new data
rows_df = rows_df.withColumn("DateID", lit(20250101))
rows_df.write.format("delta").mode("append").save(f"Tables/{table_name}")
semantic-link-labs ライブラリのget_delta_table_history
関数は、このローリング ウィンドウ更新の効果を分析するのに役立ちます。 次の Python コード サンプルを参照してください。 コード スニペットの後の出力を含む表も参照してください。
import sempy_labs as labs
from IPython.display import HTML
from IPython.display import clear_output
table_name = "<Provide your table name>"
da_results = labs.get_delta_table_history(table_name)
# Create a single HTML table for specified columns
html_table = "<table border='1'>"
# Add data rows for specified columns
for index, row in da_results.iterrows():
for column in ['Version', 'Operation', 'Operation Parameters', 'Operation Metrics']:
if column == 'Version':
html_table += f"<tr><td><b>Version</b></td><td><b>{row[column]}</b></td></tr>"
else:
html_table += f"<tr><td>{column}</td><td>{row[column]}</td></tr>"
html_table += "</table>"
# Display the HTML table
display(HTML(html_table))
出力:
バージョン | 説明 | 価値 |
---|---|---|
2 | 操作 | 書き込む |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4103076'} | |
1 | 操作 | 削除 |
操作パラメーター | {'predicate': '["(DateID#3910 = 20200101)"]'} | |
操作メトリック | {'numRemovedFiles': '8', 'numRemovedBytes': '7700855198', 'numCopiedRows': '999451335', 'numDeletionVectorsAdded': '0', 'numDeletionVectorsRemoved': '0', 'numAddedChangeFiles': '0', 'executionTimeMs': '123446', 'numDeletionVectorsUpdated': '0', 'numDeletedRows': '548665', 'scanTimeMs': '4820', 'numAddedFiles': '18', 'numAddedBytes': '7696900084', 'rewriteTimeMs': '198625'} | |
0 | 操作 | 書き込み |
操作パラメーター | {'mode': 'Overwrite', 'partitionBy': '[]'} | |
操作メトリック | {'numFiles': '8', 'numOutputRows': '1000000000', 'numOutputBytes': '7700892169'} |
上記の Delta Analyzer の履歴は、この Delta テーブルに次の 3 つのバージョンがあることを示しています。
- バージョン 0: これは、前のセクションで説明したように、8 つの Parquet ファイルと 24 行グループを含む元のバージョンです。
-
バージョン 1: このバージョンは 削除 操作を反映しています。 1 日分のデータ (DateID = '20200101') のみが 5 年間の売上トランザクションを含むサンプル テーブルから削除されましたが、8 つの Parquet ファイルすべてが影響を受けます。 操作メトリックでは、
numRemovedFiles
は 8 つの [Parquet ファイル] で、numAddedFiles
は 18 個の [Parquet ファイル] です。 つまり、 削除 操作によって元の 8 つの Parquet ファイルが 18 個の新しい Parquet ファイルに置き換えられました。 - バージョン 3: 操作メトリックにより、548,665 行の Parquet ファイルが Delta テーブルに追加されたことが明らかになりました。
ローリング ウィンドウの更新後、最新のデルタコミットバージョンには、19個のParquetファイルと21個の行グループが含まれており、サイズは500,000から5,000万行です。 548,665 行のローリング ウィンドウの更新は、10 億行の Delta テーブル全体に影響を与えました。 すべての Parquet ファイルと行グループを置き換えました。 この場合、増分フレーミングではコールド パフォーマンスを向上させる必要はありません。また、行グループ のサイズの変動が増えると、ウォーム パフォーマンスが向上する可能性は低くなります。
Deltaテーブルの更新
次の Python コードは、前のセクションで説明したのと基本的に同じ方法で Delta テーブルを更新します。 表面上、更新関数は、指定された DateID と一致する既存の行の DateID 値のみを変更します。 ただし、Parquet ファイルは不変であり、変更することはできません。 画面の下では、 更新 操作によって既存の Parquet ファイルが削除され、新しい Parquet ファイルが追加されます。 結果と効果は、ローリング ウィンドウの更新の場合と同じです。
from pyspark.sql.functions import col, when
from delta.tables import DeltaTable
# Load the Delta table
table_name = "<Provide your table name>"
delta_table = DeltaTable.forPath(spark, f"Tables/{table_name}")
# Define the condition and the column to update
condition = col("DateID") == 20200101
column_name = "DateID"
new_value = 20250101
# Update the DateID column based on the condition
delta_table.update(
condition,
{column_name: when(condition, new_value).otherwise(col(column_name))}
)
パーティション化されたローリングウィンドウの更新
パーティション分割は、テーブルの更新の影響を減らすのに役立ちます。 日付キーを使いたいと思うかもしれませんが、カーディナリティチェックを簡単に行うと、これが最適な選択ではないことが明らかになる可能性があります。 たとえば、これまでに説明したサンプル テーブルには、過去 5 年間の売上トランザクションが含まれています。これは、約 1800 個の個別の日付値に相当します。 このカーディナリティが高すぎます。 パーティション列の個別の値は 200 未満にする必要があります。
column_name = 'DateID'
table_name = "<Provide your table name>"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")
distinct_count = table_df.select(column_name).distinct().count()
print(f"The '{column_name}' column has {distinct_count} distinct values.")
if distinct_count <= 200:
print(f"The '{column_name}' column is a good partitioning candidate.")
table_df.write.format("delta").partitionBy(column_name).save(f"Tables/{table_name}_by_date_id")
print(f"Table '{table_name}_by_date_id' partitioned and saved successfully.")
else:
print(f"The cardinality of the '{column_name}' column is possibly too high.")
出力:
The 'DateID' column has 1825 distinct values.
The cardinality of the 'DateID' column is possibly too high.
適切なパーティション列がない場合は、既存の列のカーディナリティを減らすことで、人為的に作成できます。 次の Python コードでは、DateID の最後の 2 桁を削除して Month 列を追加します。 これにより、60 個の個別の値が生成されます。 その後、サンプル コードは、Month 列でパーティション分割された Delta テーブルを保存します。
from pyspark.sql.functions import col, expr
column_name = 'DateID'
table_name = "sales_1"
table_df = spark.read.format("delta").load(f"Tables/{table_name}")
partition_column = 'Month'
partitioned_table = f"{table_name}_by_month"
table_df = table_df.withColumn(partition_column, expr(f"int({column_name} / 100)"))
distinct_count = table_df.select(partition_column).distinct().count()
print(f"The '{partition_column}' column has {distinct_count} distinct values.")
if distinct_count <= 200:
print(f"The '{partition_column}' column is a good partitioning candidate.")
table_df.write.format("delta").partitionBy(partition_column).save(f"Tables/{partitioned_table}")
print(f"Table '{partitioned_table}' partitioned and saved successfully.")
else:
print(f"The cardinality of the '{partition_column}' column is possibly too high.")
出力:
The 'Month' column has 60 distinct values.
The 'Month' column is a good partitioning candidate.
Table 'sales_1_by_month' partitioned and saved successfully.
Delta Analyzer の概要に、Delta テーブルのレイアウトが Direct Lake と適切に揃っていることを示すようになりました。 平均行グループ サイズは約 1,600 万行であり、行グループ サイズの平均絶対偏差であるため、セグメント サイズは 100 万行未満です。
パラメーター | 価値 |
---|---|
テーブル名 | 月別販売_1 |
[行数] | 1000000000 |
行グループ | 六十 |
Parquet ファイル | 六十 |
行グループあたりの最大行数 | 16997436 |
行グループあたりの最小行数 | 15339311 |
行グループあたりの平均行数 | 16666666.666666666 |
VOrder が有効 | 正しい |
合計サイズ | 7447946016 |
タイムスタンプ | 2025-03-24 03:01:02.794979 |
パーティション分割されたサンプル テーブルに対するローリング ウィンドウの更新後、Delta Analyzer の履歴には、影響を受けた Parquet ファイルが 1 つだけ表示されます。 次の出力表を参照してください。 バージョン 2 には、古い Parquet ファイルから置き換えの Parquet ファイルにコピーされた行が 16,445,655 行とまったく同じで、バージョン 3 では 548,665 行の新しい Parquet ファイルが追加されます。 合計で、Direct Lake は約 1,700 万行を再読み込みするだけで済み、パーティション分割なしで 10 億行の再読み込みよりも大きな改善を実現します。
バージョン | 説明 | 価値 |
---|---|---|
2 | 操作 | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4103076'} | |
1 | 操作 | 削除 |
操作パラメーター | {'predicate': '["(DateID#3910 = 20200101)"]'} | |
操作メトリック | {'numRemovedFiles': '1', 'numRemovedBytes': '126464179', 'numCopiedRows': '16445655', 'numDeletionVectorsAdded': '0', 'numDeletionVectorsRemoved': '0', 'numAddedChangeFiles': '0', 'executionTimeMs': '19065', 'numDeletionVectorsUpdated': '0', 'numDeletedRows': '548665', 'scanTimeMs': '1926', 'numAddedFiles': '1', 'numAddedBytes': '121275513', 'rewriteTimeMs': '17138'} | |
0 | 操作 | 書き込み |
操作パラメーター | {'mode': 'Overwrite', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '60', 'numOutputRows': '1000000000', 'numOutputBytes': '7447681467'} |
追加のみのパターンの後に Spark Optimize が続く
追加のみのパターンは、既存の Parquet ファイルには影響しません。 Direct Lake の増分フレームで適切に動作します。 ただし、この記事で前述したように、Parquet ファイルと行グループを統合するために Delta テーブルを最適化することを忘れないでください。 小さく頻繁に追加すると、ファイルが迅速に蓄積され、行グループのサイズの均一性がゆがむ可能性があります。
次の出力は、パーティション テーブルと比較した非パーティション テーブルの Delta Analyzer 履歴を示しています。 履歴には、7 つの追加と 1 つの後続の 最適化 操作が含まれます。
バージョン | 説明 | 既定のレイアウトの値 | パーティションされたレイアウトの値 |
---|---|---|---|
8 | 操作 | 最適化 | 最適化 |
操作パラメーター | {'predicate': '[]', 'auto': 'false', 'clusterBy': '[]', 'vorder': 'true', 'zOrderBy': '[]'} | {'predicate': '["('Month >= 202501)"]', 'auto': 'false', 'clusterBy': '[]', 'vorder': 'true', 'zOrderBy': '[]'} | |
操作メトリック | {'numRemovedFiles': '8', 'numRemovedBytes': '991234561', 'p25FileSize': '990694179', 'numDeletionVectorsRemoved': '0', 'minFileSize': '990694179', 'numAddedFiles': '1', 'maxFileSize': '990694179', 'p75FileSize': '990694179', 'p50FileSize': '990694179', 'numAddedBytes': '990694179'} | {'numRemovedFiles': '7', 'numRemovedBytes': '28658548', 'p25FileSize': '28308495', 'numDeletionVectorsRemoved': '0', 'minFileSize': '28308495', 'numAddedFiles': '1', 'maxFileSize': '28308495', 'p75FileSize': '28308495', 'p50FileSize': '28308495', 'numAddedBytes': '28308495'} | |
7 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '547453', 'numOutputBytes': '4091802'} | {'numFiles': '1', 'numOutputRows': '547453', 'numOutputBytes': '4091802'} | |
6 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '548176', 'numOutputBytes': '4095497'} | {'numFiles': '1', 'numOutputRows': '548176', 'numOutputBytes': '4095497'} | |
5 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '547952', 'numOutputBytes': '4090107'} | {'numFiles': '1', 'numOutputRows': '547952', 'numOutputBytes': '4093015'} | |
4 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '548631', 'numOutputBytes': '4093134'} | {'numFiles': '1', 'numOutputRows': '548631', 'numOutputBytes': '4094376'} | |
3 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'ファイル数': '1', '出力行数': '548671', '出力バイト数': '4101221'} | {'ファイル数': '1', '出力行数': '548671', '出力バイト数': '4101221'} | |
2 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '546530', 'numOutputBytes': '4081589'} | {'numFiles': '1', 'numOutputRows': '546530', 'numOutputBytes': '4081589'} | |
1 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Append', 'partitionBy': '[]'} | {'mode': 'Append', 'partitionBy': '["Month"]'} | |
操作メトリック | {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4101048'} | {'numFiles': '1', 'numOutputRows': '548665', 'numOutputBytes': '4101048'} | |
0 | 操作 | 書き込み | 書き込み |
操作パラメーター | {'mode': 'Overwrite', 'partitionBy': '[]'} | {'mode': '上書き', 'partitionBy': '["月"]'} | |
操作メトリック | {'numFiles': '8', 'numOutputRows': '1000000000', 'numOutputBytes': '7700855198'} | {'numFiles': '60', 'numOutputRows': '1000000000', 'numOutputBytes': '7447681467'} |
バージョン8の操作メトリクスを見ると、非パーティション テーブルの最適化操作では約1 GBのデータに影響する8つのParquetファイルがマージされ、パーティション テーブルの最適化操作では約25 MBのデータのみに影響する7つのParquetファイルがマージされたことを指摘する価値があります。 パーティション化されたテーブルを使用することで、Direct Lake のパフォーマンスが向上することが考えられます。
考慮事項と制限事項
Direct Lake のパフォーマンスを最適化するための考慮事項と制限事項は次のとおりです。
- 増分フレーミングを維持するために、Direct Lake では大規模な Delta テーブルでの破壊的な更新パターンを避けることをお勧めします。
- 小さいデルタ テーブルは、増分フレーム用に最適化する必要はありません。
- Direct Lakeで列セグメントを作成するために、行グループのサイズを100万行から1600万行に設定します。 Direct Lake では、大きな列セグメントが優先されます。
- 大きな Parquet ファイルと行グループの数が少ない場合よりも、多数の小さな Parquet ファイルと行グループで Direct Lake のコード変換の効率が低下するため、カーディナリティの高いパーティション列は避けてください。
- コンピューティング リソースとメモリ リソースに対する予期しない需要により、セマンティック モデルがコールド状態の別の Fabric クラスター ノードに再読み込みされる可能性があります。
- Direct Lake では、データの読み込みを最適化するために行グループ\ファイルのスキップに delta\Parquet 統計は使用されません。