이 문서에서는 쿼리 가속을 사용하여 스토리지 계정에서 데이터의 하위 집합을 검색하는 방법을 보여 줍니다.
Az 모듈 버전 4.6.0 이상을 설치합니다.
Install-Module -Name Az -Repository PSGallery -Force
이전 버전의 Az에서 업데이트하려면 다음 명령을 실행합니다.
Update-Module -Name Az
명령 프롬프트를 열고 디렉터리(cd)를 프로젝트 폴더로 변경합니다. 예를 들면 다음과 같습니다.
cd myProject
12.5.0-preview.6 명령을 사용하여 .NET 패키지용 Azure Blob Storage 클라이언트 라이브러리의 버전 이상을 설치합니다dotnet add package.
dotnet add package Azure.Storage.Blobs -v 12.8.0
이 문서에 표시되는 예제는 CsvHelper 라이브러리를 사용하여 CSV 파일을 구문 분석합니다. 해당 라이브러리를 사용하려면 다음 명령을 사용합니다.
dotnet add package CsvHelper
텍스트 편집기에서 프로젝트의 pom.xml 파일을 엽니다. 종속성 그룹에 다음 종속성 요소를 추가합니다.
<!-- Request static dependencies from Maven -->
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.8</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
<version>12.8.0-beta.1</version>
</dependency>
pip를 사용하여 Python용 Azure Data Lake Storage 클라이언트 라이브러리를 설치합니다.
pip install azure-storage-blob==12.4.0
터미널 창을 열고 다음 명령을 입력하여 JavaScript용 Data Lake 클라이언트 라이브러리를 설치합니다.
npm install @azure/storage-blob
npm install @fast-csv/parse
이러한 using 문을 코드 파일의 맨 위에 추가합니다.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
쿼리 가속은 CSV 및 JSON 형식의 데이터를 검색합니다. 따라서 사용하도록 선택한 CSV 또는 JSON 구문 분석 라이브러리에 using 문을 추가해야 합니다. 이 문서에 표시되는 예제는 NuGet에서 사용할 수 있는 CsvHelper 라이브러리를 사용하여 CSV 파일을 구문 분석합니다. 따라서 이러한 using 문을 코드 파일의 맨 위에 추가합니다.
using CsvHelper;
using CsvHelper.Configuration;
이 문서에 제시된 예제를 컴파일하려면 이러한 using 문도 추가해야 합니다.
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
이러한 import 문을 코드 파일의 맨 위에 추가합니다.
import com.azure.storage.blob.*;
import com.azure.storage.blob.options.*;
import com.azure.storage.blob.models.*;
import com.azure.storage.common.*;
import java.io.*;
import java.util.function.Consumer;
import org.apache.commons.csv.*;
이러한 import 문을 코드 파일의 맨 위에 추가합니다.
import sys, csv
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobClient, DelimitedTextDialect, BlobQueryError
storage-blob 코드 파일의 맨 위에 이 문을 배치하여 모듈을 포함합니다.
const { BlobServiceClient } = require("@azure/storage-blob");
쿼리 가속은 CSV 및 JSON 형식의 데이터를 검색합니다. 따라서 사용하도록 선택한 CSV 또는 JSON 구문 분석 모듈에 대한 문을 추가해야 합니다. 이 문서에 표시되는 예제는 fast-csv 모듈을 사용하여 CSV 파일을 구문 분석합니다. 따라서 코드 파일의 맨 위에 이 문을 추가합니다.
const csv = require('@fast-csv/parse');
SQL을 사용하여 쿼리 가속 요청에서 행 필터 조건자 및 열 프로젝션을 지정할 수 있습니다. 다음 코드는 스토리지의 CSV 파일을 쿼리하고 세 번째 열이 값 Hemingway, Ernest과 일치하는 모든 데이터 행을 반환합니다.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\"
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader -RecordSeparator "`n" -ColumnSeparator "," -QuotationCharacter """" -EscapeCharacter "\") -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library.csv"
Get-QueryCsv $ctx $container $blob "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'" $false
비동기 메서드 BlockBlobClient.QueryAsync 는 쿼리를 쿼리 가속 API로 보낸 다음 결과를 Stream 개체로 애플리케이션으로 다시 스트리밍합니다.
static async Task QueryHemingway(BlockBlobClient blob)
{
string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await DumpQueryCsv(blob, query, false);
}
private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
try
{
var options = new BlobQueryOptions()
{
InputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"'
},
OutputTextConfiguration = new BlobQueryCsvTextOptions()
{
HasHeaders = true,
RecordSeparator = "\n",
ColumnSeparator = ",",
EscapeCharacter = '\\',
QuotationCharacter = '"' },
ProgressHandler = new Progress<long>((finishedBytes) =>
Console.Error.WriteLine($"Data read: {finishedBytes}"))
};
options.ErrorHandler += (BlobQueryError err) => {
Console.ForegroundColor = ConsoleColor.Red;
Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
Console.ResetColor();
};
// BlobDownloadInfo exposes a Stream that will make results available when received rather than blocking for the entire response.
using (var reader = new StreamReader((await blob.QueryAsync(
query,
options)).Value.Content))
{
using (var parser = new CsvReader
(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
{
while (await parser.ReadAsync())
{
Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
}
}
}
}
catch (Exception ex)
{
System.Windows.Forms.MessageBox.Show("Exception: " + ex.ToString());
}
}
이 메서드 BlockBlobClient.openInputStream() 는 쿼리를 쿼리 가속 API로 보낸 다음 결과를 다른 InputStream 개체처럼 읽을 수 있는 개체로 애플리케이션으로 InputStream 다시 스트리밍합니다.
static void QueryHemingway(BlobClient blobClient) {
String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
DumpQueryCsv(blobClient, expression, true);
}
static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
try {
BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(headers)
.setFieldQuote('\0')
.setEscapeChar('\\');
BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
.setRecordSeparator('\n')
.setColumnSeparator(',')
.setHeadersPresent(true)
.setFieldQuote('\0')
.setEscapeChar('\n');
Consumer<BlobQueryError> errorConsumer = System.out::println;
Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
BlobQueryOptions queryOptions = new BlobQueryOptions(query)
.setInputSerialization(input)
.setOutputSerialization(output)
.setErrorConsumer(errorConsumer)
.setProgressConsumer(progressConsumer);
/* Open the query input stream. */
InputStream stream = blobClient.openQueryInputStream(queryOptions).getValue();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
/* Read from stream like you normally would. */
for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
System.out.println(record.toString());
}
}
} catch (Exception e) {
System.err.println("Exception: " + e.toString());
e.printStackTrace(System.err);
}
}
def query_hemingway(blob: BlobClient):
query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
dump_query_csv(blob, query, False)
def dump_query_csv(blob: BlobClient, query: str, headers: bool):
qa_reader = blob.query_blob(query, blob_format=DelimitedTextDialect(has_header=headers), on_error=report_error, encoding='utf-8')
# records() returns a generator that will stream results as received. It will not block pending all results.
csv_reader = csv.reader(qa_reader.records())
for row in csv_reader:
print("*".join(row))
이 예제에서는 쿼리 가속 API에 쿼리를 보낸 다음 결과를 다시 스트리밍합니다.
blob 도우미 함수에 queryHemingway 전달된 개체는 BlockBlobClient 형식입니다.
BlockBlobClient 개체를 가져오는 방법에 대한 자세한 내용은 빠른 시작을 참조하세요. Node.jsJavaScript v12 SDK를 사용하여 Blob 관리.
async function queryHemingway(blob)
{
const query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
await dumpQueryCsv(blob, query, false);
}
async function dumpQueryCsv(blob, query, headers)
{
var response = await blob.query(query, {
inputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: headers
},
outputTextConfiguration: {
kind: "csv",
recordSeparator: '\n',
hasHeaders: true
},
onProgress: (progress) => console.log(`Data read: ${progress.loadedBytes}`),
onError: (err) => console.error(`Error: ${err.position}:${err.name}:${err.description}`)});
return new Promise(
function (resolve, reject) {
csv.parseStream(response.readableStreamBody)
.on('data', row => console.log(row))
.on('error', error => {
console.error(error);
reject(error);
})
.on('end', rowCount => resolve());
});
}
결과의 범위를 열의 하위 집합으로 지정할 수 있습니다. 이렇게 하면 지정된 계산을 수행하는 데 필요한 열만 검색합니다. 이렇게 하면 네트워크를 통해 전송되는 데이터가 줄어들기 때문에 애플리케이션 성능이 향상되고 비용이 절감됩니다.
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$blob = "csv/csv-general/seattle-library-with-headers.csv"
Get-QueryCsv $ctx $container $blob "SELECT BibNum FROM BlobStorage" $true
static async Task QueryBibNum(BlockBlobClient blob)
{
string query = @"SELECT BibNum FROM BlobStorage";
await DumpQueryCsv(blob, query, true);
}
static void QueryBibNum(BlobClient blobClient)
{
String expression = "SELECT BibNum FROM BlobStorage";
DumpQueryCsv(blobClient, expression, true);
}
def query_bibnum(blob: BlobClient):
query = "SELECT BibNum FROM BlobStorage"
dump_query_csv(blob, query, True)
async function queryBibNum(blob)
{
const query = "SELECT BibNum FROM BlobStorage";
await dumpQueryCsv(blob, query, true);
}
다음 코드는 행 필터링 및 열 프로젝션을 동일한 쿼리로 결합합니다.
Get-QueryCsv $ctx $container $blob $query $true
Function Get-QueryCsv($ctx, $container, $blob, $query, $hasheaders) {
$tempfile = New-TemporaryFile
$informat = New-AzStorageBlobQueryConfig -AsCsv -HasHeader:$hasheaders
Get-AzStorageBlobQueryResult -Context $ctx -Container $container -Blob $blob -InputTextConfiguration $informat -OutputTextConfiguration (New-AzStorageBlobQueryConfig -AsCsv -HasHeader) -ResultFile $tempfile.FullName -QueryString $query -Force
Get-Content $tempfile.FullName
}
$container = "data"
$query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
static async Task QueryDvds(BlockBlobClient blob)
{
string query = @"SELECT BibNum, Title, Author, ISBN, Publisher, ItemType
FROM BlobStorage
WHERE ItemType IN
('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await DumpQueryCsv(blob, query, true);
}
static void QueryDvds(BlobClient blobClient)
{
String expression = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
DumpQueryCsv(blobClient, expression, true);
}
def query_dvds(blob: BlobClient):
query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType "\
"FROM BlobStorage "\
"WHERE ItemType IN "\
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')"
dump_query_csv(blob, query, True)
async function queryDvds(blob)
{
const query = "SELECT BibNum, Title, Author, ISBN, Publisher, ItemType " +
"FROM BlobStorage " +
"WHERE ItemType IN " +
" ('acdvd', 'cadvd', 'cadvdnf', 'calndvd', 'ccdvd', 'ccdvdnf', 'jcdvd', 'nadvd', 'nadvdnf', 'nalndvd', 'ncdvd', 'ncdvdnf')";
await dumpQueryCsv(blob, query, true);
}