Poznámka
Na prístup k tejto stránke sa vyžaduje oprávnenie. Môžete sa skúsiť prihlásiť alebo zmeniť adresáre.
Na prístup k tejto stránke sa vyžaduje oprávnenie. Môžete skúsiť zmeniť adresáre.
Ak vaša aplikácia používa rozhrania API Azure Data Lake Storage (ADLS) alebo Blob Storage a potrebuje sa pripojiť k OneLake, môžete pokračovať v používaní existujúcich rozhraní API.
Ukazujeme, ako sa rozhrania Blob a ADLS API používajú s OneLake prostredníctvom príkladu zrkadlenia z reálneho sveta a zdieľame poznatky vývojárov z OneLake. Skúmame, kedy a prečo si môžete vybrať jedno API pred druhým a ako z každého vyťažiť maximum. Všetky vzory, ktoré pokrývame, sa vzťahujú aj na úložisko Azure Storage.
V tomto scenári pokrývame:
- Čo je otvorené zrkadlenie
- Ako používať klientov .NET Azure Blob Storage a distribuovaného systému súborov (DFS) na zápis údajov do cieľovej zóny otvoreného zrkadla.
- Ako skombinovať klientov Blob Storage a DFS na nahrávanie údajov a správu priečinkov vo OneLake, najmä ak záleží na výkone
- Ako spracovať scenáre, ktoré sa objavia s objektmi blob bloku pri zápise údajov Parquet do úložiska objektov BLOB z .NET
- Ako otestovať všetko lokálne pomocou emulátora Azurite. (Áno – kód, ktorý napíšete proti OneLake, funguje aj s emulátorom úložiska!)
Streamovanie parkiet do OneLake pomocou rozhraní Blob API
V tejto časti ukazujeme, ako efektívne streamovať údaje o parketách do OneLake, najmä do otvorenej zrkadlovej pristávacej zóny. Otvorené zrkadlenie je výkonný spôsob, ako preniesť údaje z proprietárnych systémov, v ktorých nie je možné použiť skratky , do služby Microsoft Fabric. Zvláda ťažkú prácu, konverziu nespracovaných údajov do formátu Delta Lake, správu upsertov, mazanie vektorov,optimalizáciu, vysávanie a ďalšie. Všetko, čo musíte urobiť, je nahrať údaje do vstupnej zóny, zahrnúť značku riadka a zrkadlenie ich odtiaľ prevezme.
Je bežné, že tímy píšu vlastný kód na extrahovanie údajov z proprietárnych systémov a ich výstup v otvorenom formáte. Zatiaľ čo otvorené zrkadlenie prijíma CSV aj Parquet do tabuliek Delta, ak už píšete kód, môžete tiež použiť Parquet, nahrávanie a spracovanie je efektívnejšie.
Parquet je formát úložného súboru určený na analýzu. Delta je na druhej strane stolový protokol postavený na Parquete. Pridáva transakčné záruky, vynucovanie schémy a podporu aktualizácií a odstránení. Keď nahráte súbory Parquet do otvorenej cieľovej zóny zrkadlenia, tieto súbory sa ingestujú do tabuliek Delta, čím sa prinesie sémantika ACID a optimalizácia výkonu dotazov bez toho, aby ste museli tieto zložitosti spravovať sami. Značka riadka označuje, ako by sa mal každý záznam zlúčiť do tabuľky, čo umožňuje procesu zrkadlenia vedieť, kedy vložiť, aktualizovať alebo odstrániť riadky.
Poďme si prejsť konkrétnym (ale fiktívnym) príkladom.
Predstavte si, že máte aplikáciu .NET, ktorá získava údaje o cenách domov v Spojenom kráľovstve z lokálneho daňového systému. (Aby bolo jasné, toto je vymyslený scenár. Britský daňový úrad to nerobí, pokiaľ viem, ale súbor údajov je verejne dostupný a je dobrým príkladom.) Táto aplikácia beží mesačne ako funkcia Azure a na udržanie nízkych nákladov musí byť rýchla a vyhnúť sa používaniu lokálneho disku. Cieľom je teda streamovať údaje priamo zo zdroja do otvorenej zrkadlovej pristávacej zóny vo formáte Parquet. Množina údajov Zaplatená cena od daňového úradu Spojeného kráľovstva obsahuje pole RecordStatus , ktoré musí byť priradené k značke riadka vyžadovanej formátom otvoreného zrkadlenia.
Tento scenár je dobre v súlade s rozhraním API služby Blob Storage. Podporuje streamovanie zápisov, čo vám umožňuje posielať údaje priamo do OneLake bez toho, aby ste ich lokálne pripravovali. Vďaka tomu je jednoduchý, efektívny a nákladovo efektívny, najmä pre bezserverové pracovné zaťaženia, ako je Azure Functions. Je tiež rýchly: rozhranie Blob API podporuje nahrávanie paralelných blokov, čo môže výrazne zvýšiť priepustnosť pri zápise veľkých súborov – a nie je podporované pri používaní koncového bodu DFS.
Na tento účel používame knižnicu Parquet.NET s otvoreným zdrojovým kódom, plne spravovanú zostavu .NET, ktorá uľahčuje zápis údajov Parquet za chodu. Práca s úložiskom objektov, ako je Blob Storage, tiež prináša niekoľko nuancií, najmä v oblasti streamovania a ukladania do vyrovnávacej pamäte, čo nám dáva príležitosť preskúmať niektoré nuansy pri práci s úložiskom objektov BLOB.
Tento prístup udržuje vašu funkciu Azure ľahkú, rýchlu a nákladovo efektívnu, bez lokálneho disku, bez prípravy, stačí streamovať, serializovať a nahrávať.
Otvorenie súhrnu pristávacej zóny zrkadlenia
Otvorená zrkadlová vstupná zóna funguje ako doručená pošta pre zrkadlené tabuľky, pridajte súbory a Fabric sa postará o príjem. Za touto jednoduchosťou sa však skrýva jasný protokol, ktorý musí vaša aplikácia dodržiavať, aby sa zabezpečilo správne objavenie a spracovanie údajov.
Štruktúra priečinkov
Každá zrkadlová tabuľka má v OneLake vyhradenú cestu:
<workspace>/mirrored-database/Files/LandingZone/<table-name>/
Tu budete písať metadáta aj dátové súbory. Nemusíte explicitne vytvárať priečinky, stačí napísať objekty blob s príslušnou predponou a štruktúra je odvodená.
Krok 1: Deklarujte kľúče tabuľky
Pred zápisom _metadata.json akýchkoľvek údajov musíte vytvoriť súbor v priečinku cieľovej zóny tabuľky. Tento súbor definuje kľúčové stĺpce používané pre upserty a odstránenia:
public async Task CreateTableAsync(OpenMirroredTableId table, params string[] keyColumns)
{
await using var metadataFile = await OpenWriteAsync(table, "_metadata.json");
var json = new { keyColumns };
await JsonSerializer.SerializeAsync(metadataFile, json);
}
Tento súbor metaúdajov informuje o technológii Fabric, ako jednoznačne identifikovať riadky. Bez neho nebude Fabric prijímať vaše údaje.
Krok 2: Vytvorenie dátových súborov so správnym názvom
Po vytvorení metaúdajov môžete začať zapisovať údaje. Súbory musia byť pomenované postupne pomocou čísel s nulovým polstrovaním ako 00000000000000000001.parquet, 00000000000000000002.parquetatď. Tým sa zabezpečí deterministické usporiadanie a zabráni sa kolíziám.
Rozhrania API zoznamu vracajú objekty blob podľa abecedy, takže naša logika môže rýchlo nájsť ďalšie poradové číslo spracovaním priečinka cieľovej zóny s plochým zoznamom. Otvorené zrkadlenie presunie spracované súbory do priečinkov s predponou _, ktoré sú zoradené pod číselnými hodnotami. Ukončenie slučky po zobrazení všetkých súborov Parquet zlepšuje výkon, keď používate rozhranie Azure Storage List Blob API , ktoré by vymenovalo objekty blob v podpriečinkoch, ktoré sa zhodujú s predponou. Ak používate rozhranie API zoznamu ciest ADLS, môžete sa rozhodnúť vykonať rekurzívny zoznam, ktorý umožňuje kontrolovať, či sa má alebo nemá zobrazovať obsah podpriečinkov – to je jasná výhoda hierarchického menného priestoru.
Logika na určenie ďalšieho názvu súboru vyzerá takto:
public async Task<MirrorDataFile> CreateNextTableDataFileAsync(OpenMirroredTableId table)
{
var (containerClient, path) = GetTableLocation(table);
var listBlobs = containerClient.GetBlobsAsync(prefix: path);
var tableFound = false;
BlobItem? lastDataFile = null;
// Parquet files will be first in the folder because other files and folders all start with an underscore.
// So we can just take the last Parquet file to get our sequence number.
await foreach (var blob in listBlobs)
{
tableFound = true;
if (blob.Name.EndsWith(".parquet") && blob.Properties.ContentLength > 0)
{
lastDataFile = blob;
}
else
{
break;
}
}
if (!tableFound)
{
throw new ArgumentException($"Table not found.", nameof(table));
}
long lastFileNumber = 0;
if (lastDataFile is not null)
{
var dataFileName = Path.GetFileName(lastDataFile.Name);
lastFileNumber = long.Parse(dataFileName.Split('.')[0]);
}
var stream = await OpenWriteAsync(table, $"{++lastFileNumber:D20}.parquet");
return new MirrorDataFile(stream)
{
FileSequenceNumber = lastFileNumber
};
}
Možno sa pýtate, na čo slúži trieda MirrorDataFile, k tomu sa vrátime o chvíľu, keď sa budeme zaoberať tým, ako spoľahlivo pracovať s blobmi blokov.
Krok 3: Nahrajte obsah súboru
Skutočný zápis sa spracuje otvorením streamu do objektu blob:
private async Task<Stream> OpenWriteAsync(OpenMirroredTableId table, string fileName)
{
var (containerClient, path) = GetTableLocation(table);
path += fileName;
var blobClient = containerClient.GetBlobClient(path);
var stream = await blobClient.OpenWriteAsync(true);
return stream;
}
Nota
Zrkadlené databázy podporujú schémy, takže cieľová zóna môže obsahovať voliteľný priečinok schémy , ktorý označuje, že tabuľka je v rámci schémy. V OneLake sú pracovné priestory Fabric priradené k úložným kontajnerom a súborovým systémom ADLS.
public record OpenMirroredTableId(string WorkspaceName, string MirroredDatabaseName, string TableName)
{
public string? Schema { get; init; } = null;
public string GetTablePath() => Schema == null
? $"{MirroredDatabaseName}/Files/LandingZone/{TableName}/"
: $"{MirroredDatabaseName}/Files/LandingZone/{Schema}.schema/{TableName}/";
}
private (BlobContainerClient ContainerClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = blobServiceClient.GetBlobContainerClient(table.WorkspaceName);
var path = table.GetTablePath();
return (containerClient, path);
}
Obchodná logika, ktorá číta údaje o cene zaplatenej zo systému Inland Revenue konvertuje každý záznam do formátu Parquet pri jeho streamovaní zo zdroja. Každý riadok sa zapíše priamo do súboru Parquet, ktorý sa súčasne streamuje bajt po bajte do služby Azure Storage pomocou streamu vráteného službou CreateNextTableDataFileAsync. Tento prístup zabraňuje lokálnemu prípravu a podporuje efektívny príjem bez servera.
Kód funguje takto:
public async Task SeedMirrorAsync(OpenMirroredTableId tableId, CancellationToken cancellationToken = default)
{
await openMirroringWriter.CreateTableAsync(tableId, PricePaidMirroredDataFormat.KeyColumns);
var data = pricePaidDataReader.ReadCompleteData(cancellationToken);
await using var mirrorDataFile = await openMirror.CreateNextTableDataFileAsync(tableId);
await mirrorDataFile.WriteData(async stream => await data.WriteAsync(stream, Settings.RowsPerRowGroup, cancellationToken));
}
Metóda WriteAsync serializuje údaje do formátu Parquet, riadok skupina podľa skupiny riadkov:
public static async Task WriteAsync(this IAsyncEnumerable<PricePaid> data, Stream resultStream, int rowsPerRowGroup = 10000, CancellationToken cancellationToken = default)
{
await using var parquetWriter = await ParquetWriter.CreateAsync(
PricePaidMirroredDataFormat.CreateSchema(),
resultStream,
cancellationToken: cancellationToken);
await foreach (var chunk in data
.Select(PricePaidMirroredDataFormat.Create)
.ChunkAsync(rowsPerRowGroup, cancellationToken))
{
await ParquetSerializer.SerializeRowGroupAsync(parquetWriter, chunk, cancellationToken);
}
}
Každý PricePaid záznam sa transformuje na PricePaidMirroredDataFormat objekt, ktorý obsahuje požadované __rowMarker__ pole:
public static PricePaidMirroredDataFormat Create(PricePaid pricePaid)
{
var recordMarker = pricePaid.RecordStatus.Value switch
{
RecordStatus.Added => 0,
RecordStatus.Changed => 1,
RecordStatus.Deleted => 2,
_ => throw new InvalidEnumArgumentException("Unexpected RecordStatus value")
};
return new PricePaidMirroredDataFormat
{
TransactionId = pricePaid.TransactionId,
Price = pricePaid.Price,
...
__rowMarker__ = recordMarker
};
}
Tento protokol je jednoduchý a deterministický. Vyhýba sa zbytočným volaniam rozhrania API, bezproblémovo spolupracuje s dávkovými aj streamovacími kanálmi a plynulo sa integruje s bezserverovými prostrediami, ako je Azure Functions. Nahraním objektu blob do predpony sa automaticky vytvoria nadradené priečinky a zostanú kompatibilné s emulátorom úložiska – viac o tom v časti testovania.
Krok 4: Upratujte po sebe
Keď otvorené zrkadlenie úspešne spracuje vaše údaje, presunie pôvodné súbory do špeciálnych priečinkov _ProcessedFiles a _FilesReadyToDeletepridá _FilesReadyToDelete.json súbor. Zatiaľ čo Fabric automaticky odstráni tieto súbory po siedmich dňoch, táto lehota na uchovávanie môže viesť k značným nákladom na ukladanie, ak zrkadlíte veľké objemy údajov.
Ak chcete znížiť náklady, môžete tieto priečinky proaktívne odstrániť, keď si budete istí, že údaje boli prijaté. Toto je skvelý prípad použitia rozhrania ADLS API, ktoré podporuje odstraňovanie atomických adresárov – oveľa efektívnejšie ako vymenovanie a odstraňovanie jednotlivých objektov blob a aktualizácia súboru _FilesReadyToDelete.json .
Tu je návod, ako na to:
public async Task CleanUpTableAsync(OpenMirroredTableId tableId)
{
var (fileSystemClient, tablePath) = GetTableLocation(tableId);
var foldersToDelete = new []
{
"_ProcessedFiles",
"_FilesReadyToDelete"
};
await Parallel.ForEachAsync(foldersToDelete, async (path, _) =>
{
var fullPath = $"{tablePath}{path}/";
var directoryClient = fileSystemClient.GetDirectoryClient(fullPath);
if (await directoryClient.ExistsAsync())
{
await directoryClient.DeleteAsync();
}
});
}
private (DataLakeFileSystemClient FileSystemClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = dataLakeServiceClient.GetFileSystemClient(table.WorkspaceName);
var path = $"{table.MirroredDatabaseName}/Files/LandingZone/{table.TableName}/";
return (containerClient, path);
}
Spoľahlivá práca s blokovými objektmi blob
Tento príklad som si vybral, pretože otvára dvere na rozhovor o niektorých nuansách s blokovými blobmi. Nie sú špecifické pre OneLake, ale keďže OneLake je postavený na Azure Storage, OneLake odhaľuje aj tieto nuansy.
Jeden taký prípad: súprava .NET Storage SDK sprístupňuje rozhranie OpenWrite API, ktoré vracia Stream. Super šikovné. Ako je znázornené v príklade vyššie, tento stream pekne zapadá do Parquet.NET API. Vďaka tomu je testovanie hračkou – stream môžete ľahko nahradiť v jednotkových testoch bez toho, aby ste museli vytvárať ďalšie abstrakcie len kvôli testovateľnosti.
[Test]
public async Task when_writing_price_paid_data_to_parquet()
{
var row = new PricePaid
{
TransactionId = "{34222872-B554-4D2B-E063-4704A8C07853}",
Price = 375000,
DateOfTransfer = new DateTime(2004, 4, 27),
Postcode = "SW13 0NP",
PropertyType = PropertyType.Detached,
IsNew = true,
DurationType = DurationType.Freehold,
PrimaryAddressableObjectName = "10A",
SecondaryAddressableObjectName = string.Empty,
Street = "THE TERRACE",
Locality = string.Empty,
TownCity = "LONDON",
District = "RICHMOND UPON THAMES",
County = "GREATER LONDON",
CategoryType = CategoryType.AdditionalPricePaid,
RecordStatus = RecordStatus.Added
};
using var memoryStream = new MemoryStream();
await new[] { row }.ToAsyncEnumerable().WriteAsync(memoryStream);
var readData = await PricePaidMirroredDataFormat.Read(memoryStream).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(readData.TransactionId, Is.EqualTo(row.TransactionId));
Assert.That(readData.Price, Is.EqualTo(row.Price));
Assert.That(readData.DateOfTransfer, Is.EqualTo(row.DateOfTransfer));
// more assertions
Assert.That(readData.__rowMarker__, Is.EqualTo(0)); // RecordStatus.Added
});
}
Ale tu je háčik: Blob Storage nie je to isté ako lokálny súborový systém.
Volanie flush – odovzdá bloky
Aby sme pochopili, prečo je volanie Flush() v Parquet.NET dôležité, musíme urobiť rýchlu obchádzku k tomu, ako sú štruktúrované súbory Parquet – a ako to interaguje s rozhraním API objektov blob bloku Blob Storage.
Základy parketových pilníkov
Parquet je stĺpcový formát úložiska určený na efektívnu analýzu. Súbor Parket sa skladá z:
- Skupiny riadkov: Toto sú základné stavebné bloky. Každá skupina riadkov obsahuje časť riadkov usporiadaných podľa stĺpcov. Skupiny riadkov sa zapisujú postupne a nezávisle.
- Bloky stĺpcov: V rámci každej skupiny riadkov sa údaje ukladajú stĺpec po stĺpci.
- Metadáta: Na konci súboru Parquet napíše pätu, ktorá obsahuje informácie o schéme a posune pre rýchle čítanie.
V Parquet.NET zakaždým, keď je skupina riadkov dokončená, knižnica zavolá
Flush()výstupný prúd. Tu sa veci stávajú zaujímavými, keď píšete do Blob Storage.
Model objektu blob bloku
Úložisko objektov blob, a teda OneLake, používa model objektu blob bloku, ktorý funguje takto:
- Nahrávate bloky: Každý blok môže mať veľkosť až 4 000 MiB (predvolené je 4 MiB). Bloky môžete nahrávať paralelne, aby ste maximalizovali priepustnosť – a klient .NET Blob to urobí automaticky za vás, čo je skvelé. Toto je jeden z dôvodov, prečo na nahrávanie používať klienta Blob (nie klienta DFS). Klient DFS nepodporuje paralelné nahrávanie blokov, čo môže byť prekážkou výkonu. To znamená, že klient DFS môže stále čítať výsledné súbory v pohode.
- Odovzdáte bloky: Po nahratí všetkých blokov zavoláte CommitBlockList() na dokončenie objektu blob. Môžete nahrať až 50 000 blokov na objekt blob, čo znamená, že ak používate bloky 4 000 MiB, teoreticky by ste mohli napísať jeden súbor 190,7 TiB. (Nie že by som to odporúčal.) Ak sa chcete dozvedieť viac, prečítajte si tento článok Vysvetlenie objektov blob bloku, pripájania objektov BLOB a objektov blob strany.
Tu je háčik
Keď Parquet.NET volá Flush() po každej skupine riadkov a zapisujete do streamu podporovaného službou Blob Storage, toto vyprázdnenie spustí potvrdenie zoznamu blokov. Ako sa súbor zväčšuje, každá nová skupina riadkov spôsobí, že knižnica znova odovzdá všetky predtým nahrané bloky.
Povedzme, že píšete 1 GB súbor so 100 MB skupinami riadkov pomocou 4 MB blokov. Celkovo tak získate 250 blokov. Pri prvom spláchnutí spáchate 25 blokov. Na druhom 50. Do posledného flushu spáchate všetkých 250 blokov. Spočítajte to vo všetkých 10 skupinách riadkov a odovzdali ste celkovo 1 375 blokov – aj keď konečný súbor potrebuje iba jedno odovzdanie.
To je neefektívne a prináša to dva kľúčové problémy:
- Časové limity a opakované pokusy. Veľké zoznamy blokov môžu viesť k vypršaniu časového limitu. Nezabudnite, že Blob Storage je postavený na protokole HTTP. Je spoľahlivý – ale nie dokonalý. Časový limit môže byť na serveri úspešný, ale váš klient to nevie, a tak sa opakuje. Toto opakovanie môže mať za následok konflikt 409. Ak urobíte niečo drahé 250-krát namiesto raz, zvyšuje sa pravdepodobnosť, že sa to stane. Menej odovzdaní = menej opakovaní = menej bolestí hlavy.
- Predčasná viditeľnosť objektu blob. Jednou z pekných vecí na rozhraní Blob API je, že bloky sa nestanú viditeľnými, kým explicitne nepotvrdíte zoznam blokov. To je dobre zarovnané pre scenáre, ako je Parquet, kde súbor nie je platný, kým sa nezapíšu metaúdaje päty. To znamená, že následné procesy náhodou nezachytia napoly napísaný súbor. Ale tu je zvrat: pretože Parquet.NET volá
Flush()po každej skupine riadkov a toto vyprázdnenie spustí odovzdanie, objekt blob sa stane viditeľným pred dokončením súboru. Takže aj keď je rozhranie Blob API navrhnuté tak, aby vám pomohlo vyhnúť sa tomuto problému, spôsob, akým Parquet.NET pracuje so streamom, predstavuje problém – pokiaľ nepodniknete kroky na jeho zabránenie.
Ak chcete preklenúť medzeru medzi Parquet.NET a nuansami úložiska objektov blob, neimplementujte Flush() volanie v implementácii BlobFile.BlobStream . Týmto spôsobom, aj keď Parquet.NET volá Flush po každej skupine riadkov, podkladový stream sa nevyprázdni do úložiska.
Klienti úložiska (DFS a Blob) vytvoria prázdny súbor pri volaní OpenWrite. Otvorený zrkadlový procesor zaznamená chybu, keď narazí na 0-bajtový súbor, čo je možné, ak sa proces replikátora prepletá s vytvorením súboru. Aby ste tomu zabránili, napíšte súbor do iného umiestnenia a presuňte ho na správnu cestu, ktorú ADLS API podporujú prostredníctvom operácie premenovania, napríklad:
public async Task<BlobFile> CreateFileAsync(string filePath)
{
if (filePath is null)
{
throw new ArgumentNullException(nameof(filePath), "File path cannot be null.");
}
var containerClient = client.blobServiceClient.GetBlobContainerClient(containerName);
var temporaryPath = $"_{filePath}.temp"!;
var blobClient = containerClient.GetBlobClient(Combine(temporaryPath));
var blobStream = await blobClient.OpenWriteAsync(overwrite: true);
return new BlobFile(blobStream, GetChildPath(temporaryPath), Combine(filePath)!);
}
Volanie dispose – potvrdí bloky a presunie súbor
Tu je ďalšie jemné, ale dôležité správanie, ktoré si treba uvedomiť: keď používate stream Azure Blob, volanie .Dispose() (alebo jeho implicitné zrušenie) potvrdí všetky nahrané bloky.
To je zvyčajne to, čo chcete – ale nie vždy.
Povedzme, že váš zdrojový systém streamuje údaje pomocou , IAsyncEnumerableako to robí v príklade na ilustráciu chyby. Ak tento zdroj zlyhá v priebehu cesty, napríklad vyprší časový limit pripojenia k databáze alebo sa sieť preruší, môžete mať iba čiastočne napísaný súbor Parquet. Ale ak sa stream zlikviduje (čo sa stane, kvôli čakaniu na použitie alebo použitie bloku), tieto čiastočné bloky sa odovzdajú.
Aby sa tomu zabránilo, vzorový kód sa explicitne odovzdá iba raz, keď sa celá operácia úspešne dokončí. Týmto spôsobom, ak producent zlyhá uprostred streamu, nezostane vám napoly napísaný súbor.
To je dôvod, prečo príklad vráti BlobFile objekt z CreateNextTableDataFileAsync, aby zapuzdril stream, aby sa zabránilo súboru s nulovým bajtom a problémom popísaným s FlushDispose a odovzdaním čiastočných súborov parket.
public class BlobFile(Stream stream, IStoragePath temporaryFilePath, string finalFilePath) : IAsyncDisposable
{
private readonly BlobStream stream = new(stream, temporaryFilePath, finalFilePath);
public async Task WriteData(Func<Stream, Task> writeOperation)
{
try
{
await writeOperation(stream);
}
catch (Exception)
{
stream.Failed();
throw;
}
}
public async ValueTask DisposeAsync()
{
await stream.DisposeAsync();
}
private class BlobStream(Stream innerStream, IStoragePath temporaryFilePath, string finalFilePath) : Stream
{
private bool disposed = false;
private bool success = true;
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position
{
get => innerStream.Position;
set => innerStream.Position = value;
}
public override void Flush()
{
// no-op
}
public override int Read(byte[] buffer, int offset, int count) => innerStream.Read(buffer, offset, count);
public override long Seek(long offset, SeekOrigin origin) => innerStream.Seek(offset, origin);
public override void SetLength(long value) => innerStream.SetLength(value);
public override void Write(byte[] buffer, int offset, int count) => innerStream.Write(buffer, offset, count);
public void Failed()
{
success = false;
}
public override async ValueTask DisposeAsync()
{
if (disposed)
{
return;
}
if (success)
{
await innerStream.DisposeAsync();
await temporaryFilePath.RenameAsync(finalFilePath);
}
disposed = true;
}
}
}
Písanie testov pomocou OneLake pomocou emulátora Azurite
Jednou zo skvelých vecí na OneLake je, že je postavený na Azure Storage, čo znamená, že svoj integračný kód môžete otestovať lokálne pomocou emulátora Azurite. To uľahčuje písanie spoľahlivých testov, ktoré emulujú správanie OneLake / Azure Storage, bez potreby živého prostredia Fabric alebo cloudových zdrojov.
Azurite emuluje rozhranie API Blob Storage, čo je presne to, čo OneLake odhaľuje. To znamená, že rovnaký kód, ktorý používate v produkcii, môže bežať v testovacej sade nezmenený. Azurit môžete roztočiť ako miestny proces alebo kontajner, nasmerovať BlobServiceClient naň a ísť.
To je užitočné najmä pri jednotkových a integračných testoch. Môžete vykonávať nasledujúce činnosti:
- Skontrolujte, či je váš napísaný
_metadata.jsonsprávne. - Skontrolujte, či logika pomenovania súborov vytvára očakávanú postupnosť.
- Simulujte čiastočné zápisy vyplývajúce z volania
FlushaDisposezlyhania. To umožňuje otestovať vyššie opísané nuansy a primerane ich zvládnuť. - Uistite sa, že vaša serializácia Parquet prebieha čisto.
Azurite nepodporuje rozhrania API ADFS. To je dôvod, prečo BlobFile vyššie uvedené používa rozhranie IStoragePath na implementáciu funkcií ADFS pomocou rozhraní BLOB API, aby mohli pracovať s testovaným emulátorom. Tu je príklad:
public async Task RenameAsync(string newPath)
{
async Task RenameDirectoryAsync(DataLakeServiceClient dataLakeServiceClient)
{
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(containerName);
var directoryClient = fileSystemClient.GetDirectoryClient(path);
await directoryClient.RenameAsync(newPath);
}
async Task CopyThenDeleteAsync(BlobServiceClient blobServiceClient)
{
var sourceBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(path);
var destinationBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(newPath);
await destinationBlob.StartCopyFromUriAsync(sourceBlob.Uri);
await sourceBlob.DeleteIfExistsAsync();
}
StorageOperation operation = new()
{
WithFlatNamespace = CopyThenDeleteAsync,
WithHierarchicalNamespace = RenameDirectoryAsync
};
await operation.Execute(client);
}
Testovanie rohových puzdier: prepláchnite a zlikvidujte
Predtým sme hovorili o tom, ako môže použitie Flush()Dispose() Parquet.NET viesť k predčasným alebo čiastočným odovzdaniam objektov blob pri písaní do OneLake. Toto správanie je jemné – ale testovateľné.
Tu je niekoľko testov na overenie, či logika zrkadlenia správne spracováva tieto scenáre:
[Test]
public async Task it_should_write_data_to_table()
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
var mirroredData = await GetMirroredBlobItem();
var mirroredDataClient = setup.WorkspaceContainer.GetBlobClient(mirroredData!.Name);
var mirroredDataContents = await mirroredDataClient.DownloadContentAsync();
var readData = await PricePaidMirroredDataFormat.Read(mirroredDataContents.Value.Content.ToStream()).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(mirroredData, Is.Not.Null);
Assert.That(mirroredData!.Properties.ContentLength, Is.GreaterThan(0));
Assert.That(readData.TransactionId, Is.EqualTo(setup.PricePaidReader.TransactionId));
// ... more assertions ...
});
}
Tento test potvrdzuje, že úspešný zápis má za následok platný, neprázdny súbor Parquet, ktorý sa správne vráti.
Teraz k prípadom zlyhania:
[Test]
public async Task it_should_not_commit_partially_written_data()
{
long? lengthDuringWrite = null;
setup.PricePaidReader.ActionBetweenRowGroups = async () =>
{
var mirroredData = await GetMirroredBlobItem();
lengthDuringWrite = mirroredData!.Properties.ContentLength;
};
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
Assert.That(lengthDuringWrite, Is.EqualTo(0));
}
Tento test simuluje výnimku uprostred streamu a overuje, že počas zápisu nie sú viditeľné žiadne čiastočné údaje z dôvodu prepísania no-op Flush() .
A nakoniec cesta zlyhania:
[Test]
public async Task after_a_row_group_is_written_it_should_not_leave_a_partially_complete_blob()
{
setup.PricePaidReader.ThrowsAfterFirstRowGroup = true;
var previouslyMirroredFile = await GetMirroredBlobItem();
var threw = true;
try
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
}
catch (Exception)
{
threw = true;
}
var mirroredData = await GetMirroredBlobItem();
var mirroredTemporaryData = await GetMirroredBlobTemporaryItem();
Assert.Multiple(() =>
{
Assert.That(threw, Is.True);
if (previouslyMirroredFile == null)
{
Assert.That(mirroredData, Is.Null);
}
else
{
Assert.That(mirroredData!.Name, Is.EqualTo(previouslyMirroredFile.Name));
}
Assert.That(mirroredTemporaryData, Is.Not.Null);
Assert.That(mirroredTemporaryData!.Properties.ContentLength, Is.EqualTo(0));
});
}
Tento test potvrdzuje, že aj keď je stream zrušený z dôvodu výnimky, nový zrkadlený súbor nie je čiastočne zapísaný.
Tieto testy poskytujú istotu, že logika zrkadlenia je robustná, a to aj v podmienkach zlyhania. Ukazujú, ako možno emulátor použiť na simuláciu správania v reálnom svete bez potreby úplného prostredia Fabric (alebo Azure).
A len na preukázanie kompatibility, vyladenie testovacieho nastavenia tak, aby smerovalo na pracovný priestor Fabric, tu je tabuľka plná údajov o cenách domov v Spojenom kráľovstve.
public class when_using_fabric
{
public class in_success_cases : when_copying_to_mirror_successfully
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
public class in_failure_cases : when_copying_to_mirror_fails
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
}
public static TestSetup UsingFabric()
{
var blobServiceClient = new BlobServiceClient(new Uri("https://onelake.blob.fabric.microsoft.com/"), new DefaultAzureCredential());
var pricePaidReader = new TestPricePaidReader();
var tableId = new OpenMirroredTableId($"TestWorkspace", "HousePriceOpenMirror.MountedRelationalDatabase", "PricePaid");
var workspaceContainer = blobServiceClient.GetBlobContainerClient(tableId.WorkspaceName);
var fabricPricePaidMirror = new FabricPricePaidMirror(new FabricOpenMirroringWriter(blobServiceClient), pricePaidReader)
{
Settings = new FabricPricePaidMirrorSettings { RowsPerRowGroup = 1 }
};
return new TestSetup
{
BlobServiceClient = blobServiceClient,
PricePaidReader = pricePaidReader,
TableId = tableId,
WorkspaceContainer = workspaceContainer,
FabricPricePaidMirror = fabricPricePaidMirror
};
}
Zhrnutie
Ak vytvárate nové kanály na OneLake, najmä pre streamovanie alebo bezserverové vyťaženia, rozhrania API ADLS a Blob Storage fungujú podľa očakávania. Sú rýchle, flexibilné a bezproblémovo fungujú s otvoreným zrkadlením. Dodržiavaním protokolu cieľovej zóny a spracovaním rozdielov medzi objektmi blob bloku a systémom súborov môžete vytvárať robustné, testovateľné integrácie, ktoré fungujú rovnako dobre v produkcii ako v lokálnom emulátore. A čo je najlepšie, nemusíte prepisovať svoju aplikáciu ani bojovať so systémom súborov. Stačí streamovať, serializovať a nahrávať do OneLake.