Como é realizada a correspondência entre tarefas e trabalhos
Este documento descreve o registo dos trabalhadores, a apresentação de trabalhos e a forma como são combinados entre si.
Registo de Trabalhadores
Antes que um trabalhador possa receber ofertas para prestar serviços a um emprego, ele deve ser registrado primeiro, definindo availableForOffers
como true. Em seguida, precisamos especificar em quais filas o trabalhador escuta e quais canais ele pode lidar. Uma vez registrado, você recebe um evento RouterWorkerRegistered da Grade de Eventos e o status do trabalhador é alterado para active
.
No exemplo a seguir, registramos um trabalhador para:
- Ouça e
queue-1
queue-2
- Ser capaz de lidar com os canais de voz e chat. Neste caso, o trabalhador pode aceitar um único
voice
emprego ao mesmo tempo ou doischat
empregos ao mesmo tempo. Essa configuração é definida especificando a capacidade total do trabalhador e atribuindo um custo por trabalho para cada canal. - Tenha um conjunto de rótulos que descrevam coisas sobre o trabalhador que possam ajudar a determinar se é uma correspondência para um trabalho específico.
var worker = await client.CreateWorkerAsync(new CreateWorkerOptions(workerId: "worker-1", capacity: 2)
{
AvailableForOffers = true,
Queues = { "queue1", "queue2" },
Channels =
{
new RouterChannel(channelId: "voice", capacityCostPerJob: 2),
new RouterChannel(channelId: "chat", capacityCostPerJob: 1)
},
Labels =
{
["Skill"] = new RouterValue(11),
["English"] = new RouterValue(true),
["French"] = new RouterValue(false),
["Vendor"] = new RouterValue("Acme")
}
});
let worker = await client.path("/routing/workers/{workerId}", "worker-1").patch({
body: {
availableForOffers: true,
capacity: 2,
queues: ["queue1", "queue2"],
channels: [
{ channelId: "voice", capacityCostPerJob: 2 },
{ channelId: "chat", capacityCostPerJob: 1 }
],
labels: {
Skill: 11,
English: true,
French: false,
Vendor: "Acme"
}
},
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(
worker_id = "worker-1",
available_for_offers = True,
capacity = 2,
queues = ["queue1", "queue2"],
channels = [
RouterChannel(channel_id = "voice", capacity_cost_per_job = 2),
RouterChannel(channel_id = "chat", capacity_cost_per_job = 1)
],
labels = {
"Skill": 11,
"English": True,
"French": False,
"Vendor": "Acme"
}
)
RouterWorker worker = client.createWorker(new CreateWorkerOptions("worker-1", 2)
.setAvailableForOffers(true)
.setQueues(List.of("queue1", "queue2"))
.setChannels(List.of(
new RouterChannel("voice", 2),
new RouterChannel("chat", 1)))
.setLabels(Map.of(
"Skill", new RouterValue(11),
"English", new RouterValue(true),
"French", new RouterValue(false),
"Vendor", new RouterValue("Acme"))));
Submissão de Trabalhos
No exemplo a seguir, enviamos um trabalho que
- Vai diretamente para
queue1
. - Para o
chat
canal. - Com um seletor de trabalhador que especifica que qualquer trabalhador que preste serviços a este trabalho deve ter um rótulo definido
English
comotrue
. - Com um seletor de trabalhador que especifica que qualquer trabalhador que preste serviço a esse trabalho deve ter um rótulo maior
Skill
que10
e essa condição expirará após um minuto. - Com um rótulo de
name
definido comoJohn
.
await client.CreateJobAsync(new CreateJobOptions("job1", "chat", "queue1")
{
RequestedWorkerSelectors =
{
new RouterWorkerSelector(key: "English", labelOperator: LabelOperator.Equal, value: new RouterValue(true)),
new RouterWorkerSelector(key: "Skill", labelOperator: LabelOperator.GreaterThan, value: new RouterValue(10))
{ ExpiresAfter = TimeSpan.FromMinutes(5) }
},
Labels = { ["name"] = new RouterValue("John") }
});
await client.path("/routing/jobs/{jobId}", "job1").patch({
body: {
channelId: "chat",
queueId: "queue1",
requestedWorkerSelectors: [
{ key: "English", labelOperator: "equal", value: true },
{ key: "Skill", labelOperator: "greaterThan", value: 10, expiresAfterSeconds: 300 },
],
labels: { name: "John" }
},
contentType: "application/merge-patch+json"
})
client.upsert_job(
job_id = "job1",
channel_id = "chat",
queue_id = "queue1",
requested_worker_selectors = [
RouterWorkerSelector(
key = "English",
label_operator = LabelOperator.EQUAL,
value = True
),
RouterWorkerSelector(
key = "Skill",
label_operator = LabelOperator.GREATER_THAN,
value = True,
expires_after_seconds = 300
)
],
labels = { "name": "John" }
)
client.createJob(new CreateJobOptions("job1", "chat", "queue1")
.setRequestedWorkerSelectors(List.of(
new RouterWorkerSelector("English", LabelOperator.EQUAL, new RouterValue(true)),
new RouterWorkerSelector("Skill", LabelOperator.GREATER_THAN, new RouterValue(10))
.setExpiresAfter(Duration.ofMinutes(5))))
.setLabels(Map.of("name", new RouterValue("John"))));
O Job Router tenta fazer corresponder este trabalho a um trabalhador disponível a ouvir queue1
o canal, com English
definido como e Skill
superior a true
10
.chat
Assim que uma correspondência é feita, uma oferta é criada. A política de distribuição anexada à fila controla quantas ofertas ativas podem existir para um trabalho e por quanto tempo cada oferta é válida. Você recebe um Evento OfferIssued com esta aparência:
{
"workerId": "worker-1",
"jobId": "7f1df17b-570b-4ae5-9cf5-fe6ff64cc712",
"channelId": "chat",
"queueId": "queue1",
"offerId": "525fec06-ab81-4e60-b780-f364ed96ade1",
"offerTimeUtc": "2021-06-23T02:43:30.3847144Z",
"expiryTimeUtc": "2021-06-23T02:44:30.3847674Z",
"jobPriority": 1,
"jobLabels": {
"name": "John"
}
}
O Evento OfferIssued inclui detalhes sobre o emprego, o trabalhador, por quanto tempo a oferta é válida e o que você precisa para aceitar ou recusar o offerId
trabalho.
Nota
A vida útil máxima de um trabalho é de 90 dias, após os quais expirará automaticamente.
Cancelamento do Registo de Trabalhadores
Se um trabalhador quiser parar de receber ofertas, ele poderá ser cancelado definindo AvailableForOffers
como false
ao atualizar o trabalhador e você receberá um evento RouterWorkerDeregistered da Grade de Eventos. Todas as ofertas existentes para o trabalhador são revogadas e você recebe um evento RouterWorkerOfferRevoked para cada oferta.
worker.AvailableForOffers = false;
worker = await client.UpdateWorkerAsync(worker);
worker = await client.path("/routing/workers/{workerId}", worker.body.id).patch({
body: { availableForOffers: false },
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(worker_id = worker.id, available_for_offers = False)
client.updateWorker(worker.getId(), BinaryData.fromObject(worker.setAvailableForOffers(false)), null);
Nota
Se um trabalhador estiver registrado e parado por mais de 7 dias, ele será automaticamente cancelado. Uma vez cancelado, o status do trabalhador é draining
se um ou mais empregos ainda forem atribuídos, ou inactive
se nenhum trabalho for atribuído.