Publicación de canalizaciones de aprendizaje automático
SE APLICA A: Azure ML del SDK de Python v1
En este artículo se muestra cómo compartir una canalización de aprendizaje automático con sus compañeros o clientes.
Las canalizaciones de aprendizaje automático son flujos de trabajo reutilizables para las tareas de aprendizaje automático. Una ventaja de las canalizaciones es aumentar la colaboración. También puede usar canalizaciones de versión, lo que permite a los clientes utilizar el modelo actual mientras trabaja en una nueva versión.
Requisitos previos
Cree un área de trabajo de Azure Machine Learning que contenga todos los recursos de canalización.
Configure su entorno de desarrollo para instalar el SDK de Azure Machine Learning o use una instancia de proceso de Azure Machine Learning con el SDK ya instalado.
Cree y ejecute una canalización de aprendizaje automático, como en el siguiente Tutorial: Compilación de una canalización de Azure Machine Learning para la puntuación por lotes. Para otras opciones, consulte Creación y ejecución de canalizaciones de Machine Learning con el SDK de Azure Machine Learning.
Publicar una canalización
Una vez que tenga una canalización en funcionamiento, puede publicarla para que se ejecute con diferentes entradas. Para que el punto de conexión REST de una canalización ya publicada acepte parámetros, tiene que configurar la canalización para que use objetos PipelineParameter
para los argumentos que varían.
Para crear un parámetro de canalización, use un objeto PipelineParameter con un valor predeterminado.
from azureml.pipeline.core.graph import PipelineParameter pipeline_param = PipelineParameter( name="pipeline_arg", default_value=10)
Agregue este objeto
PipelineParameter
como parámetro a cualquiera de los pasos de la canalización tal como se muestra a continuación:compareStep = PythonScriptStep( script_name="compare.py", arguments=["--comp_data1", comp_data1, "--comp_data2", comp_data2, "--output_data", out_data3, "--param1", pipeline_param], inputs=[ comp_data1, comp_data2], outputs=[out_data3], compute_target=compute_target, source_directory=project_folder)
Publique esta canalización para que acepte un parámetro cuando se invoque.
published_pipeline1 = pipeline_run1.publish_pipeline( name="My_Published_Pipeline", description="My Published Pipeline Description", version="1.0")
Después de publicar la canalización, puede comprobarla en la interfaz de usuario. El identificador de canalización es el único identificado de la canalización publicada.
Ejecutar una canalización publicada
Todas las canalizaciones publicadas tienen un punto de conexión REST. Con el punto de conexión de canalización, puede desencadenar una ejecución de la canalización desde cualquier sistema externo, incluidos los clientes que no sean de Python. Este punto de conexión habilita la "repetibilidad administrada" en los escenarios de puntuación y nuevo entrenamiento.
Importante
Si utiliza el control basado en roles de Azure (Azure RBAC) para administrar el acceso a la canalización, establezca los permisos del escenario de canalización (entrenamiento o puntuación).
Para invocar la ejecución de la canalización anterior, necesita un token de encabezado de autenticación de Microsoft Entra. La obtención de este tipo de token se describe en la referencia de clase AzureCliAuthentication y en el cuaderno Autenticación en Azure Machine Learning.
from azureml.pipeline.core import PublishedPipeline
import requests
response = requests.post(published_pipeline1.endpoint,
headers=aad_token,
json={"ExperimentName": "My_Pipeline",
"ParameterAssignments": {"pipeline_arg": 20}})
El argumento json
de la solicitud POST debe contener, para la clave ParameterAssignments
, un diccionario que contenga los parámetros de canalización y sus valores. Además, el argumento json
puede contener las claves siguientes:
Clave | Descripción |
---|---|
ExperimentName |
El nombre del experimento asociado a este punto de conexión |
Description |
Texto de forma libre que describe el punto de conexión |
Tags |
Pares clave-valor de forma libre que se pueden usar para etiquetar y anotar solicitudes |
DataSetDefinitionValueAssignments |
Diccionario usado para cambiar conjuntos de datos sin volver a entrenar (vea la siguiente explicación) |
DataPathAssignments |
Diccionario usado para cambiar rutas de acceso de datos sin volver a entrenar (vea la siguiente explicación) |
Ejecución de una canalización publicada con C#
En el código siguiente, se muestra cómo se llama de forma asincrónica a una canalización desde C#. Este fragmento de código parcial no forma parte de ningún ejemplo de Microsoft y solo ilustra la estructura de la llamada. No se muestra el proceso completo de control de errores o clases.
[DataContract]
public class SubmitPipelineRunRequest
{
[DataMember]
public string ExperimentName { get; set; }
[DataMember]
public string Description { get; set; }
[DataMember(IsRequired = false)]
public IDictionary<string, string> ParameterAssignments { get; set; }
}
// ... in its own class and method ...
const string RestEndpoint = "your-pipeline-endpoint";
using (HttpClient client = new HttpClient())
{
var submitPipelineRunRequest = new SubmitPipelineRunRequest()
{
ExperimentName = "YourExperimentName",
Description = "Asynchronous C# REST api call",
ParameterAssignments = new Dictionary<string, string>
{
{
// Replace with your pipeline parameter keys and values
"your-pipeline-parameter", "default-value"
}
}
};
string auth_key = "your-auth-key";
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", auth_key);
// submit the job
var requestPayload = JsonConvert.SerializeObject(submitPipelineRunRequest);
var httpContent = new StringContent(requestPayload, Encoding.UTF8, "application/json");
var submitResponse = await client.PostAsync(RestEndpoint, httpContent).ConfigureAwait(false);
if (!submitResponse.IsSuccessStatusCode)
{
await WriteFailedResponse(submitResponse); // ... method not shown ...
return;
}
var result = await submitResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
var obj = JObject.Parse(result);
// ... use `obj` dictionary to access results
}
Ejecución de una canalización publicada con Java
En el código siguiente, se muestra una llamada a una canalización que requiere autenticación (consulte Configuración de la autenticación para recursos y flujos de trabajo de Azure Machine Learning). Si la canalización está implementada públicamente, no se necesitan las llamadas que generan authKey
. En este fragmento de código parcial, no se muestra la clase de Java ni el código reutilizable de control de excepciones. El código utiliza Optional.flatMap
para encadenar funciones que pueden devolver un objeto Optional
vacío. Al utilizar flatMap
, el código es más corto y más claro, pero tenga en cuenta que getRequestBody()
pasa por alto las excepciones.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
// JSON library
import com.google.gson.Gson;
String scoringUri = "scoring-endpoint";
String tenantId = "your-tenant-id";
String clientId = "your-client-id";
String clientSecret = "your-client-secret";
String resourceManagerUrl = "https://management.azure.com";
String dataToBeScored = "{ \"ExperimentName\" : \"My_Pipeline\", \"ParameterAssignments\" : { \"pipeline_arg\" : \"20\" }}";
HttpClient client = HttpClient.newBuilder().build();
Gson gson = new Gson();
HttpRequest tokenAuthenticationRequest = tokenAuthenticationRequest(tenantId, clientId, clientSecret, resourceManagerUrl);
Optional<String> authBody = getRequestBody(client, tokenAuthenticationRequest);
Optional<String> authKey = authBody.flatMap(body -> Optional.of(gson.fromJson(body, AuthenticationBody.class).access_token);;
Optional<HttpRequest> scoringRequest = authKey.flatMap(key -> Optional.of(scoringRequest(key, scoringUri, dataToBeScored)));
Optional<String> scoringResult = scoringRequest.flatMap(req -> getRequestBody(client, req));
// ... etc (`scoringResult.orElse()`) ...
static HttpRequest tokenAuthenticationRequest(String tenantId, String clientId, String clientSecret, String resourceManagerUrl)
{
String authUrl = String.format("https://login.microsoftonline.com/%s/oauth2/token", tenantId);
String clientIdParam = String.format("client_id=%s", clientId);
String resourceParam = String.format("resource=%s", resourceManagerUrl);
String clientSecretParam = String.format("client_secret=%s", clientSecret);
String bodyString = String.format("grant_type=client_credentials&%s&%s&%s", clientIdParam, resourceParam, clientSecretParam);
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(authUrl))
.POST(HttpRequest.BodyPublishers.ofString(bodyString))
.build();
return request;
}
static HttpRequest scoringRequest(String authKey, String scoringUri, String dataToBeScored)
{
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(scoringUri))
.header("Authorization", String.format("Token %s", authKey))
.POST(HttpRequest.BodyPublishers.ofString(dataToBeScored))
.build();
return request;
}
static Optional<String> getRequestBody(HttpClient client, HttpRequest request) {
try {
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() != 200) {
System.out.println(String.format("Unexpected server response %d", response.statusCode()));
return Optional.empty();
}
return Optional.of(response.body());
}catch(Exception x)
{
System.out.println(x.toString());
return Optional.empty();
}
}
class AuthenticationBody {
String access_token;
String token_type;
int expires_in;
String scope;
String refresh_token;
String id_token;
AuthenticationBody() {}
}
Cambio de conjuntos de datos y rutas de acceso de datos sin volver a entrenar
Tal vez quiera entrenar e inferir en diferentes conjuntos de datos y rutas de acceso de datos. Por ejemplo, puede que desee entrenar un conjunto de datos menor, pero sacar inferencias del conjunto de datos completo. Puede cambiar los conjuntos de datos con la clave DataSetDefinitionValueAssignments
en el argumento json
de la solicitud. Puede cambiar las rutas de acceso de datos con DataPathAssignments
. La técnica para ambos es similar:
En el script de definición de la canalización, cree un parámetro
PipelineParameter
para el conjunto de datos. Cree unDatasetConsumptionConfig
oDataPath
a partir dePipelineParameter
:tabular_dataset = Dataset.Tabular.from_delimited_files('https://dprepdata.blob.core.windows.net/demo/Titanic.csv') tabular_pipeline_param = PipelineParameter(name="tabular_ds_param", default_value=tabular_dataset) tabular_ds_consumption = DatasetConsumptionConfig("tabular_dataset", tabular_pipeline_param)
En el script de ML, acceda al conjunto de datos especificado de forma dinámica mediante
Run.get_context().input_datasets
:from azureml.core import Run input_tabular_ds = Run.get_context().input_datasets['tabular_dataset'] dataframe = input_tabular_ds.to_pandas_dataframe() # ... etc ...
Observe que el script de ML tiene acceso al valor especificado para
DatasetConsumptionConfig
(tabular_dataset
) y no al valor dePipelineParameter
(tabular_ds_param
).En el script de definición de la canalización, establezca
DatasetConsumptionConfig
como un parámetro enPipelineScriptStep
:train_step = PythonScriptStep( name="train_step", script_name="train_with_dataset.py", arguments=["--param1", tabular_ds_consumption], inputs=[tabular_ds_consumption], compute_target=compute_target, source_directory=source_directory) pipeline = Pipeline(workspace=ws, steps=[train_step])
Para cambiar los conjuntos de datos de forma dinámica en la llamada de REST de inferencia, use
DataSetDefinitionValueAssignments
:tabular_ds1 = Dataset.Tabular.from_delimited_files('path_to_training_dataset') tabular_ds2 = Dataset.Tabular.from_delimited_files('path_to_inference_dataset') ds1_id = tabular_ds1.id d22_id = tabular_ds2.id response = requests.post(rest_endpoint, headers=aad_token, json={ "ExperimentName": "MyRestPipeline", "DataSetDefinitionValueAssignments": { "tabular_ds_param": { "SavedDataSetReference": {"Id": ds1_id #or ds2_id }}}})
Los cuadernos Presentación del conjunto de datos y PipelineParameter y Presentación de la ruta de acceso de datos y PipelineParameter contienen ejemplos completos de esta técnica.
Creación de un punto de conexión de canalización con versiones
Puede crear un punto de conexión de canalización con varias canalizaciones publicadas detrás. Esta técnica le proporciona un punto de conexión de REST fijo a medida que itera por las canalizaciones de ML y las actualiza.
from azureml.pipeline.core import PipelineEndpoint
published_pipeline = PublishedPipeline.get(workspace=ws, id="My_Published_Pipeline_id")
pipeline_endpoint = PipelineEndpoint.publish(workspace=ws, name="PipelineEndpointTest",
pipeline=published_pipeline, description="Test description Notebook")
Envío de un trabajo a un punto de conexión de canalización
Puede enviar un trabajo a la versión predeterminada de un punto de conexión de canalización:
pipeline_endpoint_by_name = PipelineEndpoint.get(workspace=ws, name="PipelineEndpointTest")
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment")
print(run_id)
También puede enviar un trabajo a una versión específica:
run_id = pipeline_endpoint_by_name.submit("PipelineEndpointExperiment", pipeline_version="0")
print(run_id)
Lo mismo puede hacerse mediante la API REST:
rest_endpoint = pipeline_endpoint_by_name.endpoint
response = requests.post(rest_endpoint,
headers=aad_token,
json={"ExperimentName": "PipelineEndpointExperiment",
"RunSource": "API",
"ParameterAssignments": {"1": "united", "2":"city"}})
Uso de canalizaciones publicadas en Studio
También puede ejecutar una canalización publicada desde Studio:
Inicie sesión en Azure Machine Learning Studio.
En el panel izquierdo, seleccione Puntos de conexión.
En la parte superior, seleccione Pipeline endpoints (Puntos de conexión de la canalización).
Seleccione una canalización específica para ejecutar, utilizar o revisar los resultados de las ejecuciones anteriores del punto de conexión de la canalización.
Deshabilitar una canalización publicada
Para ocultar una canalización de la lista de canalizaciones publicadas hay que deshabilitarla, ya sea en Studio o en el SDK:
# Get the pipeline by using its ID from Azure Machine Learning studio
p = PublishedPipeline.get(ws, id="068f4885-7088-424b-8ce2-eeb9ba5381a6")
p.disable()
Puede habilitarla de nuevo con p.enable()
. Para más información, consulte la referencia de la clase PublishedPipeline.
Pasos siguientes
- Use estos cuadernos de Jupyter en GitHub para explorar aún más canalizaciones de Machine Learning.
- Consulte la ayuda de referencia del SDK para el paquete azureml-pipelines-core y el paquete azureml-pipelines-steps.
- Consulte los procedimientos para obtener sugerencias sobre la depuración y la solución de problemas de canalizaciones.