Adatok szűrése az Azure Data Lake Storage lekérdezésgyorsításával
Cikk
Ez a cikk bemutatja, hogyan lehet lekérdezésgyorsítással lekérni egy adathalmazt a tárfiókból.
A lekérdezések gyorsítása lehetővé teszi, hogy az alkalmazások és az elemzési keretrendszerek jelentősen optimalizálják az adatfeldolgozást úgy, hogy csak azokat az adatokat kérik le, amelyekre egy adott művelet végrehajtásához szükségük van. További információkért tekintse meg az Azure Data Lake Storage lekérdezésgyorsítását.
Előfeltételek
Az Azure Storage eléréséhez Azure-előfizetésre lesz szüksége. Ha még nem rendelkezik előfizetéssel, a kezdés előtt hozzon létre egy ingyenes fiókot .
Általános célú v2-tárfiók . lásd: Tárfiók létrehozása.
A kettős titkosítás nem támogatott.
JSON-fájl lekérdezése esetén a fájl minden rekordméretének 1 MB-nál kisebbnek kell lennie.
Válasszon egy lapot az SDK-specifikus előfeltételek megtekintéséhez.
Ez a cikk feltételezi, hogy Létrehozott egy Java-projektet az Apache Maven használatával. Ha szeretné, hogy az Apache Maven használatával hogyan hozhat létre projektet, tekintse meg a Beállítás című témakört.
Telepítse az Az modul 4.6.0-s vagy újabb verzióját.
Install-Module -Name Az -Repository PSGallery -Force
Az Az régebbi verziójáról való frissítéshez futtassa a következő parancsot:
Update-Module -Name Az
Nyisson meg egy parancssort, és módosítsa a könyvtárat (cd) a projektmappába, például:
cd myProject
Telepítse az 12.5.0-preview.6 Azure Blob Storage .NET-csomaghoz készült ügyfélkódtárának verzióját vagy újabb verzióját a dotnet add package parancs használatával.
dotnet add package Azure.Storage.Blobs -v 12.8.0
A cikkben szereplő példák egy CSV-fájlt elemeznek a CsvHelper-kódtár használatával. A kódtár használatához használja az alábbi parancsot.
dotnet add package CsvHelper
Nyissa meg a projekt pom.xml fájlját egy szövegszerkesztőben. Adja hozzá a következő függőségi elemeket a függőségek csoportjához.
Adja hozzá ezeket az using utasításokat a kódfájl elejéhez.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
A lekérdezésgyorsítás CSV- és JSON-formátumú adatokat kér le. Ezért mindenképpen adjon hozzá használati utasításokat a használni kívánt CSV- vagy Json-elemzési kódtárakhoz. A cikkben szereplő példák egy CSV-fájlt elemeznek a NuGeten elérhető CsvHelper-kódtár használatával. Ezért ezeket az using utasításokat hozzáadjuk a kódfájl elejéhez.
using CsvHelper;
using CsvHelper.Configuration;
A cikkben bemutatott példák fordításához ezeket using az utasításokat is hozzá kell adnia.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Adja hozzá ezeket az import utasításokat a kódfájl elejéhez.
A lekérdezésgyorsítás CSV- és JSON-formátumú adatokat kér le. Ezért mindenképpen adjon hozzá utasításokat a használni kívánt CSV- vagy JSON-elemző modulokhoz. A cikkben szereplő példák egy CSV-fájlt elemeznek a fast-csv modul használatával. Ezért ezt az utasítást hozzáadjuk a kódfájl elejéhez.
const csv = require('@fast-csv/parse');
Adatok lekérése szűrővel
Az SQL használatával megadhatja a sorszűrő predikátumait és oszlopvetületeit egy lekérdezésgyorsítási kérelemben. Az alábbi kód lekérdez egy CSV-fájlt a tárolóban, és visszaadja az összes adatsort, ahol a harmadik oszlop megegyezik az értékkel Hemingway, Ernest.
Az SQL-lekérdezésben a kulcsszó BlobStorage a lekérdezett fájl jelölésére szolgál.
Az oszlophivatkozások az első oszlop _1helyeként _N vannak megadva. Ha a forrásfájl fejlécsort tartalmaz, akkor a fejlécsorban megadott név alapján hivatkozhat az oszlopokra.
Az aszinkron metódus BlockBlobClient.QueryAsync elküldi a lekérdezést a lekérdezésgyorsító API-nak, majd stream objektumként streameli az eredményeket az alkalmazásnak.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
A metódus BlockBlobClient.openInputStream() elküldi a lekérdezést a lekérdezésgyorsító API-nak, majd az eredményeket egy objektumként InputStream továbbítja az alkalmazásnak, amely bármely más InputStream-objektumhoz hasonlóan olvasható.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Az eredményeket az oszlopok egy részhalmazára is hatókörbe helyezheti. Így csak azokat az oszlopokat kéri le, amelyek egy adott számítás elvégzéséhez szükségesek. Ez javítja az alkalmazás teljesítményét, és csökkenti a költségeket, mivel kevesebb adat kerül át a hálózaton keresztül.
Feljegyzés
Az eredmények hatókörének maximális száma 49. Ha az eredményeknek 49-nél több oszlopot kell tartalmazniuk, használjon helyettesítő karaktert (*) a Standard kiadás LECT kifejezéshez (például: SELECT *).
Ez a kód csak az BibNum adathalmaz összes könyvének oszlopát kéri le. A forrásfájl fejlécsorának adatait is felhasználja a lekérdezés oszlopainak hivatkozására.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ: https://aka.ms/ContentUserFeedback.
Visszajelzés küldése és megtekintése a következőhöz: