Python を使用してブロック BLOB をアップロードする

この記事では、Python 用の Azure Storage クライアント ライブラリを使用して BLOB をアップロードする方法について説明します。 ファイル パス、ストリーム、バイナリ オブジェクト、またはテキスト文字列からブロック BLOB にデータをアップロードできます。 インデックス タグを使用して BLOB をアップロードすることもできます。

非同期 API を使用した BLOB のアップロードの詳細については、「BLOB を非同期にアップロードする」に関する記事を参照してください。

前提条件

  • この記事では、Python 用の Azure Blob Storage クライアント ライブラリを操作するための設定が済んだプロジェクトが、既にあることを前提としています。 パッケージのインストール、import ステートメントの追加、認可されたクライアント オブジェクトの作成など、プロジェクトの設定については、「Azure Blob Storage と .Python の作業を始める」をご覧ください。
  • コードで非同期 API を使用するには、非同期プログラミングに関するセクションの要件を参照してください。
  • 認可メカニズムには、アップロード操作を実行するためのアクセス許可が必要です。 詳細については、次の REST API 操作の認可ガイダンスを参照してください。

ブロック BLOB にデータをアップロードする

ストリームまたはバイナリ オブジェクトを使用して BLOB をアップロードするには、次のメソッドを使用します。

このメソッドは、自動チャンクを使用してデータ ソースから新しい BLOB を作成します。つまり、データ ソースを小さなチャンクに分割してアップロードできます。 アップロードを実行するために、クライアント ライブラリは Put Blob または一連の Put Block 呼び出し後の Put Block List のどちらかを使用する可能性があります。 この動作は、オブジェクトの合計サイズと、データ転送オプションの設定方法によって異なります。

ローカル ファイル パスからブロック BLOB をアップロードする

次の例では、BlobClient オブジェクトを使用してブロック BLOB にファイルをアップロードします。

def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
    container_client = blob_service_client.get_container_client(container=container_name)
    with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
        blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)

ストリームからブロック BLOB をアップロードする

次の例では、ランダムのデータ バイトを作成し、BlobClient オブジェクトを使用してブロック BLOB に BytesIO オブジェクトをアップロードします。

