이 Notebook은 이미지 데이터 및 주석을 Custom Vision Service 프로젝트의 작업 영역에서 스토리지 Blob의 자체 COCO 파일로 내보내 이미지 분석 모델 사용자 지정을 통해 학습할 준비를 합니다. 사용자 지정 Python 스크립트를 사용하여 이 섹션의 코드를 실행하거나 호환되는 플랫폼에서 Notebook을 다운로드하고 실행할 수 있습니다.
팁
export_cvs_data_to_blob_storage.ipynb의 내용. GitHub에서 열기.
Python 샘플 패키지 설치
다음 명령을 실행하여 필요한 Python 샘플 패키지를 설치합니다.
pip install cognitive-service-vision-model-customization-python-samples
인증
다음으로, Custom Vision 프로젝트 및 Blob Storage 컨테이너의 자격 증명을 제공합니다.
올바른 매개 변수 값을 입력해야 합니다. 다음 정보가 필요합니다.
- 새 사용자 지정 모델 프로젝트에 사용할 Azure Storage 계정의 이름
- 해당 스토리지 계정의 키
- 해당 스토리지 계정에서 사용하려는 컨테이너의 이름
- Custom Vision 학습 키
- Custom Vision 엔드포인트 URL
- Custom Vision 프로젝트의 프로젝트 ID
Azure Storage 자격 증명은 Azure Portal의 해당 리소스 페이지에서 찾을 수 있습니다. Custom Vision 자격 증명은 Custom Vision 웹 포털의 Custom Vision 프로젝트 설정 페이지에서 찾을 수 있습니다.
azure_storage_account_name = ''
azure_storage_account_key = ''
azure_storage_container_name = ''
custom_vision_training_key = ''
custom_vision_endpoint = ''
custom_vision_project_id = ''
마이그레이션 실행
마이그레이션 코드를 실행하면 Custom Vision 학습 이미지가 지정된 Azure Blob Storage 컨테이너의 {project_name}_{project_id}/images
폴더에 저장되고 COCO 파일이 동일한 컨테이너의 {project_name}_{project_id}/train.json
에 저장됩니다. 네거티브 태그가 지정된 이미지를 포함하여 태그가 지정된 이미지와 태그가 지정되지 않은 이미지 모두를 내보냅니다.
Important
현재 이미지 분석 모델 사용자 지정은 다중 레이블 분류 학습을 지원하지 않지만 Custom Vision 다중 레이블 분류 프로젝트에서 데이터를 계속 내보낼 수 있습니다.
from cognitive_service_vision_model_customization_python_samples import export_data
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.WARNING)
n_process = 8
export_data(azure_storage_account_name, azure_storage_account_key, azure_storage_container_name, custom_vision_endpoint, custom_vision_training_key, custom_vision_project_id, n_process)
라이브러리 설치
이 스크립트에는 특정 Python 라이브러리가 필요합니다. 다음 명령을 사용하여 프로젝트 디렉터리에 설치합니다.
pip install azure-storage-blob azure-cognitiveservices-vision-customvision cffi
마이그레이션 스크립트 준비
새 Python 파일(예: export-cvs-data-to-coco.py)을 만듭니다. 그런 다음, 텍스트 편집기에서 열고 다음 내용을 붙여 넣습니다.
from typing import List, Union
from azure.cognitiveservices.vision.customvision.training import CustomVisionTrainingClient
from azure.cognitiveservices.vision.customvision.training.models import Image, ImageTag, ImageRegion, Project
from msrest.authentication import ApiKeyCredentials
import argparse
import time
import json
import pathlib
import logging
from azure.storage.blob import ContainerClient, BlobClient
import multiprocessing
N_PROCESS = 8
def get_file_name(sub_folder, image_id):
return f'{sub_folder}/images/{image_id}'
def blob_copy(params):
container_client, sub_folder, image = params
blob_client: BlobClient = container_client.get_blob_client(get_file_name(sub_folder, image.id))
blob_client.start_copy_from_url(image.original_image_uri)
return blob_client
def wait_for_completion(blobs, time_out=5):
pendings = blobs
time_break = 0.5
while pendings and time_out > 0:
pendings = [b for b in pendings if b.get_blob_properties().copy.status == 'pending']
if pendings:
logging.info(f'{len(pendings)} pending copies. wait for {time_break} seconds.')
time.sleep(time_break)
time_out -= time_break
def copy_images_with_retry(pool, container_client, sub_folder, images: List, batch_id, n_retries=5):
retry_limit = n_retries
urls = []
while images and n_retries > 0:
params = [(container_client, sub_folder, image) for image in images]
img_and_blobs = zip(images, pool.map(blob_copy, params))
logging.info(f'Batch {batch_id}: Copied {len(images)} images.')
urls = urls or [b.url for _, b in img_and_blobs]
wait_for_completion([b for _, b in img_and_blobs])
images = [image for image, b in img_and_blobs if b.get_blob_properties().copy.status in ['failed', 'aborted']]
n_retries -= 1
if images:
time.sleep(0.5 * (retry_limit - n_retries))
if images:
raise RuntimeError(f'Copy failed for some images in batch {batch_id}')
return urls
class CocoOperator:
def __init__(self):
self._images = []
self._annotations = []
self._categories = []
self._category_name_to_id = {}
@property
def num_imges(self):
return len(self._images)
@property
def num_categories(self):
return len(self._categories)
@property
def num_annotations(self):
return len(self._annotations)
def add_image(self, width, height, coco_url, file_name):
self._images.append(
{
'id': len(self._images) + 1,
'width': width,
'height': height,
'coco_url': coco_url,
'file_name': file_name,
})
def add_annotation(self, image_id, category_id_or_name: Union[int, str], bbox: List[float] = None):
self._annotations.append({
'id': len(self._annotations) + 1,
'image_id': image_id,
'category_id': category_id_or_name if isinstance(category_id_or_name, int) else self._category_name_to_id[category_id_or_name]})
if bbox:
self._annotations[-1]['bbox'] = bbox
def add_category(self, name):
self._categories.append({
'id': len(self._categories) + 1,
'name': name
})
self._category_name_to_id[name] = len(self._categories)
def to_json(self) -> str:
coco_dict = {
'images': self._images,
'categories': self._categories,
'annotations': self._annotations,
}
return json.dumps(coco_dict, ensure_ascii=False, indent=2)
def log_project_info(training_client: CustomVisionTrainingClient, project_id):
project: Project = training_client.get_project(project_id)
proj_settings = project.settings
project.settings = None
logging.info(f'Project info dict: {project.__dict__}')
logging.info(f'Project setting dict: {proj_settings.__dict__}')
logging.info(f'Project info: n tags: {len(training_client.get_tags(project_id))},'
f' n images: {training_client.get_image_count(project_id)} (tagged: {training_client.get_tagged_image_count(project_id)},'
f' untagged: {training_client.get_untagged_image_count(project_id)})')
def export_data(azure_storage_account_name, azure_storage_key, azure_storage_container_name, custom_vision_endpoint, custom_vision_training_key, custom_vision_project_id, n_process):
azure_storage_account_url = f"https://{azure_storage_account_name}.blob.core.windows.net"
container_client = ContainerClient(azure_storage_account_url, azure_storage_container_name, credential=azure_storage_key)
credentials = ApiKeyCredentials(in_headers={"Training-key": custom_vision_training_key})
trainer = CustomVisionTrainingClient(custom_vision_endpoint, credentials)
coco_operator = CocoOperator()
for tag in trainer.get_tags(custom_vision_project_id):
coco_operator.add_category(tag.name)
skip = 0
batch_id = 0
project_name = trainer.get_project(custom_vision_project_id).name
log_project_info(trainer, custom_vision_project_id)
sub_folder = f'{project_name}_{custom_vision_project_id}'
with multiprocessing.Pool(n_process) as pool:
while True:
images: List[Image] = trainer.get_images(project_id=custom_vision_project_id, skip=skip)
if not images:
break
urls = copy_images_with_retry(pool, container_client, sub_folder, images, batch_id)
for i, image in enumerate(images):
coco_operator.add_image(image.width, image.height, urls[i], get_file_name(sub_folder, image.id))
image_tags: List[ImageTag] = image.tags
image_regions: List[ImageRegion] = image.regions
if image_regions:
for img_region in image_regions:
coco_operator.add_annotation(coco_operator.num_imges, img_region.tag_name, [img_region.left, img_region.top, img_region.width, img_region.height])
elif image_tags:
for img_tag in image_tags:
coco_operator.add_annotation(coco_operator.num_imges, img_tag.tag_name)
skip += len(images)
batch_id += 1
coco_json_file_name = 'train.json'
local_json = pathlib.Path(coco_json_file_name)
local_json.write_text(coco_operator.to_json(), encoding='utf-8')
coco_json_blob_client: BlobClient = container_client.get_blob_client(f'{sub_folder}/{coco_json_file_name}')
if coco_json_blob_client.exists():
logging.warning(f'coco json file exists in blob. Skipped uploading. If existing one is outdated, please manually upload your new coco json from ./train.json to {coco_json_blob_client.url}')
else:
coco_json_blob_client.upload_blob(local_json.read_bytes())
logging.info(f'coco file train.json uploaded to {coco_json_blob_client.url}.')
def parse_args():
parser = argparse.ArgumentParser('Export Custom Vision workspace data to blob storage.')
parser.add_argument('--custom_vision_project_id', '-p', type=str, required=True, help='Custom Vision Project Id.')
parser.add_argument('--custom_vision_training_key', '-k', type=str, required=True, help='Custom Vision training key.')
parser.add_argument('--custom_vision_endpoint', '-e', type=str, required=True, help='Custom Vision endpoint.')
parser.add_argument('--azure_storage_account_name', '-a', type=str, required=True, help='Azure storage account name.')
parser.add_argument('--azure_storage_account_key', '-t', type=str, required=True, help='Azure storage account key.')
parser.add_argument('--azure_storage_container_name', '-c', type=str, required=True, help='Azure storage container name.')
parser.add_argument('--n_process', '-n', type=int, required=False, default=8, help='Number of processes used in exporting data.')
return parser.parse_args()
def main():
args = parse_args()
export_data(args.azure_storage_account_name, args.azure_storage_account_key, args.azure_storage_container_name,
args.custom_vision_endpoint, args.custom_vision_training_key, args.custom_vision_project_id, args.n_process)
if __name__ == '__main__':
main()
스크립트 실행
python
명령을 사용하여 스크립트를 실행합니다.
python export-cvs-data-to-coco.py -p <project ID> -k <training key> -e <endpoint url> -a <storage account> -t <storage key> -c <container name>
올바른 매개 변수 값을 입력해야 합니다. 다음 정보가 필요합니다.
- Custom Vision 프로젝트의 프로젝트 ID
- Custom Vision 학습 키
- Custom Vision 엔드포인트 URL
- 새 사용자 지정 모델 프로젝트에 사용할 Azure Storage 계정의 이름
- 해당 스토리지 계정의 키
- 해당 스토리지 계정에서 사용하려는 컨테이너의 이름