In diesem Artikel wird erläutert, wie Sie die Abfragebeschleunigung verwenden, um eine Teilmenge von Daten aus Ihrem Speicherkonto abzurufen.
Mit der Abfragebeschleunigung können Anwendungen und Analyseframeworks die Datenverarbeitung erheblich optimieren, indem nur die Daten abgerufen werden, die sie zum Ausführen eines bestimmten Vorgangs benötigen. Weitere Informationen finden Sie unter Azure Data Lake Storage Query Acceleration.
Voraussetzungen
Sie benötigen ein Azure-Abonnement, um auf Azure Storage zuzugreifen. Wenn Sie noch kein Abonnement haben, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
Ein allgemeines v2-Speicherkonto . siehe Erstellen eines Speicherkontos.
Doppelte Verschlüsselung wird nicht unterstützt.
Wenn Sie eine JSON-Datei abfragen, sollte jede Datensatzgröße in dieser Datei kleiner als 1 MB sein.
Wählen Sie eine Registerkarte aus, um alle SDK-spezifischen Voraussetzungen anzuzeigen.
Für die Verwendung des Node.js SDK sind keine zusätzlichen Voraussetzungen erforderlich.
Richten Sie Ihre Umgebung ein
Schritt 1: Installieren von Paketen
Installieren Sie die Az-Modulversion 4.6.0 oder höher.
Install-Module -Name Az -Repository PSGallery -Force
Um von einer älteren Version von Az zu aktualisieren, führen Sie den folgenden Befehl aus:
Update-Module -Name Az
Öffnen Sie eine Eingabeaufforderung, und ändern Sie das Verzeichnis (cd) in Ihrem Projektordner. Beispiel:
cd myProject
Installieren Sie die 12.5.0-preview.6 Version oder höher der Azure Blob Storage-Clientbibliothek für .NET-Paket mithilfe des dotnet add package Befehls.
dotnet add package Azure.Storage.Blobs -v 12.8.0
Die In diesem Artikel angezeigten Beispiele analysieren eine CSV-Datei mithilfe der CsvHelper-Bibliothek . Um diese Bibliothek zu verwenden, verwenden Sie den folgenden Befehl.
dotnet add package CsvHelper
Öffnen Sie die pom.xml Datei Ihres Projekts in einem Text-Editor. Fügen Sie der Gruppe der Abhängigkeiten das folgende Abhängigkeitselement hinzu:
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Installieren Sie die Azure Data Lake Storage-Clientbibliothek für Python mithilfe von Pip.
pip install azure-storage-blob==12.4.0
Installieren Sie die Data Lake-Clientbibliothek für JavaScript, indem Sie ein Terminalfenster öffnen und dann den folgenden Befehl eingeben.
npm install @azure/storage-blob
npm install @fast-csv/parse
Schritt 2: Hinzufügen von Aussagen
Fügen Sie diese using Anweisungen am Anfang der Codedatei hinzu.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Die Abfragebeschleunigung ruft CSV- und JSON-formatierte Daten ab. Stellen Sie daher sicher, dass Sie verwendungsanweisungen für beliebige CSV- oder JSON-Analysebibliotheken hinzufügen, die Sie verwenden möchten. Die Beispiele, die in diesem Artikel angezeigt werden, analysieren eine CSV-Datei mithilfe der CsvHelper-Bibliothek , die in NuGet verfügbar ist. Daher fügen wir diese using Anweisungen am Anfang der Codedatei hinzu.
using CsvHelper;
using CsvHelper.Configuration;
Zum Kompilieren der Beispiele, die in diesem Artikel gezeigt werden, müssen Sie auch die folgenden using-Anweisungen hinzufügen.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Fügen Sie diese import Anweisungen am Anfang der Codedatei hinzu.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Fügen Sie diese Importanweisungen am Anfang der Codedatei hinzu.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
Fügen Sie das storage-blob Modul hinzu, indem Sie diese Anweisung oben in der Codedatei platzieren.
const { BlobServiceClient } = require("@azure/storage-blob");
Die Abfragebeschleunigung ruft CSV- und JSON-formatierte Daten ab. Stellen Sie daher sicher, dass Sie Anweisungen für beliebige CSV- oder JSON-Analysemodule hinzufügen, die Sie verwenden möchten. Die Beispiele, die in diesem Artikel angezeigt werden, analysieren eine CSV-Datei mithilfe des Fast-CSV-Moduls . Daher fügen wir diese Anweisung am Anfang der Codedatei hinzu.
const csv = require('@fast-csv/parse');
Abrufen von Daten mithilfe eines Filters
Sie können SQL verwenden, um die Zeilenfilter-Prädikate und Spaltenprojektionen in einer Abfragebeschleunigungsanforderung anzugeben. Der folgende Code fragt eine CSV-Datei im Speicher ab und gibt alle Datenzeilen zurück, in denen die dritte Spalte mit dem Wert Hemingway, Ernestübereinstimmt.
In der SQL-Abfrage wird das Schlüsselwort BlobStorage verwendet, um die datei anzuzeigen, die abgefragt wird.
Spaltenverweise werden als _N angegeben, wobei die erste Spalte _1 ist. Wenn die Quelldatei eine Kopfzeile enthält, können Sie anhand des Namens, der in der Kopfzeile angegeben ist, auf Spalten verweisen.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\"
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\") -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Die asynchrone Methode BlockBlobClient.QueryAsync sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als Stream-Objekt zurück an die Anwendung.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Die Methode BlockBlobClient.openInputStream() sendet die Abfrage an die Abfragebeschleunigungs-API und streamt dann die Ergebnisse als InputStream Objekt zurück an die Anwendung, das wie jedes andere InputStream-Objekt gelesen werden kann.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
In diesem Beispiel wird die Abfrage an die Abfragebeschleunigungs-API gesendet und dann die Ergebnisse zurück gestreamt. Das blob an die queryHemingway Hilfsfunktion übergebene Objekt ist vom Typ "BlockBlobClient". Weitere Informationen zum Abrufen eines BlockBlobClient-Objekts finden Sie in der Schnellstartanleitung: Verwalten von Blobs mit JavaScript v12 SDK in Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Abrufen bestimmter Spalten
Sie können die Ergebnisse auf eine Teilmenge von Spalten beschränken. Auf diese Weise rufen Sie nur die Spalten ab, die zum Ausführen einer bestimmten Berechnung erforderlich sind. Dies verbessert die Anwendungsleistung und verringert die Kosten, da weniger Daten über das Netzwerk übertragen werden.
Hinweis
Die maximale Anzahl von Spalten, auf die Sie ihre Ergebnisse beschränken können, beträgt 49. Wenn Die Ergebnisse mehr als 49 Spalten enthalten müssen, verwenden Sie für den SELECT-Ausdruck ein Wildcardzeichen (*z. B.: SELECT *).
Dieser Code ruft nur die BibNum Spalte für alle Bücher im Dataset ab. Außerdem werden die Informationen aus der Kopfzeile in der Quelldatei verwendet, um auf Spalten in der Abfrage zu verweisen.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Der folgende Code kombiniert Zeilenfilterung und Spaltenprojektionen in derselben Abfrage.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}
Nächste Schritte