def upload_blob_stream(self, blob_service_client: BlobServiceClient, container_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    input_stream = io.BytesIO(os.urandom(15))
    blob_client.upload_blob(input_stream, blob_type="BlockBlob")

ブロック BLOB にバイナリ データをアップロードする

次の例では、BlobClient オブジェクトを使用してブロック BLOB にバイナリ データをアップロードします。

def upload_blob_data(self, blob_service_client: BlobServiceClient, container_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    data = b"Sample data for blob"

    # Upload the blob data - default blob type is BlockBlob
    blob_client.upload_blob(data, blob_type="BlockBlob")

インデックス タグ付きのブロック BLOB をアップロードする

次の例では、インデックス タグを含むブロック BLOB をアップロードします。

def upload_blob_tags(self, blob_service_client: BlobServiceClient, container_name: str):
    container_client = blob_service_client.get_container_client(container=container_name)
    sample_tags = {"Content": "image", "Date": "2022-01-01"}
    with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
        blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, tags=sample_tags)

構成オプションを使用したブロック BLOB のアップロード

BLOB をアップロードするときに、クライアント ライブラリの構成オプションを定義できます。 これらのオプションは、パフォーマンスを向上させたり、信頼性を高めたり、コストを最適化したりするために調整できます。 次のコード例では、BlobClient をインスタンス化するときに、メソッド レベルとクライアント レベルの両方でダウンロードの構成オプションを定義する方法を示します。 これらのオプションは、ContainerClient インスタンスまたは BlobServiceClient インスタンス用に構成することもできます。

アップロード時のデータ転送オプションの指定

クライアントをインスタンス化するときに構成オプションを設定して、データ転送操作のパフォーマンスを最適化できます。 Python でクライアント オブジェクトを構築するときに、次のキーワード引数を渡すことができます。

  • max_block_size - ブロック BLOB をチャンク単位でアップロードするための最大チャンク サイズ。 既定値は 4 MiB です。
  • max_single_put_size - BLOB サイズが max_single_put_size 以下 の場合、BLOB は 1 つの Put Blob 要求でアップロードされます。 BLOB サイズが max_single_put_size より大きい、または不明な場合、BLOB は Put Block を使用してチャンク単位でアップロードされ、Put Block List を使用してコミットされます。 既定値は 64 MiB です。

Blob Storage での転送サイズの制限の詳細については、「Blob Storage のスケール ターゲット」を参照してください。

アップロード操作の場合は、upload_blob を呼び出すときに max_concurrency 引数を渡すこともできます。 この引数は、BLOB サイズが 64 MiB を超える場合に使用する並列接続の最大数を定義します。

次のコード例は、BlobClient オブジェクトの作成時にデータ転送オプションを指定する方法と、そのクライアント オブジェクトを使用してデータをアップロードする方法を示しています。 このサンプルで提供される値は、推奨を意図したものではありません。 これらの値を適切にチューニングするには、アプリの特定のニーズを考慮する必要があります。

def upload_blob_transfer_options(self, account_url: str, container_name: str, blob_name: str):
    # Create a BlobClient object with data transfer options for upload
    blob_client = BlobClient(
        account_url=account_url, 
        container_name=container_name, 
        blob_name=blob_name,
        credential=DefaultAzureCredential(),
        max_block_size=1024*1024*4, # 4 MiB
        max_single_put_size=1024*1024*8 # 8 MiB
    )
    
    with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
        blob_client = blob_client.upload_blob(data=data, overwrite=True, max_concurrency=2)

データ転送オプションのチューニングの詳細については、Python を使用したアップロードとダウンロードのパフォーマンス チューニングに関するページを参照してください。

アップロード時に BLOB のアクセス層を設定する

standard_blob_tier キーワード引数を upload_blob に渡すことで、アップロード時の BLOB のアクセス層を設定できます。 Azure Storage には、使用方法に応じて最もコスト効率の高い方法で BLOB データを保存できるように複数のアクセス層が用意されています。

次のコード例は、BLOB をアップロードするときにアクセス層を設定する方法を示しています。

def upload_blob_access_tier(self, blob_service_client: BlobServiceClient, container_name: str, blob_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    
    #Upload blob to the cool tier
    with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
        blob_client = blob_client.upload_blob(data=data, overwrite=True, standard_blob_tier=StandardBlobTier.COOL)

アクセス層の設定はブロック BLOB でのみ許可されています。 ブロック BLOB のアクセス層は、HotCoolCold、または Archive に設定できます。 アクセス層を Cold に設定するには、最小クライアント ライブラリ バージョン 12.15.0 を使用する必要があります。

アクセス層の詳細については、「アクセス層の概要」を参照してください。

ブロックのステージングとコミットによってブロック BLOB をアップロードする

個々のデータ ブロックを手動でステージングすることによって、アップロードをブロックに分割する方法をより細かく制御できます。 BLOB を構成するすべてのブロックがステージングされたら、それらを Blob Storage にコミットできます。

BLOB の一部としてコミットされる新しいブロックを作成するには、次のメソッドを使用します。

BLOB を構成するブロック ID の一覧を指定して BLOB を書き込むには、次のメソッドを使用します。

次の例では、BLOB の一部としてコミットされるファイルとステージ ブロックからデータを読み取ります。

def upload_blocks(self, blob_container_client: ContainerClient, local_file_path: str, block_size: int):
    file_name = os.path.basename(local_file_path)
    blob_client = blob_container_client.get_blob_client(file_name)

    with open(file=local_file_path, mode="rb") as file_stream:
        block_id_list = []

        while True:
            buffer = file_stream.read(block_size)
            if not buffer:
                break

            block_id = uuid.uuid4().hex
            block_id_list.append(BlobBlock(block_id=block_id))

            blob_client.stage_block(block_id=block_id, data=buffer, length=len(buffer))

        blob_client.commit_block_list(block_id_list)

BLOB を非同期にアップロードする

Python 用 Azure Blob Storage クライアント ライブラリでは、BLOB の非同期アップロードがサポートされています。 プロジェクトのセットアップ要件の詳細については、非同期プログラミングに関する記事を参照してください。

非同期 API を使用して BLOB をアップロードするには、次の手順に従います。

  1. 次の import ステートメントを追加します。

    import asyncio
    
    from azure.identity.aio import DefaultAzureCredential
    from azure.storage.blob.aio import BlobServiceClient, BlobClient, ContainerClient
    
  2. asyncio.run を使ってプログラムを実行するコードを追加します。 この関数では、この例で渡されたコルーチン main() を実行し、asyncio イベント ループを管理します。 コルーチンは、async/await 構文で宣言されます。 この例の main() コルーチンでは、最初に async with を使用して最上位の BlobServiceClient を作成し、次に BLOB をアップロードするメソッドを呼び出します。 最上位のクライアントのみで async with を使用する必要があることに注意してください。ここから作成された他のクライアントでは同じ接続プールを共有するためです。

    async def main():
        sample = BlobSamples()
    
        # TODO: Replace <storage-account-name> with your actual storage account name
        account_url = "https://<storage-account-name>.blob.core.windows.net"
        credential = DefaultAzureCredential()
    
        async with BlobServiceClient(account_url, credential=credential) as blob_service_client:
            await sample.upload_blob_file(blob_service_client, "sample-container")
    
    if __name__ == '__main__':
        asyncio.run(main())
    
  3. BLOB をアップロードするコードを追加します。 次の例では、ContainerClient オブジェクトを使用してローカル ファイル パスから BLOB をアップロードします。 このコードは同期の例と同じですが、async キーワードを使用してメソッドが宣言され、upload_blob メソッドを呼び出すときに await キーワードが使用される点が異なります。

    async def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
        container_client = blob_service_client.get_container_client(container=container_name)
        with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
            blob_client = await container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
    

この基本的なセットアップが実施されている場合、async/await 構文を使用して、この記事の他の例をコルーチンとして実装できます。

リソース

Python 用 Azure Blob Storage クライアント ライブラリを使用した BLOB のアップロードの詳細については、次のリソースを参照してください。

REST API の操作

Azure SDK for Python には Azure REST API に基づき構築されたライブラリが含まれるため、使い慣れた Python パラダイムを通じて REST API 操作を実施できます。 BLOB をアップロードするためのクライアント ライブラリ メソッドでは、次の REST API 操作を使用します。

コード サンプル

  • この記事の同期または非同期のサンプル コードを表示する (GitHub)

クライアント ライブラリのリソース

関連項目