Python を使用してブロック BLOB をアップロードする
この記事では、Python 用の Azure Storage クライアント ライブラリを使用して BLOB をアップロードする方法について説明します。 ファイル パス、ストリーム、バイナリ オブジェクト、またはテキスト文字列からブロック BLOB にデータをアップロードできます。 インデックス タグを使用して BLOB をアップロードすることもできます。
非同期 API を使用した BLOB のアップロードの詳細については、「BLOB を非同期にアップロードする」に関する記事を参照してください。
前提条件
- この記事では、Python 用の Azure Blob Storage クライアント ライブラリを操作するための設定が済んだプロジェクトが、既にあることを前提としています。 パッケージのインストール、
import
ステートメントの追加、認可されたクライアント オブジェクトの作成など、プロジェクトの設定については、「Azure Blob Storage と .Python の作業を始める」をご覧ください。 - コードで非同期 API を使用するには、非同期プログラミングに関するセクションの要件を参照してください。
- 認可メカニズムには、アップロード操作を実行するためのアクセス許可が必要です。 詳細については、次の REST API 操作の認可ガイダンスを参照してください。
ブロック BLOB にデータをアップロードする
ストリームまたはバイナリ オブジェクトを使用して BLOB をアップロードするには、次のメソッドを使用します。
このメソッドは、自動チャンクを使用してデータ ソースから新しい BLOB を作成します。つまり、データ ソースを小さなチャンクに分割してアップロードできます。 アップロードを実行するために、クライアント ライブラリは Put Blob または一連の Put Block 呼び出し後の Put Block List のどちらかを使用する可能性があります。 この動作は、オブジェクトの合計サイズと、データ転送オプションの設定方法によって異なります。
ローカル ファイル パスからブロック BLOB をアップロードする
次の例では、BlobClient
オブジェクトを使用してブロック BLOB にファイルをアップロードします。
def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
container_client = blob_service_client.get_container_client(container=container_name)
with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
ストリームからブロック BLOB をアップロードする
次の例では、ランダムのデータ バイトを作成し、BlobClient
オブジェクトを使用してブロック BLOB に BytesIO
オブジェクトをアップロードします。
def upload_blob_stream(self, blob_service_client: BlobServiceClient, container_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
input_stream = io.BytesIO(os.urandom(15))
blob_client.upload_blob(input_stream, blob_type="BlockBlob")
ブロック BLOB にバイナリ データをアップロードする
次の例では、BlobClient
オブジェクトを使用してブロック BLOB にバイナリ データをアップロードします。
def upload_blob_data(self, blob_service_client: BlobServiceClient, container_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
data = b"Sample data for blob"
# Upload the blob data - default blob type is BlockBlob
blob_client.upload_blob(data, blob_type="BlockBlob")
インデックス タグ付きのブロック BLOB をアップロードする
次の例では、インデックス タグを含むブロック BLOB をアップロードします。
def upload_blob_tags(self, blob_service_client: BlobServiceClient, container_name: str):
container_client = blob_service_client.get_container_client(container=container_name)
sample_tags = {"Content": "image", "Date": "2022-01-01"}
with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, tags=sample_tags)
構成オプションを使用したブロック BLOB のアップロード
BLOB をアップロードするときに、クライアント ライブラリの構成オプションを定義できます。 これらのオプションは、パフォーマンスを向上させたり、信頼性を高めたり、コストを最適化したりするために調整できます。 次のコード例では、BlobClient をインスタンス化するときに、メソッド レベルとクライアント レベルの両方でダウンロードの構成オプションを定義する方法を示します。 これらのオプションは、ContainerClient インスタンスまたは BlobServiceClient インスタンス用に構成することもできます。
アップロード時のデータ転送オプションの指定
クライアントをインスタンス化するときに構成オプションを設定して、データ転送操作のパフォーマンスを最適化できます。 Python でクライアント オブジェクトを構築するときに、次のキーワード引数を渡すことができます。
max_block_size
- ブロック BLOB をチャンク単位でアップロードするための最大チャンク サイズ。 既定値は 4 MiB です。max_single_put_size
- BLOB サイズがmax_single_put_size
以下 の場合、BLOB は 1 つのPut Blob
要求でアップロードされます。 BLOB サイズがmax_single_put_size
より大きい、または不明な場合、BLOB はPut Block
を使用してチャンク単位でアップロードされ、Put Block List
を使用してコミットされます。 既定値は 64 MiB です。
Blob Storage での転送サイズの制限の詳細については、「Blob Storage のスケール ターゲット」を参照してください。
アップロード操作の場合は、upload_blob を呼び出すときに max_concurrency
引数を渡すこともできます。 この引数は、BLOB サイズが 64 MiB を超える場合に使用する並列接続の最大数を定義します。
次のコード例は、BlobClient
オブジェクトの作成時にデータ転送オプションを指定する方法と、そのクライアント オブジェクトを使用してデータをアップロードする方法を示しています。 このサンプルで提供される値は、推奨を意図したものではありません。 これらの値を適切にチューニングするには、アプリの特定のニーズを考慮する必要があります。
def upload_blob_transfer_options(self, account_url: str, container_name: str, blob_name: str):
# Create a BlobClient object with data transfer options for upload
blob_client = BlobClient(
account_url=account_url,
container_name=container_name,
blob_name=blob_name,
credential=DefaultAzureCredential(),
max_block_size=1024*1024*4, # 4 MiB
max_single_put_size=1024*1024*8 # 8 MiB
)
with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
blob_client = blob_client.upload_blob(data=data, overwrite=True, max_concurrency=2)
データ転送オプションのチューニングの詳細については、Python を使用したアップロードとダウンロードのパフォーマンス チューニングに関するページを参照してください。
アップロード時に BLOB のアクセス層を設定する
standard_blob_tier
キーワード引数を upload_blob に渡すことで、アップロード時の BLOB のアクセス層を設定できます。 Azure Storage には、使用方法に応じて最もコスト効率の高い方法で BLOB データを保存できるように複数のアクセス層が用意されています。
次のコード例は、BLOB をアップロードするときにアクセス層を設定する方法を示しています。
def upload_blob_access_tier(self, blob_service_client: BlobServiceClient, container_name: str, blob_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
#Upload blob to the cool tier
with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
blob_client = blob_client.upload_blob(data=data, overwrite=True, standard_blob_tier=StandardBlobTier.COOL)
アクセス層の設定はブロック BLOB でのみ許可されています。 ブロック BLOB のアクセス層は、Hot
、Cool
、Cold
、または Archive
に設定できます。 アクセス層を Cold
に設定するには、最小クライアント ライブラリ バージョン 12.15.0 を使用する必要があります。
アクセス層の詳細については、「アクセス層の概要」を参照してください。
ブロックのステージングとコミットによってブロック BLOB をアップロードする
個々のデータ ブロックを手動でステージングすることによって、アップロードをブロックに分割する方法をより細かく制御できます。 BLOB を構成するすべてのブロックがステージングされたら、それらを Blob Storage にコミットできます。
BLOB の一部としてコミットされる新しいブロックを作成するには、次のメソッドを使用します。
BLOB を構成するブロック ID の一覧を指定して BLOB を書き込むには、次のメソッドを使用します。
次の例では、BLOB の一部としてコミットされるファイルとステージ ブロックからデータを読み取ります。
def upload_blocks(self, blob_container_client: ContainerClient, local_file_path: str, block_size: int):
file_name = os.path.basename(local_file_path)
blob_client = blob_container_client.get_blob_client(file_name)
with open(file=local_file_path, mode="rb") as file_stream:
block_id_list = []
while True:
buffer = file_stream.read(block_size)
if not buffer:
break
block_id = uuid.uuid4().hex
block_id_list.append(BlobBlock(block_id=block_id))
blob_client.stage_block(block_id=block_id, data=buffer, length=len(buffer))
blob_client.commit_block_list(block_id_list)
BLOB を非同期にアップロードする
Python 用 Azure Blob Storage クライアント ライブラリでは、BLOB の非同期アップロードがサポートされています。 プロジェクトのセットアップ要件の詳細については、非同期プログラミングに関する記事を参照してください。
非同期 API を使用して BLOB をアップロードするには、次の手順に従います。
次の import ステートメントを追加します。
import asyncio from azure.identity.aio import DefaultAzureCredential from azure.storage.blob.aio import BlobServiceClient, BlobClient, ContainerClient
asyncio.run
を使ってプログラムを実行するコードを追加します。 この関数では、この例で渡されたコルーチンmain()
を実行し、asyncio
イベント ループを管理します。 コルーチンは、async/await 構文で宣言されます。 この例のmain()
コルーチンでは、最初にasync with
を使用して最上位のBlobServiceClient
を作成し、次に BLOB をアップロードするメソッドを呼び出します。 最上位のクライアントのみでasync with
を使用する必要があることに注意してください。ここから作成された他のクライアントでは同じ接続プールを共有するためです。async def main(): sample = BlobSamples() # TODO: Replace <storage-account-name> with your actual storage account name account_url = "https://<storage-account-name>.blob.core.windows.net" credential = DefaultAzureCredential() async with BlobServiceClient(account_url, credential=credential) as blob_service_client: await sample.upload_blob_file(blob_service_client, "sample-container") if __name__ == '__main__': asyncio.run(main())
BLOB をアップロードするコードを追加します。 次の例では、
ContainerClient
オブジェクトを使用してローカル ファイル パスから BLOB をアップロードします。 このコードは同期の例と同じですが、async
キーワードを使用してメソッドが宣言され、upload_blob
メソッドを呼び出すときにawait
キーワードが使用される点が異なります。async def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str): container_client = blob_service_client.get_container_client(container=container_name) with open(file=os.path.join('filepath', 'filename'), mode="rb") as data: blob_client = await container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
この基本的なセットアップが実施されている場合、async/await 構文を使用して、この記事の他の例をコルーチンとして実装できます。
リソース
Python 用 Azure Blob Storage クライアント ライブラリを使用した BLOB のアップロードの詳細については、次のリソースを参照してください。
REST API の操作
Azure SDK for Python には Azure REST API に基づき構築されたライブラリが含まれるため、使い慣れた Python パラダイムを通じて REST API 操作を実施できます。 BLOB をアップロードするためのクライアント ライブラリ メソッドでは、次の REST API 操作を使用します。