Artikel ini memperlihatkan kepada Anda cara menggunakan akselerasi kueri untuk mengambil subset data dari akun penyimpanan Anda.
Akselerasi kueri memungkinkan aplikasi dan kerangka analitik mengoptimalkan pemrosesan data secara dramatis dengan hanya mengambil data yang mereka perlukan untuk melakukan operasi tertentu. Untuk mempelajari selengkapnya, lihat Akselerasi Kueri Azure Data Lake Storage.
Instal modul Az versi 4.6.0 atau yang lebih tinggi.
Install-Module -Name Az -Repository PSGallery -Force
Untuk memperbarui dari az versi lama, jalankan perintah berikut:
Update-Module -Name Az
Buka prompt perintah dan ubah direktori (cd
) ke folder proyek Anda Misalnya:
cd myProject
Instal versi 12.5.0-preview.6
atau yang lebih baru dari pustaka klien penyimpanan Azure Blob untuk paket .NET dengan menggunakan perintah dotnet add package
.
dotnet add package Azure.Storage.Blobs -v 12.8.0
Contoh yang muncul di artikel ini memilah file CSV dengan menggunakan pustaka CsvHelper. Untuk menggunakan pustaka tersebut, gunakan perintah berikut ini.
dotnet add package CsvHelper
Buka file pom.xml proyek Anda di editor teks. Tambahkan elemen dependensi berikut ke grup dependensi.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
Instal pustaka klien Azure Data Lake Storage untuk Python dengan menggunakan pip.
pip install azure-storage-blob==12.4.0
Instal pustaka klien Data Lake untuk JavaScript dengan membuka jendela terminal, lalu ketik perintah berikut.
npm install @azure/storage-blob
npm install @fast-csv/parse
Tambahkan pernyataan using
ini ke bagian atas file kode Anda.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Akselerasi kueri mengambil data berformat CSV dan Json. Oleh karena itu, pastikan untuk menambah menggunakan pernyataan untuk setiap pustaka pemilah CSV atau Json yang Anda pilih. Contoh yang muncul di artikel ini memilah file CSV dengan menggunakan pustaka CsvHelper yang tersedia di NuGet. Oleh karena itu, kita tambahkan pernyataan using
ini ke bagian atas file kode.
using CsvHelper;
using CsvHelper.Configuration;
Untuk mengompilasi contoh yang disajikan dalam artikel ini, Anda juga harus menambahkan pernyataan using
ini juga.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
Tambahkan pernyataan import
ini ke bagian atas file kode Anda.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
Tambahkan pernyataan impor ini ke bagian atas file kode Anda.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
Sertakan modul storage-blob
dengan menempatkan pernyataan ini di bagian atas file kode Anda.
const { BlobServiceClient } = require("@azure/storage-blob");
Akselerasi kueri mengambil data berformat CSV dan Json. Oleh karena itu, pastikan untuk menambah menggunakan pernyataan untuk setiap pustaka pemilah CSV atau Json yang Anda pilih. Contoh yang muncul di artikel ini memilah file CSV dengan menggunakan modul fast-csv. Oleh karena itu, kita tambahkan pernyataan ini ke bagian atas file kode.
const csv = require('@fast-csv/parse');
Anda dapat menggunakan SQL untuk menentukan predikat filter baris dan proyeksi kolom dalam permintaan akselerasi kueri. Kode berikut ini meminta file CSV dalam penyimpanan dan mengembalikan semua baris data yang kolom ketiganya cocok dengan nilai Hemingway, Ernest
.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
Metode asinkron BlockBlobClient.QueryAsync
mengirimkan kueri ke API akselerasi kueri, lalu mengalirkan hasilnya kembali ke aplikasi sebagai objek Stream.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
Metode BlockBlobClient.openInputStream()
mengirim kueri ke API akselerasi kueri, lalu mengalirkan hasilnya kembali ke aplikasi sebagai objek InputStream
yang dapat dibaca seperti objek InputStream lainnya.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
Contoh ini mengirim kueri ke API akselerasi kueri, lalu mengalirkan kembali hasilnya. Objek blob
yang diteruskan ke fungsi pembantu queryHemingway
adalah tipe BlockBlobClient. Untuk mempelajari selengkapnya tentang cara mendapatkan objek BlockBlobClient, lihat Mulai Cepat: Kelola blob dengan JavaScript v12 SDK di Node.js.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
Anda bisa mencakup hasil Anda ke subset kolom. Dengan begitu Anda hanya mengambil kolom yang diperlukan untuk melakukan kalkulasi tertentu. Ini meningkatkan kinerja aplikasi dan mengurangi biaya karena lebih sedikit data yang ditransfer melalui jaringan.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
Kode berikut menggabungkan pemfilteran baris dan proyeksi kolom ke dalam kueri yang sama.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}