Hi CG-8750,
Pass only a valid datastore URI as the input when you submit the pipeline job from the pipeline code. Inside an individual component you can still use Blob access to read and write files, but the input supplied at submission time must be a datastore URI.
The pipeline job only accepts a datastore URI, not a Blob URI, as shown below:
azureml://subscriptions/<subscription-id>/resourcegroups/<resource-group-name>/workspaces/<workspace-name>/datastores/workspaceblobstore/paths/LocalUpload/<folder>/<dataset-file>
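As a rough illustration of the URI format above, the string can be assembled from its parts. This is only a sketch: `build_datastore_uri` and its parameter names are hypothetical helpers, not part of the Azure ML SDK.

```python
def build_datastore_uri(subscription_id, resource_group, workspace,
                        datastore, relative_path):
    """Assemble an azureml:// datastore URI from its parts.

    Hypothetical helper for illustration; it only formats a string and
    does not check that the datastore or the file actually exists.
    """
    return (
        f"azureml://subscriptions/{subscription_id}"
        f"/resourcegroups/{resource_group}"
        f"/workspaces/{workspace}"
        f"/datastores/{datastore}"
        # Drop any leading slash so the path segment is not doubled.
        f"/paths/{relative_path.lstrip('/')}"
    )


uri = build_datastore_uri(
    "<subscription-id>",
    "<resource-group-name>",
    "<workspace-name>",
    "workspaceblobstore",
    "LocalUpload/<folder>/<dataset-file>",
)
print(uri)
```

The resulting string is what would go into the `path` of the pipeline `Input`.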
An example pipeline structure is shown below:
│   environment.yml
│   pipeline.yml
│   run_pipeline.py
│
└───components
    ├───data_reader
    │       component_spec.yaml
    │       data_reader.py
    │
    ├───data_writer
    │       component_spec.yaml
    │       data_writer.py
    │
    └───openai_processor
            component_spec.yaml
            openai_processor.py
An example of run_pipeline.py is shown below:
from azure.ai.ml import MLClient, Input, Output, command, dsl
from azure.ai.ml.entities import Environment, AmlCompute
from azure.identity import DefaultAzureCredential

# Initialize ML Client
ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription_id>",
    resource_group_name="<resource_group_name>",
    workspace_name="<workspace_name>",
)

# Register the environment used by all components
env = Environment(
    name="genai-test-env",
    description="Environment for GenAI processing",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu22.04",
    conda_file="environment.yml",
)
ml_client.environments.create_or_update(env)

# Ensure compute cluster exists
compute_name = "GenAI-pipeline-compute"
try:
    compute = ml_client.compute.get(compute_name)
    print(f"Using existing compute: {compute_name}")
except Exception:
    print(f"Creating new compute: {compute_name}")
    compute_config = AmlCompute(
        name=compute_name,
        size="Standard_DS3_v2",
        min_instances=0,
        max_instances=4,
    )
    ml_client.compute.begin_create_or_update(compute_config).result()

# Define components
data_reader_component = command(
    name="data_reader",
    display_name="Read Excel Data",
    description="Reads prompts from Excel file in Blob Storage",
    inputs={"input_path": Input(type="uri_file")},
    outputs={"output_data": Output(type="uri_file")},
    code="./components/data_reader",
    command="python data_reader.py --input_path ${{inputs.input_path}} --output_path ${{outputs.output_data}}",
    environment="genai-test-env@latest",
)

openai_processor_component = command(
    name="openai_processor",
    display_name="Process with OpenAI",
    description="Generates responses using Azure OpenAI",
    inputs={"input_data": Input(type="uri_file")},
    outputs={"output_data": Output(type="uri_file")},
    code="./components/openai_processor",
    command="python openai_processor.py --input_data ${{inputs.input_data}} --output_data ${{outputs.output_data}}",
    environment="genai-test-env@latest",
)

data_writer_component = command(
    name="data_writer",
    display_name="Write Excel Output",
    description="Writes responses to Excel file",
    inputs={"input_data": Input(type="uri_file")},
    outputs={"output_path": Output(type="uri_file")},
    code="./components/data_writer",
    command="python data_writer.py --input_data ${{inputs.input_data}} --output_path ${{outputs.output_path}}",
    environment="genai-test-env@latest",
)

# Build pipeline
@dsl.pipeline(
    name="GenAI-Prompt-Pipeline",
    description="End-to-end prompt processing pipeline",
    default_compute_target=compute_name,
)
def genai_pipeline():
    # The pipeline input must be a datastore URI, not a Blob URI
    reader = data_reader_component(
        input_path=Input(
            type="uri_file",
            path="azureml://subscriptions/<subscription-id>/resourcegroups/<resource-group-name>/workspaces/<workspace-name>/datastores/workspaceblobstore/paths/LocalUpload/<folder>/<dataset-file>",
        )
    )
    processor = openai_processor_component(input_data=reader.outputs.output_data)
    writer = data_writer_component(input_data=processor.outputs.output_data)
    # Expose the writer's output as the pipeline output
    return {"output_path": writer.outputs.output_path}

# Submit pipeline
pipeline_job = ml_client.jobs.create_or_update(
    genai_pipeline(),
    experiment_name="genai-prompt-processing",
)
print(f"Pipeline submitted successfully! Job name: {pipeline_job.name}")
# studio_url is the link to this job in Azure ML studio
print(f"Monitor progress at: {pipeline_job.studio_url}")
As shown above, only use a valid datastore URI while submitting a pipeline job.
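To catch a Blob URL before the job is submitted, the path could be checked with a small guard like the one below. This is a sketch under assumptions: `is_datastore_uri` and `require_datastore_uri` are hypothetical helpers, and the pattern only checks the shape of the string (the long form shown above, plus the short `azureml://datastores/<name>/paths/<path>` form), not whether the datastore or file exists.

```python
import re

# Matches the long form used in this answer and the short
# azureml://datastores/<name>/paths/<path> form.
_DATASTORE_URI = re.compile(
    r"^azureml://"
    r"(subscriptions/[^/]+/resourcegroups/[^/]+/workspaces/[^/]+/)?"
    r"datastores/[^/]+/paths/.+$"
)


def is_datastore_uri(path: str) -> bool:
    """Return True if path has the shape of an azureml:// datastore URI."""
    return bool(_DATASTORE_URI.match(path))


def require_datastore_uri(path: str) -> str:
    """Return path unchanged, or raise ValueError if it is not a datastore URI."""
    if not is_datastore_uri(path):
        raise ValueError(
            f"Expected an azureml:// datastore URI for the pipeline input, got: {path}"
        )
    return path
```

Wrapping the `path` argument of the pipeline `Input` in `require_datastore_uri(...)` would then fail fast on a Blob URL instead of failing after submission.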
Below is an example of pipeline.yml:
$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline
display_name: GenAI-Prompt-Processing
description: Pipeline for processing prompts with Azure OpenAI using blob storage
inputs:
  input_file:
    type: uri_file
    path: <datastore-uri>
settings:
  default_compute: GenAI-pipeline-compute
  default_datastore: workspaceblobstore
jobs:
  read_data:
    component: azureml:data_reader_component@latest
    inputs:
      input_path: ${{parent.inputs.input_file}}
    compute: GenAI-pipeline-compute
  process_prompts:
    component: azureml:openai_processor_component@latest
    inputs:
      input_data: ${{parent.jobs.read_data.outputs.output_data}}
    compute: GenAI-pipeline-compute
  write_results:
    component: azureml:data_writer_component@latest
    inputs:
      input_data: ${{parent.jobs.process_prompts.outputs.output_data}}
    compute: GenAI-pipeline-compute
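The YAML above refers to components by name (`azureml:data_reader_component@latest`), so they need to be registered in the workspace first. One possible way to do that with the Python SDK is sketched below. `register_components`, its `loader` parameter, and the spec paths are assumptions made for illustration; `load_component` and `ml_client.components.create_or_update` are the SDK calls being wrapped.

```python
from pathlib import Path

# Assumed locations of the component specs; adjust to the actual folder layout.
COMPONENT_SPECS = [
    "components/data_reader/component_spec.yaml",
    "components/openai_processor/component_spec.yaml",
    "components/data_writer/component_spec.yaml",
]


def register_components(ml_client, spec_paths, loader=None):
    """Load each component spec and register it in the workspace.

    ml_client is an authenticated MLClient. loader defaults to
    azure.ai.ml.load_component and is a parameter only so the function
    can be exercised without an Azure connection.
    """
    if loader is None:
        # Imported lazily so the module can be imported without the SDK.
        from azure.ai.ml import load_component
        loader = load_component

    registered = []
    for spec in spec_paths:
        component = loader(source=str(Path(spec)))
        registered.append(ml_client.components.create_or_update(component))
    return registered
```

Calling `register_components(ml_client, COMPONENT_SPECS)` before submitting pipeline.yml would make the `azureml:<name>@latest` references resolvable.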
Below is an example of component_spec.yaml:
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: data_reader_component
display_name: Data Reader
version: 1.0.0
type: command
inputs:
  input_path:
    type: uri_file
    description: Path to input Excel file
outputs:
  output_data:
    type: uri_file
    description: Processed data in JSON format
# Path is relative to this spec file, which sits next to data_reader.py
code: .
environment: azureml:genai-test-env@latest
command: >-
  python data_reader.py
  --input_path ${{inputs.input_path}}
  --output_path ${{outputs.output_data}}
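For reference, the command above passes `--input_path` and `--output_path` to the script, so data_reader.py needs to parse those two arguments. A minimal sketch of such a script follows. It is not the original component code: it assumes pandas (with an Excel engine such as openpyxl) is listed in environment.yml, and the function names are illustrative.

```python
import argparse
import json


def parse_args(argv=None):
    """Parse the two arguments passed by the component command."""
    parser = argparse.ArgumentParser(description="Read an Excel file and write JSON")
    parser.add_argument("--input_path", required=True, help="Path to the input Excel file")
    parser.add_argument("--output_path", required=True, help="Path of the JSON file to write")
    return parser.parse_args(argv)


def read_prompts(input_path):
    """Read the Excel sheet into a list of row dictionaries."""
    # Assumption: pandas is installed in the job environment.
    import pandas as pd
    return pd.read_excel(input_path).to_dict(orient="records")


def write_json(records, output_path):
    """Write the records as JSON; non-JSON types such as timestamps become strings."""
    with open(output_path, "w", encoding="utf-8") as fh:
        json.dump(records, fh, ensure_ascii=False, indent=2, default=str)


def main(argv=None):
    args = parse_args(argv)
    write_json(read_prompts(args.input_path), args.output_path)
```

In the real script, end the file with an `if __name__ == "__main__": main()` guard so the command runs it. Azure ML mounts or downloads the `uri_file` input and substitutes a local path for `${{inputs.input_path}}`, so the script can open it like a normal file.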
For the YAML schemas, see the links below:
https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
To trigger the pipeline after it has been created successfully, refer to the following documentation:
https://learn.microsoft.com/en-us/azure/machine-learning/how-to-trigger-published-pipeline?view=azureml-api-1
Feel free to accept this as an answer.
Thank you for reaching out to the Microsoft QNA Portal.