Artikel ini memperlihatkan kepada Anda cara menggunakan akselerasi kueri untuk mengambil subset data dari akun penyimpanan Anda.
Akselerasi kueri memungkinkan aplikasi dan kerangka kerja analitik untuk mengoptimalkan pemrosesan data secara dramatis dengan hanya mengambil data yang mereka butuhkan untuk melakukan operasi tertentu. Untuk mempelajari selengkapnya, lihat Akselerasi Kueri Azure Data Lake Storage.
Instal modul Az versi 4.6.0 atau yang lebih tinggi.
Install-Module -Name Az -Repository PSGallery -Force
Untuk memperbarui dari versi Az yang lebih lama, jalankan perintah berikut:
Update-Module -Name Az
Buka perintah dan ubah direktori (cd) ke folder proyek Anda Misalnya:
cd myProject
Instal versi atau versi yang lebih baru dari pustaka klien penyimpanan Azure Blob untuk paket .NET menggunakan perintah 12.5.0-preview.6.
dotnet add package Azure.Storage.Blobs -v 12.8.0
Contoh yang muncul dalam artikel ini mengurai file CSV dengan menggunakan pustaka CsvHelper . Untuk menggunakan pustaka tersebut, gunakan perintah berikut.
dotnet add package CsvHelper
Buka file pom.xml proyek Anda di editor teks. Tambahkan elemen dependensi berikut ke grup dependensi.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Instal pustaka klien Azure Data Lake Storage untuk Python dengan menggunakan pip.
pip install azure-storage-blob==12.4.0
Instal pustaka klien Data Lake untuk JavaScript dengan membuka jendela terminal, lalu ketik perintah berikut.
npm install @azure/storage-blob
npm install @fast-csv/parse
Tambahkan pernyataan ini using ke bagian atas file kode Anda.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Akselerasi kueri mengambil data berformat CSV dan JSON. Oleh karena itu, pastikan untuk menambahkan pernyataan 'using' untuk pustaka penguraian CSV atau JSON yang Anda pilih. Contoh yang muncul dalam artikel ini mengurai file CSV dengan menggunakan pustaka CsvHelper yang tersedia di NuGet. Oleh karena itu, kami akan menambahkan pernyataan ini using ke bagian atas file kode.
using CsvHelper;
using CsvHelper.Configuration;
Untuk mengkompilasi contoh yang disajikan dalam artikel ini, Anda juga perlu menambahkan pernyataan ini using .
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Tambahkan pernyataan ini import ke bagian atas file kode Anda.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Tambahkan pernyataan impor ini ke bagian atas file kode Anda.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
Sertakan storage-blob modul dengan menempatkan pernyataan ini di bagian atas file kode Anda.
const { BlobServiceClient } = require("@azure/storage-blob");
Akselerasi kueri mengambil data berformat CSV dan JSON. Oleh karena itu, pastikan untuk menambahkan pernyataan untuk modul penguraian CSV atau JSON apa pun yang Anda pilih untuk digunakan. Contoh yang muncul dalam artikel ini mengurai file CSV dengan menggunakan modul csv cepat . Oleh karena itu, kami akan menambahkan pernyataan ini ke bagian atas file kode.
const csv = require('@fast-csv/parse');
Anda dapat menggunakan SQL untuk menentukan predikat filter baris dan proyeksi kolom dalam permintaan akselerasi kueri. Kode berikut meminta file CSV di penyimpanan dan mengembalikan semua baris data di mana kolom ketiga cocok dengan nilai Hemingway, Ernest.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\"
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\") -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Metode asinkron BlockBlobClient.QueryAsync mengirim kueri ke API akselerasi kueri, lalu mengalirkan hasilnya kembali ke aplikasi sebagai objek Stream .
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Metode BlockBlobClient.openInputStream() mengirim kueri ke API akselerasi kueri, lalu mengalirkan hasilnya kembali ke aplikasi sebagai InputStream objek yang dapat dibaca seperti objek InputStream lainnya.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Contoh ini mengirimkan kueri ke API akselerasi kueri, lalu mengalirkan kembali hasilnya. Objek blob yang diteruskan ke queryHemingway fungsi pembantu berjenis BlockBlobClient. Untuk mempelajari selengkapnya tentang cara mendapatkan objek BlockBlobClient , lihat Mulai Cepat: Mengelola blob dengan JavaScript v12 SDK di Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Anda dapat membatasi hasil Anda pada subset kolom. Dengan begitu Anda hanya mengambil kolom yang diperlukan untuk melakukan penghitungan tertentu. Ini meningkatkan performa aplikasi dan mengurangi biaya karena lebih sedikit data yang ditransfer melalui jaringan.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Kode berikut menggabungkan pemfilteran baris dan proyeksi kolom ke dalam kueri yang sama.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}