Python을 사용하여 블록 Blob 업로드
이 문서에서는 Python용 Azure Storage 클라이언트 라이브러리를 사용하여 Blob을 업로드하는 방법을 보여 줍니다. 파일 경로, 스트림, 이진 개체 또는 텍스트 문자열에서 블록 Blob에 데이터를 업로드할 수 있습니다. 인덱스 태그를 사용하여 Blob을 업로드할 수도 있습니다.
비동기 API를 사용하여 Blob을 업로드하는 방법에 대한 자세한 내용은 비동기적으로 Blob 업로드를 참조하세요.
필수 조건
- 이 문서에서는 Python용 Azure Blob Storage 클라이언트 라이브러리로 작업하도록 프로젝트가 이미 설정되어 있다고 가정합니다. 패키지 설치,
import
문 추가 및 권한 있는 클라이언트 개체 만들기를 포함하여 프로젝트를 설정하는 방법에 대한 자세한 내용은 Azure Blob Storage 및 Python 시작을 참조하세요. - 코드에서 비동기 API를 사용하려면 비동기 프로그래밍 섹션의 요구 사항을 참조하세요.
- 권한 부여 메커니즘에는 업로드 작업을 수행할 수 있는 권한이 있어야 합니다. 자세한 내용은 다음 REST API 작업에 대한 권한 부여 지침을 참조하세요.
블록 Blob에 데이터 업로드
스트림 또는 이진 개체를 사용하여 Blob을 업로드하려면 다음 메서드를 사용합니다.
이 메서드는 자동 청크를 사용하여 데이터 원본에서 새 Blob을 만듭니다. 즉, 데이터 원본을 더 작은 청크로 분할하고 업로드할 수 있습니다. 업로드를 수행하기 위해 클라이언트 라이브러리는 Put Blob 또는 일련의 Put Block 호출과 Put Block List를 사용할 수 있습니다. 이 동작은 개체의 전체 크기와 데이터 전송 옵션을 설정한 방법에 따라 달라집니다.
로컬 파일 경로에서 블록 Blob 업로드
다음 예제에서는 개체를 사용하여 BlobClient
블록 Blob에 파일을 업로드합니다.
def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
container_client = blob_service_client.get_container_client(container=container_name)
with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
스트림에서 블록 Blob 업로드
다음 예제에서는 임의의 데이터 바이트를 만들고 개체를 BytesIO
사용하여 블록 Blob에 개체를 BlobClient
업로드합니다.
def upload_blob_stream(self, blob_service_client: BlobServiceClient, container_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
input_stream = io.BytesIO(os.urandom(15))
blob_client.upload_blob(input_stream, blob_type="BlockBlob")
이진 데이터를 블록 Blob에 업로드
다음 예제에서는 BlobClient
개체를 사용하여 이진 데이터를 블록 Blob에 업로드합니다.
def upload_blob_data(self, blob_service_client: BlobServiceClient, container_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
data = b"Sample data for blob"
# Upload the blob data - default blob type is BlockBlob
blob_client.upload_blob(data, blob_type="BlockBlob")
인덱스 태그가 있는 블록 Blob 업로드
다음 예제에서는 인덱스 태그가 있는 블록 Blob을 업로드합니다.
def upload_blob_tags(self, blob_service_client: BlobServiceClient, container_name: str):
container_client = blob_service_client.get_container_client(container=container_name)
sample_tags = {"Content": "image", "Date": "2022-01-01"}
with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, tags=sample_tags)
구성 옵션을 사용하여 블록 Blob 업로드
Blob을 업로드할 때 클라이언트 라이브러리 구성 옵션을 정의할 수 있습니다. 이러한 옵션을 조정하여 성능을 개선하고 안정성을 향상시키고 비용을 최적화할 수 있습니다. 다음 코드 예제에서는 BlobClient를 인스턴스화할 때 클라이언트 수준과 메서드 수준 모두에서 업로드에 대한 구성 옵션을 정의하는 방법을 보여 줍니다. ContainerClient 인스턴스 또는 BlobServiceClient 인스턴스에 대해서도 이러한 옵션을 구성할 수 있습니다.
업로드를 위한 데이터 전송 옵션 지정
클라이언트를 인스턴스화하여 데이터 전송 작업에 대한 성능을 최적화할 때 구성 옵션을 설정할 수 있습니다. Python에서 클라이언트 개체를 생성할 때 다음 키워드 인수를 전달할 수 있습니다.
max_block_size
- 블록 Blob을 청크로 업로드하기 위한 최대 청크 크기입니다. 기본값은 4MiB입니다.max_single_put_size
- Blob 크기가max_single_put_size
보다 작거나 같은 경우, Blob은 단일Put Blob
요청으로 업로드됩니다. Blob 크기가max_single_put_size
보다 크거나 알 수 없는 경우 Blob은Put Block
을 사용하여 청크로 업로드되고Put Block List
를 사용하여 커밋됩니다. 기본값은 64MiB입니다.
Blob Storage 전송 크기 제한에 대한 자세한 내용은 의 스케일링 대상을 참조하세요.
업로드 작업의 경우 upload_blob을 호출할 때 max_concurrency
인수도 전달할 수 있습니다. 이 인수는 Blob 크기가 64MiB를 초과할 때 사용할 최대 병렬 연결 수를 정의합니다.
다음 코드 예제에서는 BlobClient
개체를 만들 때 데이터 전송 옵션을 지정하는 방법과 해당 클라이언트 개체를 사용하여 데이터를 업로드하는 방법을 보여 줍니다. 이 샘플에 제공된 값은 권장 사항이 아닙니다. 이러한 값을 올바르게 조정하려면 앱의 특정 요구 사항을 고려해야 합니다.
def upload_blob_transfer_options(self, account_url: str, container_name: str, blob_name: str):
# Create a BlobClient object with data transfer options for upload
blob_client = BlobClient(
account_url=account_url,
container_name=container_name,
blob_name=blob_name,
credential=DefaultAzureCredential(),
max_block_size=1024*1024*4, # 4 MiB
max_single_put_size=1024*1024*8 # 8 MiB
)
with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
blob_client = blob_client.upload_blob(data=data, overwrite=True, max_concurrency=2)
데이터 전송 옵션 튜닝에 대한 자세한 내용은 Python을 사용하여 업로드 및 다운로드에 대한 성능 조정을 참조하세요.
업로드 시 Blob의 액세스 계층 설정
standard_blob_tier
키워드 인수를 upload_blob으로 전달하여 업로드 시 Blob의 액세스 계층을 설정할 수 있습니다. Azure Storage는 사용 방법에 따라 Blob 데이터를 가장 비용 효율적인 방식으로 저장할 수 있도록 다양한 액세스 계층을 제공합니다.
다음 코드 예제에서는 BLOB을 업로드할 때 액세스 계층을 설정하는 방법을 보여 줍니다.
def upload_blob_access_tier(self, blob_service_client: BlobServiceClient, container_name: str, blob_name: str):
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
#Upload blob to the cool tier
with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
blob_client = blob_client.upload_blob(data=data, overwrite=True, standard_blob_tier=StandardBlobTier.COOL)
액세스 계층 설정은 블록 Blob에서만 허용됩니다. 블록 Blob에 대한 액세스 계층을 Hot
, Cool
, Cold
또는 Archive
로 설정할 수 있습니다. 액세스 계층을 Cold
로 설정하려면 최소 클라이언트 라이브러리 버전, 12.15.0을 사용해야 합니다.
액세스 계층에 대한 자세한 내용은 액세스 계층 개요를 참조하세요.
블록을 준비하고 커밋하여 블록 Blob 업로드
개별 데이터 블록을 수동으로 스테이징하여 업로드를 블록으로 나누는 방법을 보다 효율적으로 제어할 수 있습니다. Blob을 구성하는 모든 블록이 준비되면 Blob Storage에 커밋할 수 있습니다.
다음 메서드를 사용하여 Blob의 일부로 커밋할 새 블록을 만듭니다.
다음 메서드를 사용하여 Blob을 구성하는 블록 ID 목록을 지정하여 Blob을 작성합니다.
다음 예제에서는 파일에서 데이터를 읽고 Blob의 일부로 커밋할 블록을 스테이징합니다.
def upload_blocks(self, blob_container_client: ContainerClient, local_file_path: str, block_size: int):
file_name = os.path.basename(local_file_path)
blob_client = blob_container_client.get_blob_client(file_name)
with open(file=local_file_path, mode="rb") as file_stream:
block_id_list = []
while True:
buffer = file_stream.read(block_size)
if not buffer:
break
block_id = uuid.uuid4().hex
block_id_list.append(BlobBlock(block_id=block_id))
blob_client.stage_block(block_id=block_id, data=buffer, length=len(buffer))
blob_client.commit_block_list(block_id_list)
비동기적으로 Blob 업로드
Python용 Azure Blob Storage 클라이언트 라이브러리는 비동기적으로 Blob 업로드를 지원합니다. 프로젝트 설정 요구 사항에 대한 자세한 내용은 비동기 프로그래밍을 참조 하세요.
비동기 API를 사용하여 Blob을 업로드하려면 다음 단계를 수행합니다.
다음 import 문을 추가합니다.
import asyncio from azure.identity.aio import DefaultAzureCredential from azure.storage.blob.aio import BlobServiceClient, BlobClient, ContainerClient
를 사용하여
asyncio.run
프로그램을 실행하는 코드를 추가합니다. 이 함수는 이 예제에서 전달된 코루틴main()
을 실행하고 이벤트 루프를asyncio
관리합니다. 코루틴은 비동기/await 구문을 사용하여 선언됩니다. 이 예제에서 코루틴은main()
먼저 최상위 수준을BlobServiceClient
사용하여async with
만든 다음 Blob을 업로드하는 메서드를 호출합니다. 최상위 클라이언트에서 만든 다른 클라이언트가 동일한 연결 풀을 공유하므로 최상위 클라이언트만 사용해야async with
합니다.async def main(): sample = BlobSamples() # TODO: Replace <storage-account-name> with your actual storage account name account_url = "https://<storage-account-name>.blob.core.windows.net" credential = DefaultAzureCredential() async with BlobServiceClient(account_url, credential=credential) as blob_service_client: await sample.upload_blob_file(blob_service_client, "sample-container") if __name__ == '__main__': asyncio.run(main())
Blob을 업로드하는 코드를 추가합니다. 다음 예제에서는 개체를 사용하여 로컬 파일 경로에서 Blob을
ContainerClient
업로드합니다. 이 코드는 메서드가 키워드(keyword) 선언되고async
메서드를 호출upload_blob
할 때 키워드(keyword) 사용된다는 점을 제외하고 동기 예제와await
동일합니다.async def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str): container_client = blob_service_client.get_container_client(container=container_name) with open(file=os.path.join('filepath', 'filename'), mode="rb") as data: blob_client = await container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
이 기본 설정을 사용하면 비동기/await 구문을 사용하여 이 문서의 다른 예제를 코루틴으로 구현할 수 있습니다.
리소스
Python용 Azure Blob Storage 클라이언트 라이브러리를 사용하여 Blob을 업로드하는 방법에 대한 자세한 내용은 다음 리소스를 참조하세요.
REST API 작업
Python용 Azure SDK에는 Azure REST API를 기반으로 빌드되는 라이브러리가 포함되어 있으므로 익숙한 Python 패러다임을 통해 REST API 작업과 상호 작용할 수 있습니다. Blob을 업로드하기 위한 클라이언트 라이브러리 메서드는 다음 REST API 작업을 사용합니다.