In diesem Artikel wird gezeigt, wie Sie mithilfe der Abfragebeschleunigung eine Teilmenge von Daten aus Ihrem Speicherkonto abrufen.
Die Abfragebeschleunigung ermöglicht es Anwendungen und Analyseframeworks, die Datenverarbeitung drastisch zu optimieren. Dabei werden nur die Daten abgerufen, die für die Durchführung eines bestimmten Vorgangs erforderlich sind. Weitere Informationen finden Sie unter Abfragebeschleunigung für Azure Data Lake Storage.
Voraussetzungen
Sie benötigen ein Azure-Abonnement, um auf Azure Storage zuzugreifen. Wenn Sie noch kein Abonnement haben, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
Ein Speicherkonto vom Typ Allgemein V2. Informationen finden Sie unter Erstellen eines Speicherkontos.
Die doppelte Verschlüsselung wird nicht unterstützt.
Wenn Sie eine JSON-Datei abfragen, sollte die Größe der einzelnen Datensätze in dieser Datei kleiner als 1 MB sein.
Wählen Sie eine Registerkarte aus, um SDK-spezifische Voraussetzungen anzuzeigen.
Für die Verwendung des Node.js SDK sind keine weiteren Voraussetzungen zu erfüllen.
Einrichten Ihrer Umgebung
Schritt 1: Installieren von Paketen
Installieren Sie das Az-Modul, Version 4.6.0 oder höher.
Install-Module -Name Az -Repository PSGallery -Force
Führen Sie zum Aktualisieren von einer älteren Version von Az den folgenden Befehl aus:
Update-Module -Name Az
Öffnen Sie eine Eingabeaufforderung, und wechseln Sie mit dem Befehl cd
in Ihren Projektordner. Beispiel:
cd myProject
Installieren Sie Version 12.5.0-preview.6
oder höher der Azure Blob Storage-Clientbibliothek für das .NET-Paket mit dem Befehl dotnet add package
.
dotnet add package Azure.Storage.Blobs -v 12.8.0
In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe der Bibliothek CsvHelper analysiert. Führen Sie den folgenden Befehl aus, um diese Bibliothek zu verwenden.
dotnet add package CsvHelper
Öffnen Sie die Datei pom.xml des Projekts in einem Text-Editor. Fügen Sie der Gruppe der Abhängigkeiten das folgende Abhängigkeitselement hinzu:
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Installieren Sie die Azure Data Lake Storage-Clientbibliothek für Python mithilfe von pip.
pip install azure-storage-blob==12.4.0
Installieren Sie die Data Lake-Clientbibliothek für JavaScript, indem Sie ein Terminalfenster öffnen und den folgenden Befehl eingeben.
npm install @azure/storage-blob
npm install @fast-csv/parse
Schritt 2: Hinzufügen von Anweisungen
Fügen Sie die folgenden using
-Anweisungen am Anfang Ihrer Codedatei hinzu.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Bei der Abfragebeschleunigung werden CSV- und JSON-formatierte Daten abgerufen. Stellen Sie deshalb sicher, dass Sie die using-Anweisungen für alle CSV- oder JSON-Analysebibliotheken hinzufügen, die Sie verwenden möchten. In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe der Bibliothek CsvHelper analysiert, die auf NuGet verfügbar ist. Daher werden die folgenden using
-Anweisungen am Anfang der Codedatei hinzugefügt.
using CsvHelper;
using CsvHelper.Configuration;
Zum Kompilieren der Beispiele, die in diesem Artikel gezeigt werden, müssen Sie auch die folgenden using
-Anweisungen hinzufügen.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Fügen Sie die folgenden import
-Anweisungen am Anfang Ihrer Codedatei hinzu.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Fügen Sie diese Importanweisungen am Anfang Ihrer Codedatei hinzu.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
Schließen Sie das storage-blob
-Modul ein, indem Sie die folgende Anweisung am Anfang Ihrer Codedatei einfügen.
const { BlobServiceClient } = require("@azure/storage-blob");
Bei der Abfragebeschleunigung werden CSV- und JSON-formatierte Daten abgerufen. Stellen Sie deshalb sicher, dass Sie Anweisungen für alle CSV- oder JSON-Analysemodule hinzufügen, die Sie verwenden möchten. In den Beispielen in diesem Artikel wird eine CSV-Datei mithilfe des Moduls fast-csv analysiert. Daher wird diese Anweisung am Anfang der Codedatei hinzugefügt.
const csv = require('@fast-csv/parse');
Abrufen von Daten mithilfe eines Filters
Sie können SQL verwenden, um die Zeilenfilterprädikate und Spaltenprojektionen in einer Abfragebeschleunigungsanforderung anzugeben. Der folgende Code fragt eine CSV-Datei im Speicher ab und gibt alle Datenzeilen zurück, bei denen die dritte Spalte mit dem Wert Hemingway, Ernest
übereinstimmt.
In der SQL-Abfrage wird das Schlüsselwort BlobStorage
verwendet, um die abzufragende Datei anzugeben.
Spaltenverweise werden als _N
angegeben, wobei die erste Spalte _1
ist. Wenn die Quelldatei eine Kopfzeile enthält, können Sie mit dem Namen, der in der Kopfzeile angegeben ist, auf Spalten verweisen.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Die asynchrone Methode BlockBlobClient.QueryAsync
sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als Stream-Objekt an die Anwendung zurück.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Die Methode BlockBlobClient.openInputStream()
sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als InputStream
-Objekt, das wie jedes andere InputStream-Objekt gelesen werden kann, an die Anwendung zurück.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
In diesem Beispiel wird die Abfrage an die Abfragebeschleunigung-API gesendet, und die Ergebnisse werden anschließend zurückgestreamt. Das blob
-Objekt, das an die Hilfsfunktion queryHemingway
übergeben wird, ist vom Typ BlockBlobClient. Informationen zum Abrufen eines BlockBlobClient-Objekts finden Sie unter Schnellstart: Verwalten von Blobs per JavaScript v12 SDK in Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Abrufen bestimmter Spalten
Sie können die Ergebnisse auf eine Teilmenge der Spalten beschränken. Auf diese Weise rufen Sie nur die Spalten ab, die zum Ausführen einer bestimmten Berechnung benötigt werden. Dadurch werden die Anwendungsleistung verbessert und Kosten reduziert, da weniger Daten über das Netzwerk übertragen werden.
Hinweis
Die maximale Anzahl von Spalten, auf die Sie ihre Ergebnisse beschränken können, ist 49. Wenn Ihre Ergebnisse mehr als 49 Spalten enthalten müssen, verwenden Sie für den SELECT-Ausdruck ein Platzhalterzeichen (*
) (Beispiel: SELECT *
).
Dieser Code ruft nur die Spalte BibNum
für alle Bücher im Dataset ab. Außerdem werden die Informationen aus der Kopfzeile in der Quelldatei verwendet, um auf Spalten in der Abfrage zu verweisen.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Im folgenden Code werden Zeilenfilterung und Spaltenprojektionen in derselben Abfrage kombiniert.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}
Nächste Schritte