如果您的應用程式使用 Azure Data Lake Storage (ADLS) 或 Blob 儲存體 API, 而且需要連線到 OneLake,您可以繼續使用現有的 API。
我們會透過真實世界的鏡像範例示範如何搭配 OneLake 使用 Blob 和 ADLS API,並分享來自 OneLake 的開發人員見解。 我們探討了何時以及為何可以選擇一個 API 而不是另一個 API,以及如何充分利用每個 API。 我們涵蓋的所有模式也適用於 Azure 儲存體。
在此案例中,我們涵蓋:
- 什麼是 開放鏡像
- 如何使用 .NET Azure Blob 儲存體 和 分散式檔案系統 (DFS) 用戶端,將資料寫入開啟的鏡像登陸區域。
- 如何結合 Blob 儲存和 DFS 用戶端,以在效能非常重要的情況下上傳資料並管理 OneLake 中的資料夾。
- 如何處理從 .NET 將 parquet 資料寫入 Blob 儲存體時可能遇到的區塊 Blob 問題
- 如何使用 Azurite 模擬器在本地測試所有內容。 (是的,您針對 OneLake 撰寫的程式碼也適用於儲存體模擬器!
使用 Blob API 將 parquet 流式傳輸至 OneLake
在本節中,我們將示範如何有效率地將鑲木地板資料串流至 OneLake,特別是開放式鏡像登陸區域。 開放鏡像是一種強大的方法,可以將資料從無法使用快捷方式的專屬系統引入 Microsoft Fabric。 它處理繁重的工作,將原始數據轉換為 Delta Lake 格式、管理 更新插入、刪除向量、 優化、 清理等。 您需要做的就是將資料上傳到登陸區域,包含一個行標記,然後鏡像就會從那裡獲取它。
團隊通常會編寫自訂程式碼來從專有系統中提取資料並以開放格式輸出。 雖然開放式鏡像會將 CSV 和 Parquet 擷取到 Delta 資料表中,但如果您已經在撰寫程式碼,不妨使用 Parquet,上傳和處理更有效率。
Parquet 是一種專為分析而設計的儲存 檔案格式 。 另一方面,Delta 是建立在 Parquet 之上的表格協議。 它新增了交易保證、結構描述強制執行以及對更新和刪除的支援。 當您將 Parquet 檔案上傳至開放鏡像著陸區時,這些檔案會被匯入至 Delta 資料表,提供 ACID 語意和查詢效能優化,且無需自行管理這些複雜性。 列標記會指出每筆記錄應該如何合併到表格中,這可讓鏡像處理程序知道何時插入、更新或刪除列。
讓我們來看看一個具體(但虛構的)例子。
想像一下,您有一個 .NET 應用程式,可從內部部署稅務系統提取英國房價資料。 (需要明確的是,這是一個虛構的場景。據我所知,英國稅務局並沒有這樣做,但該數據集是 公開的 ,是一個很好的例子。此應用程式每月以 Azure 函式的形式執行,為了降低成本,它必須快速並避免使用本機磁碟。 目標是將資料以 Parquet 格式直接從來源串流到開放的鏡像登陸區域。 來自英國稅務局的 “Price Paid” 資料集包含 RecordStatus 欄位,該欄位必須對應至開放鏡像格式所需的列標記。
此案例與 Blob 儲存體 API 非常一致。 它支援串流寫入,可讓您將資料直接推送至 OneLake,而不需要在本機暫存。 這使得它變得簡單、高效且符合成本效益,特別是對於 Azure Functions 等無伺服器工作負載。 它的速度也很快:Blob API 支援平行區塊上傳,這可以在寫入大型檔案時顯著提高輸送量,但在使用 DFS 端點時不受支援。
為此,我們使用開放原始碼 Parquet.NET 程式庫,這是一個完全受控的 .NET 元件,可讓您輕鬆地即時撰寫 Parquet 資料。 此外,使用 Blob 儲存等物件存放區會引入一些細微差別,尤其是在串流和緩衝方面,這讓我們有機會在使用 Blob 儲存時探索一些細微差別。
此方法可讓您的 Azure 函式保持輕量型、快速且符合成本效益,無需本機磁碟、無預備,只需串流、序列化和上傳。
開啟映射著陸區摘要
開放鏡像著陸區就像鏡像資料表的收件匣一樣,只需新增檔案,Fabric 就會負責匯入處理。 但在這種簡單性的背後,是您的應用程式需要遵循的明確協議,以確保正確發現和處理資料。
資料夾結構
每個鏡像資料表在 OneLake 中都有專用路徑:
<workspace>/mirrored-database/Files/LandingZone/<table-name>/
這是您將寫入中繼資料和資料檔案的地方。 您不需要明確建立資料夾,只需寫入具有適當前綴的 Blob 格式的資料,結構即可被推斷出來。
步驟 1:定義表格鍵
在寫入任何資料之前,您必須在資料表的登陸區域資料夾中建立檔案 _metadata.json 。 此檔案定義用於插入或更新及刪除的主鍵欄位:
public async Task CreateTableAsync(OpenMirroredTableId table, params string[] keyColumns)
{
await using var metadataFile = await OpenWriteAsync(table, "_metadata.json");
var json = new { keyColumns };
await JsonSerializer.SerializeAsync(metadataFile, json);
}
此中繼資料檔案會告知 Fabric 如何唯一識別資料列。 如果沒有它,Fabric 將不會內嵌您的資料。
步驟 2:建立具有正確名稱的資料檔案
中繼資料就緒後,您就可以開始寫入資料了。 檔案必須使用零填補數字(例如 00000000000000000001.parquet、 00000000000000000002.parquet等)依序命名。這確保了確定性排序並避免了衝突。
清單 API 會依字母順序傳回 Blob,因此我們的邏輯可以透過處理具有平面清單的著陸區資料夾來快速找到下一個序號。 開啟鏡像會將已處理的檔案移至以 _ 為前置詞的資料夾,這些資料夾會以數值以下排序。 當您使用 Azure 儲存體清單 Blob API 時,看到所有 parquet 檔案後結束迴圈可改善效能,因為這樣可在子資料夾中列舉符合前置詞的 Blob。 如果您使用 ADLS 路徑清單 API,您可以選擇執行遞迴清單,以控制是否列出子資料夾的內容,這是階層式命名空間的明顯優勢。
判斷下一個檔案名稱的邏輯如下所示:
public async Task<MirrorDataFile> CreateNextTableDataFileAsync(OpenMirroredTableId table)
{
var (containerClient, path) = GetTableLocation(table);
var listBlobs = containerClient.GetBlobsAsync(prefix: path);
var tableFound = false;
BlobItem? lastDataFile = null;
// Parquet files will be first in the folder because other files and folders all start with an underscore.
// So we can just take the last Parquet file to get our sequence number.
await foreach (var blob in listBlobs)
{
tableFound = true;
if (blob.Name.EndsWith(".parquet") && blob.Properties.ContentLength > 0)
{
lastDataFile = blob;
}
else
{
break;
}
}
if (!tableFound)
{
throw new ArgumentException($"Table not found.", nameof(table));
}
long lastFileNumber = 0;
if (lastDataFile is not null)
{
var dataFileName = Path.GetFileName(lastDataFile.Name);
lastFileNumber = long.Parse(dataFileName.Split('.')[0]);
}
var stream = await OpenWriteAsync(table, $"{++lastFileNumber:D20}.parquet");
return new MirrorDataFile(stream)
{
FileSequenceNumber = lastFileNumber
};
}
您可能想知道 MirrorDataFile 類別的用途,我們很快就會在介紹如何可靠地使用區塊 blob 時回到這一點。
第 3 步:上傳文件內容
實際寫入是藉由開啟 Blob 的資料流來處理的:
private async Task<Stream> OpenWriteAsync(OpenMirroredTableId table, string fileName)
{
var (containerClient, path) = GetTableLocation(table);
path += fileName;
var blobClient = containerClient.GetBlobClient(path);
var stream = await blobClient.OpenWriteAsync(true);
return stream;
}
備註
鏡像資料庫支援結構描述,因此登陸區域可以包含 選擇性的結構描述資料夾 ,以表示資料表位於結構描述內。 此外,在 OneLake 中,Fabric 工作區 會對應至儲存 容器 和 ADLS 檔案系統。
public record OpenMirroredTableId(string WorkspaceName, string MirroredDatabaseName, string TableName)
{
public string? Schema { get; init; } = null;
public string GetTablePath() => Schema == null
? $"{MirroredDatabaseName}/Files/LandingZone/{TableName}/"
: $"{MirroredDatabaseName}/Files/LandingZone/{Schema}.schema/{TableName}/";
}
private (BlobContainerClient ContainerClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = blobServiceClient.GetBlobContainerClient(table.WorkspaceName);
var path = table.GetTablePath();
return (containerClient, path);
}
從稅務系統讀取「已支付價格」資料的商業邏輯會在從來源串流時將每筆記錄轉換為 Parquet 格式。 每一行會直接寫入 Parquet 檔案,該檔案同時使用 CreateNextTableDataFileAsync 提供的資料流逐位元組串流至 Azure 儲存空間。 此方法可避免本機暫存,並支援有效率的無伺服器擷取。
程式碼的工作原理如下:
public async Task SeedMirrorAsync(OpenMirroredTableId tableId, CancellationToken cancellationToken = default)
{
await openMirroringWriter.CreateTableAsync(tableId, PricePaidMirroredDataFormat.KeyColumns);
var data = pricePaidDataReader.ReadCompleteData(cancellationToken);
await using var mirrorDataFile = await openMirror.CreateNextTableDataFileAsync(tableId);
await mirrorDataFile.WriteData(async stream => await data.WriteAsync(stream, Settings.RowsPerRowGroup, cancellationToken));
}
此 WriteAsync 方法會將資料序列化為 Parquet 格式,逐個列群組:
public static async Task WriteAsync(this IAsyncEnumerable<PricePaid> data, Stream resultStream, int rowsPerRowGroup = 10000, CancellationToken cancellationToken = default)
{
await using var parquetWriter = await ParquetWriter.CreateAsync(
PricePaidMirroredDataFormat.CreateSchema(),
resultStream,
cancellationToken: cancellationToken);
await foreach (var chunk in data
.Select(PricePaidMirroredDataFormat.Create)
.ChunkAsync(rowsPerRowGroup, cancellationToken))
{
await ParquetSerializer.SerializeRowGroupAsync(parquetWriter, chunk, cancellationToken);
}
}
每筆 PricePaid 記錄都會轉換為一個 PricePaidMirroredDataFormat 物件,其中包含必填 __rowMarker__ 欄位:
public static PricePaidMirroredDataFormat Create(PricePaid pricePaid)
{
var recordMarker = pricePaid.RecordStatus.Value switch
{
RecordStatus.Added => 0,
RecordStatus.Changed => 1,
RecordStatus.Deleted => 2,
_ => throw new InvalidEnumArgumentException("Unexpected RecordStatus value")
};
return new PricePaidMirroredDataFormat
{
TransactionId = pricePaid.TransactionId,
Price = pricePaid.Price,
...
__rowMarker__ = recordMarker
};
}
該協議簡單且確定性。 它避免了不必要的 API 調用,與批次和流管線無縫協作,並與 Azure Functions 等無服務器環境順利集成。 將 blob 上傳至前綴會自動建立父資料夾,並保持與存儲模擬器的相容性,詳情請參閱測試章節。
第四步:自己清理
開啟鏡像成功處理資料後,它會將原始檔案移至_ProcessedFiles並_FilesReadyToDelete,並新增_FilesReadyToDelete.json檔案到特殊資料夾中。 雖然 Fabric 會在 7 天後自動刪除這些檔案,但如果您鏡像大量資料,該保留期限可能會導致顯著的儲存成本。
為了降低成本,一旦您確信資料已被擷取,就可以主動刪除這些資料夾。 這是 ADLS API 的絕佳使用案例,它支援目錄的原子性刪除,比逐個列舉和刪除 Blob 以及更新 _FilesReadyToDelete.json 檔案更有效率。
具體操作方法如下:
public async Task CleanUpTableAsync(OpenMirroredTableId tableId)
{
var (fileSystemClient, tablePath) = GetTableLocation(tableId);
var foldersToDelete = new []
{
"_ProcessedFiles",
"_FilesReadyToDelete"
};
await Parallel.ForEachAsync(foldersToDelete, async (path, _) =>
{
var fullPath = $"{tablePath}{path}/";
var directoryClient = fileSystemClient.GetDirectoryClient(fullPath);
if (await directoryClient.ExistsAsync())
{
await directoryClient.DeleteAsync();
}
});
}
private (DataLakeFileSystemClient FileSystemClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = dataLakeServiceClient.GetFileSystemClient(table.WorkspaceName);
var path = $"{table.MirroredDatabaseName}/Files/LandingZone/{table.TableName}/";
return (containerClient, path);
}
可靠地使用 Blob 區塊
我選擇這個例子是因為它讓我們可以討論區塊 Blob 的一些細節。 這些不是 OneLake 特定的,但由於 OneLake 是以 Azure 儲存體為基礎,因此 OneLake 也會公開這些細微差別。
例如:.NET 儲存體 SDK 會公開 OpenWrite 傳回串流的 API。 超級方便。 如上面的範例所示,該串流非常適合 Parquet.NET API。 它還使測試變得輕而易舉——您可以輕鬆地替換單元測試中的流,而無需僅僅為了可測試性而構建額外的抽象。
[Test]
public async Task when_writing_price_paid_data_to_parquet()
{
var row = new PricePaid
{
TransactionId = "{34222872-B554-4D2B-E063-4704A8C07853}",
Price = 375000,
DateOfTransfer = new DateTime(2004, 4, 27),
Postcode = "SW13 0NP",
PropertyType = PropertyType.Detached,
IsNew = true,
DurationType = DurationType.Freehold,
PrimaryAddressableObjectName = "10A",
SecondaryAddressableObjectName = string.Empty,
Street = "THE TERRACE",
Locality = string.Empty,
TownCity = "LONDON",
District = "RICHMOND UPON THAMES",
County = "GREATER LONDON",
CategoryType = CategoryType.AdditionalPricePaid,
RecordStatus = RecordStatus.Added
};
using var memoryStream = new MemoryStream();
await new[] { row }.ToAsyncEnumerable().WriteAsync(memoryStream);
var readData = await PricePaidMirroredDataFormat.Read(memoryStream).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(readData.TransactionId, Is.EqualTo(row.TransactionId));
Assert.That(readData.Price, Is.EqualTo(row.Price));
Assert.That(readData.DateOfTransfer, Is.EqualTo(row.DateOfTransfer));
// more assertions
Assert.That(readData.__rowMarker__, Is.EqualTo(0)); // RecordStatus.Added
});
}
但這裡有一個問題:Blob 儲存體與本機檔案系統不同。
呼叫 flush – 提交區塊
若要瞭解 Parquet.NET 中的呼叫為何 Flush() 重要,我們需要快速繞道瞭解 Parquet 檔案的結構方式,以及它如何與 Blob 儲存體區塊 Blob API 互動。
Parquet 檔案基本概念
Parquet 是一種單欄式儲存格式,專為高效分析而設計。 Parquet 檔案由以下部分組成:
- 資料列群組:這些是核心構造塊。 每個列群組包含一塊列,依欄組織。 行群組會以順序且獨立的方式寫入。
- 資料行區塊:在每個資料列群組中,資料會逐欄儲存。
- 中繼資料:在檔案結尾,Parquet 會寫入頁尾,其中包含用於快速讀取的結構描述和偏移量資訊。
在 Parquet.NET 中,每次完成資料列群組時,程式庫都會在輸出流呼叫
Flush()。 當您寫入 Blob 儲存體時,事情就會變得有趣。
區塊 blob 模型
Blob 儲存體,因此 OneLake 會使用區塊 Blob 模型,其運作方式如下:
- 您上傳區塊:每個區塊的大小最多可達 4,000 MiB (預設為 4 MiB)。 您可以平行上傳區塊以最大化吞吐量,而 .NET Blob 用戶端會自動為您執行此操作,這很棒。 這是使用 Blob 用戶端 (而非 DFS 用戶端) 進行上傳的原因之一。 DFS 用戶端不支援平行區塊上傳,這可能是效能瓶頸。 也就是說,DFS 用戶端仍然可以正常讀取產生的檔案。
- 您提交區塊:上傳所有區塊後,您會呼叫 CommitBlockList()來完成 Blob。 每個 Blob 最多可以上傳 50,000 個區塊,這表示如果您使用 4,000 MiB 區塊,理論上可以寫入單一 190.7 TiB 檔案。 (並不是說我會推薦它。如果您想要深入瞭解,請閱讀這篇文章 瞭解區塊 Blob、附加 Blob 和頁面 Blob。
這是問題
當 Parquet.NET 在每個資料列群組後呼叫 Flush(),而您正寫入由 Blob 儲存體支援的資料流時,該 flush 會觸發 BlockList 認可。 隨著檔案的成長,每新增一個資料列群組都會導致程式庫模組重新提交所有先前上傳的區塊。
假設您正在使用 4MB 區塊編寫一個具有 100MB 列組的 1 GB 檔案。 這總共給了你 250 個區塊。 在第一次清空時,您提交了 25 個區塊。 第二場,50. 到最終階段時,您已經提交了所有 250 個區塊。 將所有 10 個列組相加,您總共提交了 1,375 個區塊,儘管最終所需提交的檔案只有一個。
這是低效的,而且會引進兩個關鍵問題:
- 逾時和重試。 大型封鎖清單可能會導致逾時。 請記住,Blob 儲存體是以 HTTP 為基礎建置的。 它很可靠,但並不完美。 逾時可能會在伺服器上成功,但您的用戶端不知道這一點,因此會重試。 重試可能會導致 409 衝突。 做昂貴的事情 250 次而不是一次會增加這種情況發生的機會。 更少的提交次數 = 更少的重試次數 = 更少的麻煩。
- Blob 的可見度過早。 Blob API 的優點之一是,在您明確認可 BlockList 之前,區塊不會變得可見。 這非常適合 Parquet 等案例,在這些案例中,檔案在寫入頁尾元資料之前無效。 這意味著下游程序不會誤讀寫入一半的文件。 但這裡有一個轉折點:因為 Parquet.NET 在每個列組之後調用
Flush(),並且該刷新會觸發提交,因此 blob 在文件完成之前變得可見。 因此,即使 Blob API 旨在幫助您避免此問題,Parquet.NET 與 Stream 的配合方式也會引入問題,除非您採取措施防止它。
如要縮小 Parquet.NET 與 Blob 儲存體細微差別之間的差距,請勿在BlobFile.BlobStream實作中呼叫Flush()。 如此一來,即使 Parquet.NET 在每個列群組後呼叫 Flush,底層流仍不會排清至儲存體。
儲存體用戶端 (DFS 和 Blob) 會在呼叫 OpenWrite 時建立空白檔案。 開放的鏡像處理器在遇到 0 位元組檔案時會記錄錯誤,這種情況可能發生在複製程序與檔案創建交錯時。 若要避免這種情況,請將檔案寫入另一個位置,並將它移至正確的路徑,ADLS API 透過 重新命名作業支援該路徑,如下所示:
public async Task<BlobFile> CreateFileAsync(string filePath)
{
if (filePath is null)
{
throw new ArgumentNullException(nameof(filePath), "File path cannot be null.");
}
var containerClient = client.blobServiceClient.GetBlobContainerClient(containerName);
var temporaryPath = $"_{filePath}.temp"!;
var blobClient = containerClient.GetBlobClient(Combine(temporaryPath));
var blobStream = await blobClient.OpenWriteAsync(overwrite: true);
return new BlobFile(blobStream, GetChildPath(temporaryPath), Combine(filePath)!);
}
執行 dispose 函數 – 認可區塊並將檔案移動
以下是另一個需要注意的微妙但重要行為:當您使用 Azure Blob 資料流時,呼叫 .Dispose()(或讓它自動釋放)會提交所有已上傳的區塊。
這通常是您想要的,但並非總是如此。
假設您的來源系統正在使用 IAsyncEnumerable 串流資料,就像範例中為了說明錯誤所做的。 如果資料來源中途發生故障,例如資料庫連線逾時或網路中斷,您可能會只獲得部分寫入的 Parquet 檔案。 但是,如果流被處置(由於等待使用或使用塊,它會這樣做),則會提交這些部分塊。
為了避免這種情況,範例程式碼只會在整個作業成功完成時明確認可一次。 這樣,如果製作程式在中途失敗,您就不會留下一個未寫完的檔案。
這就是為什麼範例會從 CreateNextTableDataFileAsync 傳回 BlobFile 物件,以封裝串流來防止零位元組檔案,並解決與 Flush 和 Dispose 認可部分 Parquet 檔案相關的問題。
public class BlobFile(Stream stream, IStoragePath temporaryFilePath, string finalFilePath) : IAsyncDisposable
{
private readonly BlobStream stream = new(stream, temporaryFilePath, finalFilePath);
public async Task WriteData(Func<Stream, Task> writeOperation)
{
try
{
await writeOperation(stream);
}
catch (Exception)
{
stream.Failed();
throw;
}
}
public async ValueTask DisposeAsync()
{
await stream.DisposeAsync();
}
private class BlobStream(Stream innerStream, IStoragePath temporaryFilePath, string finalFilePath) : Stream
{
private bool disposed = false;
private bool success = true;
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position
{
get => innerStream.Position;
set => innerStream.Position = value;
}
public override void Flush()
{
// no-op
}
public override int Read(byte[] buffer, int offset, int count) => innerStream.Read(buffer, offset, count);
public override long Seek(long offset, SeekOrigin origin) => innerStream.Seek(offset, origin);
public override void SetLength(long value) => innerStream.SetLength(value);
public override void Write(byte[] buffer, int offset, int count) => innerStream.Write(buffer, offset, count);
public void Failed()
{
success = false;
}
public override async ValueTask DisposeAsync()
{
if (disposed)
{
return;
}
if (success)
{
await innerStream.DisposeAsync();
await temporaryFilePath.RenameAsync(finalFilePath);
}
disposed = true;
}
}
}
使用 Azurite 模擬器在 OneLake 上撰寫測試
OneLake 的一大優點是它是以 Azure 儲存體為基礎,這表示您可以使用 Azurite 模擬器在本機測試整合程式碼。 這可讓您輕鬆撰寫可靠的測試,以模擬 OneLake/Azure 儲存體的行為,而不需要即時 Fabric 環境或雲端資源。
Azurite 模擬 Blob 存儲 API,這正是 OneLake 公開的。 這表示您在生產中使用的相同程式碼可以在測試套件中原封不動地執行。 您可以將 Azurite 設定為本地過程或容器,將您的 BlobServiceClient 指向它,即可開始使用。
這對於單元和集成測試特別有用。 您可以:
- 檢查您的
_metadata.json是否正確寫入。 - 檢查您的檔案命名邏輯是否產生預期的順序。
- 模擬因呼叫
Flush和Dispose失敗而導致的部分寫入。 這允許測試上述細微差別,並確保實現適當地處理它們。 - 斷言您的 Parquet 序列化來回完全符合要求。
Azurite 不支援 ADFS API。 這就是為什麼上述BlobFile使用介面來透過 blob API 實作 ADFS 功能,IStoragePath以便它們可以與正在測試的模擬器協同運作。 以下是範例:
public async Task RenameAsync(string newPath)
{
async Task RenameDirectoryAsync(DataLakeServiceClient dataLakeServiceClient)
{
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(containerName);
var directoryClient = fileSystemClient.GetDirectoryClient(path);
await directoryClient.RenameAsync(newPath);
}
async Task CopyThenDeleteAsync(BlobServiceClient blobServiceClient)
{
var sourceBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(path);
var destinationBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(newPath);
await destinationBlob.StartCopyFromUriAsync(sourceBlob.Uri);
await sourceBlob.DeleteIfExistsAsync();
}
StorageOperation operation = new()
{
WithFlatNamespace = CopyThenDeleteAsync,
WithHierarchicalNamespace = RenameDirectoryAsync
};
await operation.Execute(client);
}
測試邊界情況:清空並釋放
稍早,我們討論了 Parquet.NET 在使用 Flush() 和 Dispose() 寫入 OneLake 時如何可能導致 blob 過早提交或部分提交。 這些行為很微妙,但可以測試。
以下是一些測試,可驗證鏡像邏輯是否正確處理這些案例:
[Test]
public async Task it_should_write_data_to_table()
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
var mirroredData = await GetMirroredBlobItem();
var mirroredDataClient = setup.WorkspaceContainer.GetBlobClient(mirroredData!.Name);
var mirroredDataContents = await mirroredDataClient.DownloadContentAsync();
var readData = await PricePaidMirroredDataFormat.Read(mirroredDataContents.Value.Content.ToStream()).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(mirroredData, Is.Not.Null);
Assert.That(mirroredData!.Properties.ContentLength, Is.GreaterThan(0));
Assert.That(readData.TransactionId, Is.EqualTo(setup.PricePaidReader.TransactionId));
// ... more assertions ...
});
}
此測試確認成功寫入會產生正確往返的有效非空白 Parquet 檔案。
現在對於失敗案例:
[Test]
public async Task it_should_not_commit_partially_written_data()
{
long? lengthDuringWrite = null;
setup.PricePaidReader.ActionBetweenRowGroups = async () =>
{
var mirroredData = await GetMirroredBlobItem();
lengthDuringWrite = mirroredData!.Properties.ContentLength;
};
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
Assert.That(lengthDuringWrite, Is.EqualTo(0));
}
此測試會模擬中間資料流程例外狀況,並確認在寫入進行中時,由於 no-op Flush() 覆寫而沒有部分資料可見。
最後,失敗路徑:
[Test]
public async Task after_a_row_group_is_written_it_should_not_leave_a_partially_complete_blob()
{
setup.PricePaidReader.ThrowsAfterFirstRowGroup = true;
var previouslyMirroredFile = await GetMirroredBlobItem();
var threw = true;
try
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
}
catch (Exception)
{
threw = true;
}
var mirroredData = await GetMirroredBlobItem();
var mirroredTemporaryData = await GetMirroredBlobTemporaryItem();
Assert.Multiple(() =>
{
Assert.That(threw, Is.True);
if (previouslyMirroredFile == null)
{
Assert.That(mirroredData, Is.Null);
}
else
{
Assert.That(mirroredData!.Name, Is.EqualTo(previouslyMirroredFile.Name));
}
Assert.That(mirroredTemporaryData, Is.Not.Null);
Assert.That(mirroredTemporaryData!.Properties.ContentLength, Is.EqualTo(0));
});
}
此測試確認,即使資料流因例外狀況而處置,也不會部分寫入新的鏡像檔案。
這些測試提供了鏡像邏輯穩健的信心,即使在故障條件下也是如此。 它們示範如何使用模擬器來模擬實際行為,而不需要完整的 Fabric (或 Azure) 環境。
而僅僅為了證明相容性,將測試設置調整為指向 Fabric 工作區,這裡有一個包含英國房價數據的表格。
public class when_using_fabric
{
public class in_success_cases : when_copying_to_mirror_successfully
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
public class in_failure_cases : when_copying_to_mirror_fails
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
}
public static TestSetup UsingFabric()
{
var blobServiceClient = new BlobServiceClient(new Uri("https://onelake.blob.fabric.microsoft.com/"), new DefaultAzureCredential());
var pricePaidReader = new TestPricePaidReader();
var tableId = new OpenMirroredTableId($"TestWorkspace", "HousePriceOpenMirror.MountedRelationalDatabase", "PricePaid");
var workspaceContainer = blobServiceClient.GetBlobContainerClient(tableId.WorkspaceName);
var fabricPricePaidMirror = new FabricPricePaidMirror(new FabricOpenMirroringWriter(blobServiceClient), pricePaidReader)
{
Settings = new FabricPricePaidMirrorSettings { RowsPerRowGroup = 1 }
};
return new TestSetup
{
BlobServiceClient = blobServiceClient,
PricePaidReader = pricePaidReader,
TableId = tableId,
WorkspaceContainer = workspaceContainer,
FabricPricePaidMirror = fabricPricePaidMirror
};
}
總結
如果您要在 OneLake 上建置新的管線,特別是針對串流或無伺服器工作負載,ADLS 和 Blob 儲存體 API 會如預期般運作。 它們快速、靈活,並且與開放鏡像無縫協作。 藉由遵循登陸區域通訊協定並處理區塊 Blob 和檔案系統差異,您可以建置健全、可測試的整合,這些整合在生產環境中的運作方式與在本機模擬器中一樣好。 最重要的是,您無需重寫應用程式或與檔案系統作鬥爭。 只需串流、序列化並上傳到 OneLake。