Ez a cikk bemutatja, hogyan lehet lekérdezésgyorsítással lekérni egy adathalmazt a tárfiókból.
A lekérdezések gyorsítása lehetővé teszi, hogy az alkalmazások és az elemzési keretrendszerek jelentősen optimalizálják az adatfeldolgozást úgy, hogy csak azokat az adatokat kérik le, amelyekre egy adott művelet végrehajtásához szükségük van. További információkért tekintse meg az Azure Data Lake Storage lekérdezésgyorsítását.
Előfeltételek
Az Azure Storage eléréséhez Azure-előfizetésre lesz szüksége. Ha még nem rendelkezik előfizetéssel, a kezdés előtt hozzon létre egy ingyenes fiókot .
Általános célú v2-tárfiók. lásd : Tárfiók létrehozása.
A kettős titkosítás nem támogatott.
JSON-fájl lekérdezése esetén a fájl minden rekordméretének 1 MB-nál kisebbnek kell lennie.
Válasszon egy lapot az SDK-specifikus előfeltételek megtekintéséhez.
A Node.js SDK használatához nincsenek további előfeltételek.
Saját környezet beállítása
1. lépés: Csomagok telepítése
Telepítse az Az modul 4.6.0-s vagy újabb verzióját.
Install-Module -Name Az -Repository PSGallery -Force
Az Az régebbi verziójáról való frissítéshez futtassa a következő parancsot:
Update-Module -Name Az
Nyisson meg egy parancssort, és módosítsa a könyvtárat (cd) a projektmappába, például:
cd myProject
Telepítse az 12.5.0-preview.6 Azure Blob Storage .NET-csomaghoz készült ügyfélkódtárának verzióját vagy újabb verzióját a dotnet add package parancs használatával.
dotnet add package Azure.Storage.Blobs -v 12.8.0
A cikkben szereplő példák egy CSV-fájlt elemeznek a CsvHelper-kódtár használatával. A kódtár használatához használja az alábbi parancsot.
dotnet add package CsvHelper
Nyissa meg a projektpom.xml fájlját egy szövegszerkesztőben. Adja hozzá a következő függőségi elemeket a függőségek csoportjához.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Telepítse a Pythonhoz készült Azure Data Lake Storage ügyfélkódtárat pip használatával.
pip install azure-storage-blob==12.4.0
Telepítse a JavaScripthez készült Data Lake-ügyfélkódtárat egy terminálablak megnyitásával, majd írja be a következő parancsot.
npm install @azure/storage-blob
npm install @fast-csv/parse
2. lépés: Állítások hozzáadása
Adja hozzá ezeket az using utasításokat a kódfájl elejéhez.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
A lekérdezésgyorsítás CSV- és JSON-formátumú adatokat kér le. Ezért mindenképpen adjon hozzá használati utasításokat a használni kívánt CSV- vagy JSON-elemzési kódtárakhoz. A cikkben szereplő példák egy CSV-fájlt elemeznek a NuGeten elérhető CsvHelper-kódtár használatával. Ezért ezeket az using utasításokat hozzáadjuk a kódfájl elejéhez.
using CsvHelper;
using CsvHelper.Configuration;
A cikkben bemutatott példák összeállításához ezeket using az utasításokat is hozzá kell adnia.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Adja hozzá ezeket az import utasításokat a kódfájl elejéhez.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Adja hozzá ezeket az importálási utasításokat a kódfájl tetejére.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
A modult storage-blob úgy vegye fel, hogy ezt az utasítást a kódfájl tetejére helyezi.
const { BlobServiceClient } = require("@azure/storage-blob");
A lekérdezésgyorsítás CSV- és JSON-formátumú adatokat kér le. Ezért mindenképpen adjon hozzá utasításokat a használni kívánt CSV- vagy JSON-elemző modulokhoz. A cikkben szereplő példák egy CSV-fájlt elemeznek a fast-csv modul használatával. Ezért ezt az utasítást hozzáadjuk a kódfájl elejéhez.
const csv = require('@fast-csv/parse');
Adatok lekérése szűrővel
Az SQL használatával megadhatja a sorszűrő predikátumait és oszlopvetületeit egy lekérdezésgyorsítási kérelemben. Az alábbi kód lekérdez egy CSV-fájlt a tárolóban, és visszaadja az összes adatsort, ahol a harmadik oszlop megegyezik az értékkel Hemingway, Ernest.
Az SQL-lekérdezésben a kulcsszó BlobStorage a lekérdezett fájl jelölésére szolgál.
Az oszlophivatkozások _N formában vannak megadva, ahol az első oszlop a _1. Ha a forrásfájl fejlécsort tartalmaz, akkor a fejlécsorban megadott név alapján hivatkozhat az oszlopokra.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\"
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\") -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Az aszinkron metódus BlockBlobClient.QueryAsync elküldi a lekérdezést a lekérdezésgyorsító API-nak, majd stream objektumként streameli az eredményeket az alkalmazásnak.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
A metódus BlockBlobClient.openInputStream() elküldi a lekérdezést a lekérdezésgyorsító API-nak, majd az eredményeket egy objektumként InputStream továbbítja az alkalmazásnak, amely bármely más InputStream-objektumhoz hasonlóan olvasható.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Ez a példa elküldi a lekérdezést a lekérdezésgyorsító API-nak, majd visszaküldi az eredményeket. A blob segédfüggvénybe queryHemingway átadott objektum BlockBlobClient típusú.
A BlockBlobClient objektum beszerzéséről további információt talál a Rövid útmutató: Blobok kezelése a JavaScript v12 SDK-val a Node.js-ban.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Adott oszlopok lekérése
Az eredményeket az oszlopok egy részhalmazára is hatókörbe helyezheti. Így csak azokat az oszlopokat kéri le, amelyek egy adott számítás elvégzéséhez szükségesek. Ez javítja az alkalmazás teljesítményét, és csökkenti a költségeket, mivel kevesebb adat kerül át a hálózaton keresztül.
Megjegyzés
Az oszlopok maximális száma, amelyre az eredményeket korlátozhatja, 49. Ha az eredményeknek 49-nél több oszlopot kell tartalmazniuk, használjon helyettesítő karaktert (*) a SELECT kifejezéshez (például: SELECT *).
Ez a kód csak az BibNum adathalmaz összes könyvének oszlopát kéri le. A forrásfájl fejlécsorának adatait is felhasználja a lekérdezés oszlopainak hivatkozására.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Az alábbi kód egyesíti a sorszűréseket és az oszlopvetületeket ugyanabba a lekérdezésbe.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}
Következő lépések