Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Wenn Ihre Anwendung Azure Data Lake Storage (ADLS) oder BLOB Storage-APIs verwendet und eine Verbindung mit OneLake herstellen muss, können Sie die vorhandenen APIs weiterhin verwenden.
Wir veranschaulichen, wie Blob- und ADLS-APIs mit OneLake mithilfe eines echten Spiegelungsbeispiels verwendet werden und Entwicklereinblicke aus OneLake teilen. Wir untersuchen, wann und warum Sie eine API über eine andere auswählen und wie Sie die einzelnen apIs optimal verwenden können. Alle Muster, die wir behandeln, gelten auch für Azure Storage Storage.
In diesem Szenario behandeln wir Folgendes:
- Was ist offene Spiegelung?
- Wie man die .NET Azure Blob Storage und DFS-Clients (Verteiltes Dateisystem) verwendet, um Daten in die offene Spiegelungs-Zielzone zu schreiben.
- Kombinieren der Blob Storage- und DFS-Clients zum Hochladen von Daten und Verwalten von Ordnern in OneLake, insbesondere wenn Leistung von Bedeutung ist.
- So wird's gemacht: Behandeln von Szenarien, die bei Block-Blobs beim Schreiben von Parkettdaten in blob-Speicher von .NET auftreten
- So testen Sie alles lokal mit dem Azurite-Emulator. (Ja – Code, den Sie für OneLake schreiben, funktioniert auch mit dem Speicher-Emulator!)
Hinweis
Der Code für diesen Artikel finden Sie in GitHub.
Streamen von Parkett in OneLake mit Blob-APIs
In diesem Abschnitt zeigen wir, wie Sie Parquet-Daten effizient in OneLake streamen, insbesondere in die offene Spiegelungs-Landezone. Offene Spiegelung ist eine leistungsstarke Methode, um Daten aus proprietären Systemen, in denen Abkürzungen nicht verwendet werden können, in Microsoft Fabric einzubringen. Es übernimmt die schwere Arbeit, konvertiert Rohdaten in das Delta Lake-Format, verwaltet Upserts, Löschvektoren, optimiert, führt Bereinigungen durch und vieles mehr. Alles, was Sie tun müssen, ist, Ihre Daten in die Landing-Zone hochzuladen, eine Zeilenmarkierung hinzuzufügen, und die Spiegelung kümmert sich um den Rest.
Es ist üblich, dass Teams benutzerdefinierten Code schreiben, um Daten aus proprietären Systemen zu extrahieren und in einem offenen Format auszugeben. Während das offene Mirroring sowohl CSV- als auch Parquet-Dateien in Delta-Tabellen einlesen kann, sollten Sie, wenn Sie bereits Code schreiben, besser Parquet verwenden, da es effizienter hochzuladen und zu verarbeiten ist.
Parkett ist ein Speicherdateiformat , das für die Analyse konzipiert ist. Delta ist hingegen ein Tabellenprotokoll, das auf Parquet aufgebaut ist. Es fügt Transaktionsgarantien, Schemaerzwingung und Unterstützung für Updates und Löschungen hinzu. Wenn Sie Parquet-Dateien in die offene Spiegelungs-Landezone hochladen, werden diese Dateien in Delta-Tabellen umgewandelt – mit ACID-Semantik und Abfrageleistungsoptimierungen, ohne dass Sie diese Komplexitäten selbst verwalten müssen. Die Zeilenmarkierung gibt an, wie jeder Datensatz mit der Tabelle zusammengeführt werden soll, wodurch der Spiegelungsprozess wissen kann, wann Zeilen eingefügt, aktualisiert oder gelöscht werden sollen.
Sehen wir uns ein konkretes (aber fiktives) Beispiel an.
Stellen Sie sich vor, Sie haben eine .NET-Anwendung, die UK-Hauspreisdaten aus einem unternehmensinternen System der Finanzbehörde abruft. (Nur um klar zu sein, dies ist ein ausgedachtes Szenario. Die Britische Finanzbehörde macht dies nach meinem Wissen nicht, aber der Datensatz ist öffentlich verfügbar und dient als gutes Beispiel.) Diese App wird monatlich als Azure-Funktion ausgeführt und um die Kosten niedrig zu halten, muss sie schnell sein und die Nutzung des lokalen Datenträgers vermeiden. Ziel ist es also, die Daten direkt aus der Quelle im Parquet-Format in die offene Landingzone für Spiegelungen zu streamen. Das "Price Paid"-Datensatz von der UK-Finanzbehörde enthält ein RecordStatus-Feld, das der Zeilenmarkierung zugeordnet werden muss, die für das offene Spiegelungsformat erforderlich ist.
Dieses Szenario entspricht gut der Blob Storage-API. Es unterstützt Streaming-Schreibvorgänge, sodass Sie Daten direkt in OneLake übertragen können, ohne sie lokal zwischenzuspeichern. Das macht es einfach, effizient und kosteneffizient, insbesondere für serverlose Workloads wie Azure Functions. Es ist auch schnell: Die Blob-API unterstützt parallele Blockuploads, was den Durchsatz beim Schreiben großer Dateien erheblich steigern kann und bei Verwendung des DFS-Endpunkts nicht unterstützt wird.
Dazu verwenden wir die Open-Source-Bibliothek Parquet.NET, eine vollständig verwaltete .NET-Assembly, die das Schreiben von Parquet-Daten erleichtert. Außerdem führt die Arbeit mit einem Objektspeicher wie Blob Storage einige Nuancen ein, insbesondere im Hinblick auf Streaming und Pufferung, was uns die Möglichkeit gibt, einige Nuancen beim Arbeiten mit BLOB-Speicher zu erkunden.
Dieser Ansatz hält Ihre Azure Function leicht, schnell und kostengünstig, keine lokale Festplatte, kein Staging, nur Streamen, Serialisieren und Uploaden.
Zusammenfassung der offenen Spiegelungs-Landezonen
Die offene Spiegelungs-Landezone fungiert wie ein Posteingang für Ihre gespiegelten Tabellen; wenn Sie Dateien hinzufügen, kümmert sich Fabric um die Aufnahme. Hinter dieser Einfachheit handelt es sich jedoch um ein klares Protokoll, das Ihre Anwendung befolgen muss, um sicherzustellen, dass Daten ordnungsgemäß erkannt und verarbeitet werden.
Ordnerstruktur
Jede gespiegelte Tabelle verfügt über einen dedizierten Pfad in OneLake:
<workspace>/mirrored-database/Files/LandingZone/<table-name>/
Hier schreiben Sie Metadaten und Datendateien. Sie müssen keine expliziten Ordner erstellen, nur Blobs mit dem entsprechenden Präfix schreiben und die Struktur wird abgeleitet.
Schritt 1: Deklarieren der Tabellenschlüssel
Bevor Sie Daten schreiben, müssen Sie eine _metadata.json Datei im Zielzonenordner der Tabelle erstellen. Diese Datei definiert die Schlüsselspalten, die für Upsert-Operationen und Löschvorgänge verwendet werden:
public async Task CreateTableAsync(OpenMirroredTableId table, params string[] keyColumns)
{
await using var metadataFile = await OpenWriteAsync(table, "_metadata.json");
var json = new { keyColumns };
await JsonSerializer.SerializeAsync(metadataFile, json);
}
Diese Metadatendatei weist Fabric an, wie Zeilen eindeutig identifiziert werden. Ohne dies werden Ihre Daten von Fabric nicht erfasst.
Schritt 2: Erstellen von Datendateien mit dem richtigen Namen
Sobald die Metadaten vorhanden sind, können Sie mit dem Schreiben von Daten beginnen. Dateien müssen sequenziell benannt werden, indem nullgefüllte Zahlen wie 00000000000000000001.parquet, 00000000000000000002.parquetusw. verwendet werden. Dadurch wird eine deterministische Anordnung gewährleistet und Kollisionen vermieden.
Listen-APIs geben Blobs alphabetisch zurück, damit unsere Logik schnell die nächste Sequenznummer finden kann, indem sie den Landing-Zone-Ordner mit einer flachen Liste verarbeitet. Bei der offenen Spiegelung werden verarbeitete Dateien in Ordner verschoben, die mit dem Präfix _ versehen werden und unterhalb numerischer Werte sortiert werden. Nachdem Sie die Schleife beendet haben, nachdem alle Parquet-Dateien angezeigt wurden, verbessert sich die Leistung, wenn Sie die Azure Storage List Blob API verwenden – die Blobs in Unterordnern auflistet, die dem Präfix entsprechen. Wenn Sie die ADLS-Pfadlisten-API verwenden, können Sie eine rekursive Liste ausführen, die die Kontrolle darüber ermöglicht, ob der Inhalt von Unterordnern aufgelistet werden soll. Dies ist ein klarer Vorteil des hierarchischen Namespaces.
Die Logik zum Ermitteln des nächsten Dateinamens sieht wie folgt aus:
public async Task<MirrorDataFile> CreateNextTableDataFileAsync(OpenMirroredTableId table)
{
var (containerClient, path) = GetTableLocation(table);
var listBlobs = containerClient.GetBlobsAsync(prefix: path);
var tableFound = false;
BlobItem? lastDataFile = null;
// Parquet files will be first in the folder because other files and folders all start with an underscore.
// So we can just take the last Parquet file to get our sequence number.
await foreach (var blob in listBlobs)
{
tableFound = true;
if (blob.Name.EndsWith(".parquet") && blob.Properties.ContentLength > 0)
{
lastDataFile = blob;
}
else
{
break;
}
}
if (!tableFound)
{
throw new ArgumentException($"Table not found.", nameof(table));
}
long lastFileNumber = 0;
if (lastDataFile is not null)
{
var dataFileName = Path.GetFileName(lastDataFile.Name);
lastFileNumber = long.Parse(dataFileName.Split('.')[0]);
}
var stream = await OpenWriteAsync(table, $"{++lastFileNumber:D20}.parquet");
return new MirrorDataFile(stream)
{
FileSequenceNumber = lastFileNumber
};
}
Vielleicht fragen Sie sich, wozu die MirrorDataFile-Klasse dient, wir kommen darauf zurück, wenn wir behandeln, wie wir zuverlässig mit Block-BLOBs arbeiten.
Schritt 3: Hochladen des Inhalts der Datei
Der eigentliche Schreibvorgang wird durchgeführt, indem ein Datenstrom zu dem Blob geöffnet wird.
private async Task<Stream> OpenWriteAsync(OpenMirroredTableId table, string fileName)
{
var (containerClient, path) = GetTableLocation(table);
path += fileName;
var blobClient = containerClient.GetBlobClient(path);
var stream = await blobClient.OpenWriteAsync(true);
return stream;
}
Hinweis
Gespiegelte Datenbanken unterstützen Schemas, sodass die Zielzone einen optionalen Schemaordner enthalten kann, um anzuzeigen, dass sich die Tabelle in einem Schema befindet. Außerdem werden Fabric-Arbeitsbereiche in OneLake Speichercontainern und ADLS-Dateisystemen zugeordnet.
public record OpenMirroredTableId(string WorkspaceName, string MirroredDatabaseName, string TableName)
{
public string? Schema { get; init; } = null;
public string GetTablePath() => Schema == null
? $"{MirroredDatabaseName}/Files/LandingZone/{TableName}/"
: $"{MirroredDatabaseName}/Files/LandingZone/{Schema}.schema/{TableName}/";
}
private (BlobContainerClient ContainerClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = blobServiceClient.GetBlobContainerClient(table.WorkspaceName);
var path = table.GetTablePath();
return (containerClient, path);
}
Die Geschäftslogik, die Price Paid-Daten aus dem Inland Revenue System liest, wandelt jeden Datensatz in das Parquet-Format um, während er von der Quelle gestreamt wird. Jede Zeile wird direkt in eine Parquet-Datei geschrieben, die gleichzeitig Byte für Byte über den von CreateNextTableDataFileAsync zurückgegebenen Datenstrom in Azure Storage gestreamt wird. Bei diesem Ansatz wird das lokale Staging vermieden und eine effiziente, serverlose Datenaufnahme ermöglicht.
So funktioniert der Code:
public async Task SeedMirrorAsync(OpenMirroredTableId tableId, CancellationToken cancellationToken = default)
{
await openMirroringWriter.CreateTableAsync(tableId, PricePaidMirroredDataFormat.KeyColumns);
var data = pricePaidDataReader.ReadCompleteData(cancellationToken);
await using var mirrorDataFile = await openMirror.CreateNextTableDataFileAsync(tableId);
await mirrorDataFile.WriteData(async stream => await data.WriteAsync(stream, Settings.RowsPerRowGroup, cancellationToken));
}
Die WriteAsync-Methode serialisiert die Daten Zeilengruppe nach Zeilengruppe in das Parquet-Format.
public static async Task WriteAsync(this IAsyncEnumerable<PricePaid> data, Stream resultStream, int rowsPerRowGroup = 10000, CancellationToken cancellationToken = default)
{
await using var parquetWriter = await ParquetWriter.CreateAsync(
PricePaidMirroredDataFormat.CreateSchema(),
resultStream,
cancellationToken: cancellationToken);
await foreach (var chunk in data
.Select(PricePaidMirroredDataFormat.Create)
.ChunkAsync(rowsPerRowGroup, cancellationToken))
{
await ParquetSerializer.SerializeRowGroupAsync(parquetWriter, chunk, cancellationToken);
}
}
Jeder PricePaid Datensatz wird in ein PricePaidMirroredDataFormat Objekt umgewandelt, das das erforderliche __rowMarker__ Feld enthält:
public static PricePaidMirroredDataFormat Create(PricePaid pricePaid)
{
var recordMarker = pricePaid.RecordStatus.Value switch
{
RecordStatus.Added => 0,
RecordStatus.Changed => 1,
RecordStatus.Deleted => 2,
_ => throw new InvalidEnumArgumentException("Unexpected RecordStatus value")
};
return new PricePaidMirroredDataFormat
{
TransactionId = pricePaid.TransactionId,
Price = pricePaid.Price,
...
__rowMarker__ = recordMarker
};
}
Dieses Protokoll ist einfach und deterministisch. Es vermeidet unnötige API-Aufrufe, funktioniert nahtlos mit Batch- und Streamingpipelinen und lässt sich problemlos in serverlose Umgebungen wie Azure Functions integrieren. Durch das Hochladen eines Blobs in ein Pfad-Präfix werden die übergeordneten Ordner automatisch erstellt und die Kompatibilität mit dem Speicheremulator bleibt erhalten – mehr dazu im Abschnitt 'Tests'.
Schritt 4: Aufräumen nach sich selbst
Sobald das Open-Mirroring-Verfahren Ihre Daten erfolgreich verarbeitet hat, verschiebt es die ursprünglichen Dateien in die speziellen Ordner _ProcessedFiles und _FilesReadyToDelete und fügt eine _FilesReadyToDelete.json-Datei hinzu. Während Fabric diese Dateien nach sieben Tagen automatisch löscht, kann dieses Aufbewahrungsfenster zu erheblichen Speicherkosten führen, wenn Sie große Datenmengen spiegeln.
Um die Kosten zu reduzieren, können Sie diese Ordner proaktiv löschen, sobald Sie sicher sind, dass die Daten aufgenommen wurden. Dies ist ein hervorragender Anwendungsfall für die ADLS-API, die das Löschen von Atomverzeichnissen unterstützt – viel effizienter als das Aufzählen und Löschen einzelner Blobs und Aktualisieren der _FilesReadyToDelete.json Datei.
Gehen Sie dazu wie folgt vor:
public async Task CleanUpTableAsync(OpenMirroredTableId tableId)
{
var (fileSystemClient, tablePath) = GetTableLocation(tableId);
var foldersToDelete = new []
{
"_ProcessedFiles",
"_FilesReadyToDelete"
};
await Parallel.ForEachAsync(foldersToDelete, async (path, _) =>
{
var fullPath = $"{tablePath}{path}/";
var directoryClient = fileSystemClient.GetDirectoryClient(fullPath);
if (await directoryClient.ExistsAsync())
{
await directoryClient.DeleteAsync();
}
});
}
private (DataLakeFileSystemClient FileSystemClient, string TablePath) GetTableLocation(OpenMirroredTableId table)
{
var containerClient = dataLakeServiceClient.GetFileSystemClient(table.WorkspaceName);
var path = $"{table.MirroredDatabaseName}/Files/LandingZone/{table.TableName}/";
return (containerClient, path);
}
Zuverlässiges Arbeiten mit Block-Blobs
Ich habe dieses Beispiel ausgewählt, weil es die Tür öffnet, um über einige Nuancen mit Blockblobs zu sprechen. Dies sind nicht OneLake-spezifisch, aber da OneLake auf Azure Storage basiert, macht OneLake auch diese Nuancen verfügbar.
Ein solcher Fall: Das .NET Storage SDK macht eine OpenWrite API verfügbar, die einen Stream zurückgibt. Super praktisch. Wie im obigen Beispiel gezeigt, passt dieser Datenstrom gut zu den Parquet.NET-APIs. Es macht auch das Testen ein Kinderspiel – Sie können den Datenstrom einfach in Komponententests ersetzen, ohne zusätzliche Abstraktionen nur zur Testbarkeit erstellen zu müssen.
[Test]
public async Task when_writing_price_paid_data_to_parquet()
{
var row = new PricePaid
{
TransactionId = "{34222872-B554-4D2B-E063-4704A8C07853}",
Price = 375000,
DateOfTransfer = new DateTime(2004, 4, 27),
Postcode = "SW13 0NP",
PropertyType = PropertyType.Detached,
IsNew = true,
DurationType = DurationType.Freehold,
PrimaryAddressableObjectName = "10A",
SecondaryAddressableObjectName = string.Empty,
Street = "THE TERRACE",
Locality = string.Empty,
TownCity = "LONDON",
District = "RICHMOND UPON THAMES",
County = "GREATER LONDON",
CategoryType = CategoryType.AdditionalPricePaid,
RecordStatus = RecordStatus.Added
};
using var memoryStream = new MemoryStream();
await new[] { row }.ToAsyncEnumerable().WriteAsync(memoryStream);
var readData = await PricePaidMirroredDataFormat.Read(memoryStream).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(readData.TransactionId, Is.EqualTo(row.TransactionId));
Assert.That(readData.Price, Is.EqualTo(row.Price));
Assert.That(readData.DateOfTransfer, Is.EqualTo(row.DateOfTransfer));
// more assertions
Assert.That(readData.__rowMarker__, Is.EqualTo(0)); // RecordStatus.Added
});
}
Aber hier ist der Catch: Blob Storage ist nicht identisch mit einem lokalen Dateisystem.
Aufrufen von "flush" – Schreibt die Blöcke
Um zu verstehen, warum der Flush() Aufruf in Parquet.NET wichtig ist, müssen wir einen kurzen Exkurs in die Struktur von Parquet-Dateien machen – und wie das mit der Blob-Storage-Block-Blob-API interagiert.
Grundlagen der Parquet-Datei
Parkett ist ein Säulenspeicherformat, das für eine effiziente Analyse konzipiert ist. Eine Parquet-Datei besteht aus:
- Zeilengruppen: Dies sind die wichtigsten Bausteine. Jede Zeilengruppe enthält einen Teil von Zeilen, organisiert nach Spalte. Zeilengruppen werden sequenziell und unabhängig geschrieben.
- Spaltenblöcke: Innerhalb jeder Zeilengruppe werden Daten spaltenweise gespeichert.
- Metadaten: Am Ende der Datei schreibt Parquet einen Footer, der Schema- und Offsetinformationen für schnelle Lesevorgänge enthält.
Bei jedem Abschluss einer Zeilengruppe ruft
Flush()die Bibliothek in Parquet.NET den Ausgabedatenstrom auf. Hier wird es interessant, wenn Sie in Blob Storage schreiben.
Blockblob-Modell
Blob-Speicher, und daher OneLake, verwendet ein Blockblob-Modell; es funktioniert wie folgt:
- Sie laden Blöcke hoch: Jeder Block kann bis zu 4.000 MiB groß sein (Standard ist 4 MiB). Sie können Blöcke parallel hochladen, um den Durchsatz zu maximieren– und der .NET Blob-Client erledigt dies automatisch, was hervorragend ist. Dies ist ein Grund, den BLOB-Client (nicht den DFS-Client) für Uploads zu verwenden. Der DFS-Client unterstützt keine parallelen Blockuploads, was ein Leistungsengpässe sein kann. Das heißt, der DFS-Client kann die resultierenden Dateien immer noch in Ordnung lesen.
- Sie übernehmen den Commit für die Blöcke: Sobald alle Blöcke hochgeladen wurden, rufen Sie CommitBlockList() auf, um das Blob abzuschließen. Sie können bis zu 50.000 Blöcke pro Blob hochladen. Wenn Sie 4.000 MiB-Blöcke verwenden, könnten Sie theoretisch eine einzelne 190.7-TiB-Datei schreiben. (Nicht, dass ich es empfehlen würde.) Wenn Sie mehr erfahren möchten, lesen Sie diesen Artikel , in dem Sie Blockblobs, Anfügen von Blobs und Seitenblobs verstehen.
Hier ist der Haken
Wenn Parquet.NET Flush() nach jeder Zeilengruppe aufruft und Sie in einen von Blob Storage unterstützten Datenstrom schreiben, löst dies einen BlockList-Commit aus. Wenn die Datei wächst, bewirkt jede neue Zeilengruppe, dass die Bibliothek alle zuvor hochgeladenen Blöcke erneut annimmt.
Angenommen, Sie schreiben eine 1-GB-Datei mit 100 MB Zeilengruppen mit 4 MB-Blöcken. Dadurch erhalten Sie insgesamt 250 Blöcke. Beim ersten Leeren übernehmen Sie 25 Blöcke. Auf der zweiten, 50. Durch die endgültige Löschung übernehmen Sie alle 250 Blöcke. Fügen Sie dies in allen 10 Zeilengruppen hinzu, und Sie haben insgesamt 1.375 Blöcke zugesichert, obwohl die endgültige Datei nur einen Commit benötigt.
Dies ist ineffizient und führt zu zwei wichtigen Problemen:
- Timeouts und erneute Versuche. Große Blocklisten können zu Timeouts führen. Denken Sie daran, dass Blob Storage auf HTTP basiert. Es ist zuverlässig – aber nicht perfekt. Ein Timeout kann auf dem Server erfolgreich sein, aber Ihr Client weiß das nicht, sodass es erneut ausgeführt wird. Dieser Wiederholungsversuch kann zu einem 409-Konflikt führen. Wenn Sie etwas teures 250 Mal statt einmal tun, erhöht sich die Wahrscheinlichkeit, dass dies geschieht. Weniger Commits = weniger Wiederholungsversuche = weniger Kopfschmerzen.
- Vorzeitige Blob-Sichtbarkeit. Eine der schönen Dinge bei der Blob-API ist, dass die Blöcke erst sichtbar werden, wenn Sie explizit einen Commit für die BlockList ausführen. Dies eignet sich gut für Szenarien wie Parkett, in denen die Datei erst gültig ist, wenn die Fußzeilenmetadaten geschrieben wurden. Dies bedeutet, dass nachgelagerte Prozesse nicht versehentlich eine halb geschriebene Datei aufnehmen. Aber hier ist die Wendung: Da Parquet.NET nach jeder Zeilengruppe
Flush()aufruft und das Leeren einen Commit auslöst, wird das Blob sichtbar, bevor die Datei abgeschlossen ist. Obwohl die Blob-API ihnen dabei helfen soll, dieses Problem zu vermeiden, führt Parquet.NET mit einem Stream ein Problem ein – es sei denn, Sie ergreifen Schritte, um es zu verhindern.
Um die Lücke zwischen Parquet.NET und den Nuancen von Blob Storage zu überbrücken, implementieren Sie den Flush() Aufruf nicht in der BlobFile.BlobStream Implementierung. Auf diese Weise wird der zugrunde liegende Datenstrom nicht auf den Speicher geschrieben, obwohl Parquet.NET Flush nach jeder Zeilengruppe aufruft.
Die Speicherclients (DFS und Blob) erstellen beim Aufrufen von OpenWrite eine leere Datei. Der offene Spiegelprozessor protokolliert einen Fehler beim Auftreten einer 0-Byte-Datei, was möglich ist, wenn der Replikationsprozess sich mit der Dateierstellung überschneidet. Um dies zu vermeiden, schreiben Sie eine Datei an einen anderen Speicherort, und verschieben Sie sie in den richtigen Pfad, den die ADLS-APIs durch einen Umbenennungsvorgang unterstützen, z. B.:
public async Task<BlobFile> CreateFileAsync(string filePath)
{
if (filePath is null)
{
throw new ArgumentNullException(nameof(filePath), "File path cannot be null.");
}
var containerClient = client.blobServiceClient.GetBlobContainerClient(containerName);
var temporaryPath = $"_{filePath}.temp"!;
var blobClient = containerClient.GetBlobClient(Combine(temporaryPath));
var blobStream = await blobClient.OpenWriteAsync(overwrite: true);
return new BlobFile(blobStream, GetChildPath(temporaryPath), Combine(filePath)!);
}
Dispose aufrufen – Die Blöcke werden bestätigt und die Datei wird verschoben
Hier ist ein weiteres subtiles, aber wichtiges Verhalten zu beachten: Wenn Sie den Azure Blob-Stream verwenden, werden alle hochgeladenen Blöcke durch das Aufrufen von .Dispose() (oder wenn es implizit verworfen wird) festgeschrieben.
Das ist in der Regel das, was Sie wollen – aber nicht immer.
Angenommen, Ihr Quellsystem streamt Daten mit einem IAsyncEnumerable, wie im Beispiel gezeigt wird, um den Fehler zu veranschaulichen. Wenn diese Quelle zum Beispiel mitten im Prozess ausfällt, etwa wenn die Datenbankverbindung abläuft oder das Netzwerk abbricht, haben Sie möglicherweise nur eine teilweise geschriebene Parquet-Datei. Wenn der Datenstrom jedoch verworfen wird (was er aufgrund von await using oder eines Using-Blocks erwartet wird), werden diese Teilblöcke festgeschrieben.
Um dies zu vermeiden, führt der Beispielcode das Commit nur einmal aus, wenn der gesamte Vorgang erfolgreich abgeschlossen wird. Auf diese Weise, wenn der Produzent mid-Stream fehlschlägt, bleiben Sie nicht mit einer halb geschriebenen Datei übrig.
Aus diesem Grund gibt das Beispiel ein BlobFile Objekt aus CreateNextTableDataFileAsync zurück, um den Datenstrom zu kapseln, wodurch sowohl Zero-Byte-Dateien verhindert werden als auch die im Zusammenhang mit Flush und Dispose beschriebenen Probleme mit teilweise geschriebenen Parquet-Dateien.
public class BlobFile(Stream stream, IStoragePath temporaryFilePath, string finalFilePath) : IAsyncDisposable
{
private readonly BlobStream stream = new(stream, temporaryFilePath, finalFilePath);
public async Task WriteData(Func<Stream, Task> writeOperation)
{
try
{
await writeOperation(stream);
}
catch (Exception)
{
stream.Failed();
throw;
}
}
public async ValueTask DisposeAsync()
{
await stream.DisposeAsync();
}
private class BlobStream(Stream innerStream, IStoragePath temporaryFilePath, string finalFilePath) : Stream
{
private bool disposed = false;
private bool success = true;
public override bool CanRead => innerStream.CanRead;
public override bool CanSeek => innerStream.CanSeek;
public override bool CanWrite => innerStream.CanWrite;
public override long Length => innerStream.Length;
public override long Position
{
get => innerStream.Position;
set => innerStream.Position = value;
}
public override void Flush()
{
// no-op
}
public override int Read(byte[] buffer, int offset, int count) => innerStream.Read(buffer, offset, count);
public override long Seek(long offset, SeekOrigin origin) => innerStream.Seek(offset, origin);
public override void SetLength(long value) => innerStream.SetLength(value);
public override void Write(byte[] buffer, int offset, int count) => innerStream.Write(buffer, offset, count);
public void Failed()
{
success = false;
}
public override async ValueTask DisposeAsync()
{
if (disposed)
{
return;
}
if (success)
{
await innerStream.DisposeAsync();
await temporaryFilePath.RenameAsync(finalFilePath);
}
disposed = true;
}
}
}
Schreiben von Tests mit OneLake mithilfe des Azurite-Emulators
Eines der großen Dinge bei OneLake ist, dass es auf Azure Storage basiert – was bedeutet, dass Sie Ihren Integrationscode lokal mit dem Azurite-Emulator testen können. Dies erleichtert das Schreiben zuverlässiger Tests, die das Verhalten von OneLake / Azure Storage emulieren, ohne eine Live Fabric-Umgebung oder Cloudressourcen zu benötigen.
Azurite emuliert die Blob Storage-API, was genau das ist, was OneLake verfügbar macht. Das bedeutet, dass derselbe Code, den Sie in der Produktion verwenden, in Ihrer Testsuite unverändert ausgeführt werden kann. Sie können Azurite als lokalen Prozess oder Container starten, BlobServiceClient darauf verweisen und loslegen.
Dies ist besonders nützlich für Komponenten- und Integrationstests. Sie haben folgende Möglichkeiten:
- Überprüfen Sie, ob Ihre
_metadata.jsonSchreibweise korrekt ist. - Überprüfen Sie, ob ihre Dateibenennungslogik die erwartete Sequenz erzeugt.
- Simulieren Sie partielle Schreibvorgänge, die entstehen durch das Aufrufen von
FlushundDisposebei Fehlern. Dies ermöglicht das Testen der oben beschriebenen Nuancen und dass die Implementierung sie angemessen behandelt. - Stellen Sie sicher, dass Ihre Parquet-Serialisierungs-Rundreisen einwandfrei abgeschlossen werden.
Azurite unterstützt die ADFS-APIs nicht. Aus diesem Grund wird oben eine BlobFile Schnittstelle verwendet, um ADFS-Funktionen mithilfe der BLOB-APIs zu implementieren, damit sie im Test mit dem Emulator arbeiten können. Hier ist ein Beispiel angegeben:
public async Task RenameAsync(string newPath)
{
async Task RenameDirectoryAsync(DataLakeServiceClient dataLakeServiceClient)
{
var fileSystemClient = dataLakeServiceClient.GetFileSystemClient(containerName);
var directoryClient = fileSystemClient.GetDirectoryClient(path);
await directoryClient.RenameAsync(newPath);
}
async Task CopyThenDeleteAsync(BlobServiceClient blobServiceClient)
{
var sourceBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(path);
var destinationBlob = blobServiceClient.GetBlobContainerClient(containerName).GetBlobClient(newPath);
await destinationBlob.StartCopyFromUriAsync(sourceBlob.Uri);
await sourceBlob.DeleteIfExistsAsync();
}
StorageOperation operation = new()
{
WithFlatNamespace = CopyThenDeleteAsync,
WithHierarchicalNamespace = RenameDirectoryAsync
};
await operation.Execute(client);
}
Testen von Eckfällen: Leeren und Freigeben
Früher haben wir darüber gesprochen, wie die Verwendung von Flush() Parquet.NET zu Dispose() vorzeitigen oder teilweisen Blob-Commits führen kann, wenn sie in OneLake schreiben. Diese Verhaltensweisen sind subtil – aber testbar.
Im Folgenden sind einige Tests aufgeführt, um zu überprüfen, ob die Spiegellogik diese Szenarien ordnungsgemäß verarbeitet:
[Test]
public async Task it_should_write_data_to_table()
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
var mirroredData = await GetMirroredBlobItem();
var mirroredDataClient = setup.WorkspaceContainer.GetBlobClient(mirroredData!.Name);
var mirroredDataContents = await mirroredDataClient.DownloadContentAsync();
var readData = await PricePaidMirroredDataFormat.Read(mirroredDataContents.Value.Content.ToStream()).SingleAsync();
Assert.Multiple(() =>
{
Assert.That(mirroredData, Is.Not.Null);
Assert.That(mirroredData!.Properties.ContentLength, Is.GreaterThan(0));
Assert.That(readData.TransactionId, Is.EqualTo(setup.PricePaidReader.TransactionId));
// ... more assertions ...
});
}
Dieser Test bestätigt, dass ein erfolgreicher Schreibvorgang zu einer gültigen, nichtleeren Parquet-Datei führt, die korrekt zurückgeschrieben wird.
Jetzt für die Fehlerfälle:
[Test]
public async Task it_should_not_commit_partially_written_data()
{
long? lengthDuringWrite = null;
setup.PricePaidReader.ActionBetweenRowGroups = async () =>
{
var mirroredData = await GetMirroredBlobItem();
lengthDuringWrite = mirroredData!.Properties.ContentLength;
};
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
Assert.That(lengthDuringWrite, Is.EqualTo(0));
}
Dieser Test simuliert eine Mid-Stream-Ausnahme und überprüft, ob während des Schreibvorgangs aufgrund des no-op Flush()-Overrides keine Teildaten sichtbar sind.
Und schließlich der Fehlerpfad:
[Test]
public async Task after_a_row_group_is_written_it_should_not_leave_a_partially_complete_blob()
{
setup.PricePaidReader.ThrowsAfterFirstRowGroup = true;
var previouslyMirroredFile = await GetMirroredBlobItem();
var threw = true;
try
{
await setup.FabricPricePaidMirror.SeedMirrorAsync(setup.TableId);
}
catch (Exception)
{
threw = true;
}
var mirroredData = await GetMirroredBlobItem();
var mirroredTemporaryData = await GetMirroredBlobTemporaryItem();
Assert.Multiple(() =>
{
Assert.That(threw, Is.True);
if (previouslyMirroredFile == null)
{
Assert.That(mirroredData, Is.Null);
}
else
{
Assert.That(mirroredData!.Name, Is.EqualTo(previouslyMirroredFile.Name));
}
Assert.That(mirroredTemporaryData, Is.Not.Null);
Assert.That(mirroredTemporaryData!.Properties.ContentLength, Is.EqualTo(0));
});
}
Dieser Test bestätigt, dass auch dann, wenn der Datenstrom aufgrund einer Ausnahme gelöscht wird, eine neue gespiegelte Datei nicht teilweise geschrieben wird.
Diese Tests stellen sicher, dass die Spiegelungslogik robust ist, auch unter Fehlerbedingungen. Sie veranschaulichen, wie der Emulator verwendet werden kann, um reales Verhalten zu simulieren, ohne eine vollständige Fabric-Umgebung (oder Azure) zu benötigen.
Und um die Kompatibilität zu beweisen, indem Sie die Testeinrichtung so optimieren, dass sie auf einen Fabric-Arbeitsbereich verweist, finden Sie hier eine Tabelle voller UK-Hauspreisdaten.
public class when_using_fabric
{
public class in_success_cases : when_copying_to_mirror_successfully
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
public class in_failure_cases : when_copying_to_mirror_fails
{
[SetUp]
public void UseFabric() => setup = TestSetup.UsingFabric();
}
}
public static TestSetup UsingFabric()
{
var blobServiceClient = new BlobServiceClient(new Uri("https://onelake.blob.fabric.microsoft.com/"), new DefaultAzureCredential());
var pricePaidReader = new TestPricePaidReader();
var tableId = new OpenMirroredTableId($"TestWorkspace", "HousePriceOpenMirror.MountedRelationalDatabase", "PricePaid");
var workspaceContainer = blobServiceClient.GetBlobContainerClient(tableId.WorkspaceName);
var fabricPricePaidMirror = new FabricPricePaidMirror(new FabricOpenMirroringWriter(blobServiceClient), pricePaidReader)
{
Settings = new FabricPricePaidMirrorSettings { RowsPerRowGroup = 1 }
};
return new TestSetup
{
BlobServiceClient = blobServiceClient,
PricePaidReader = pricePaidReader,
TableId = tableId,
WorkspaceContainer = workspaceContainer,
FabricPricePaidMirror = fabricPricePaidMirror
};
}
Abschließen
Wenn Sie neue Pipelines auf OneLake erstellen, insbesondere für Streaming- oder serverlose Workloads, funktionieren die ADLS- und Blob-Speicher-APIs erwartungsgemäß. Sie sind schnell, flexibel und arbeiten nahtlos mit offener Spiegelung. Wenn Sie dem Landezonenprotokoll folgen und dem Umgang mit Unterschieden bei Blockblobs und Dateisystemen beachten, können Sie robuste, testbare Integrationen erstellen, die sowohl in der Produktion als auch im lokalen Emulator gleichermaßen gut funktionieren. Und am besten – Sie müssen Ihre App nicht neu schreiben oder das Dateisystem bekämpfen. streamen, serialisieren und in OneLake hochladen.