How jobs are matched to workers
This document describes the registration of workers, the submission of jobs and how they're matched to each other.
Worker Registration
Before a worker can receive offers to service a job, it must be registered first by setting availableForOffers
to true. Next, we need to specify which queues the worker listens on and which channels it can handle. Once registered, you receive a RouterWorkerRegistered event from Event Grid and the worker's status is changed to active
.
In the following example, we register a worker to:
- Listen on
queue-1
andqueue-2
- Be able to handle both the voice and chat channels. In this case, the worker could either take a single
voice
job at one time or twochat
jobs at the same time. This setting is configured by specifying the total capacity of the worker and assigning a cost per job for each channel. - Have a set of labels that describe things about the worker that could help determine if it's a match for a particular job.
var worker = await client.CreateWorkerAsync(new CreateWorkerOptions(workerId: "worker-1", capacity: 2)
{
AvailableForOffers = true,
Queues = { "queue1", "queue2" },
Channels =
{
new RouterChannel(channelId: "voice", capacityCostPerJob: 2),
new RouterChannel(channelId: "chat", capacityCostPerJob: 1)
},
Labels =
{
["Skill"] = new RouterValue(11),
["English"] = new RouterValue(true),
["French"] = new RouterValue(false),
["Vendor"] = new RouterValue("Acme")
}
});
let worker = await client.path("/routing/workers/{workerId}", "worker-1").patch({
body: {
availableForOffers: true,
capacity: 2,
queues: ["queue1", "queue2"],
channels: [
{ channelId: "voice", capacityCostPerJob: 2 },
{ channelId: "chat", capacityCostPerJob: 1 }
],
labels: {
Skill: 11,
English: true,
French: false,
Vendor: "Acme"
}
},
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(
worker_id = "worker-1",
available_for_offers = True,
capacity = 2,
queues = ["queue1", "queue2"],
channels = [
RouterChannel(channel_id = "voice", capacity_cost_per_job = 2),
RouterChannel(channel_id = "chat", capacity_cost_per_job = 1)
],
labels = {
"Skill": 11,
"English": True,
"French": False,
"Vendor": "Acme"
}
)
RouterWorker worker = client.createWorker(new CreateWorkerOptions("worker-1", 2)
.setAvailableForOffers(true)
.setQueues(List.of("queue1", "queue2"))
.setChannels(List.of(
new RouterChannel("voice", 2),
new RouterChannel("chat", 1)))
.setLabels(Map.of(
"Skill", new RouterValue(11),
"English", new RouterValue(true),
"French", new RouterValue(false),
"Vendor", new RouterValue("Acme"))));
Job Submission
In the following example, we submit a job that
- Goes directly to
queue1
. - For the
chat
channel. - With a worker selector that specifies that any worker servicing this job must have a label of
English
set totrue
. - With a worker selector that specifies that any worker servicing this job must have a label of
Skill
greater than10
and this condition will expire after one minute. - With a label of
name
set toJohn
.
await client.CreateJobAsync(new CreateJobOptions("job1", "chat", "queue1")
{
RequestedWorkerSelectors =
{
new RouterWorkerSelector(key: "English", labelOperator: LabelOperator.Equal, value: new RouterValue(true)),
new RouterWorkerSelector(key: "Skill", labelOperator: LabelOperator.GreaterThan, value: new RouterValue(10))
{ ExpiresAfter = TimeSpan.FromMinutes(5) }
},
Labels = { ["name"] = new RouterValue("John") }
});
await client.path("/routing/jobs/{jobId}", "job1").patch({
body: {
channelId: "chat",
queueId: "queue1",
requestedWorkerSelectors: [
{ key: "English", labelOperator: "equal", value: true },
{ key: "Skill", labelOperator: "greaterThan", value: 10, expiresAfterSeconds: 300 },
],
labels: { name: "John" }
},
contentType: "application/merge-patch+json"
})
client.upsert_job(
job_id = "job1",
channel_id = "chat",
queue_id = "queue1",
requested_worker_selectors = [
RouterWorkerSelector(
key = "English",
label_operator = LabelOperator.EQUAL,
value = True
),
RouterWorkerSelector(
key = "Skill",
label_operator = LabelOperator.GREATER_THAN,
value = True,
expires_after_seconds = 300
)
],
labels = { "name": "John" }
)
client.createJob(new CreateJobOptions("job1", "chat", "queue1")
.setRequestedWorkerSelectors(List.of(
new RouterWorkerSelector("English", LabelOperator.EQUAL, new RouterValue(true)),
new RouterWorkerSelector("Skill", LabelOperator.GREATER_THAN, new RouterValue(10))
.setExpiresAfter(Duration.ofMinutes(5))))
.setLabels(Map.of("name", new RouterValue("John"))));
Job Router tries to match this job to an available worker listening on queue1
for the chat
channel, with English
set to true
and Skill
greater than 10
.
Once a match is made, an offer is created. The distribution policy that is attached to the queue controls how many active offers there can be for a job and how long each offer is valid. You receive an OfferIssued Event that would look like this:
{
"workerId": "worker-1",
"jobId": "7f1df17b-570b-4ae5-9cf5-fe6ff64cc712",
"channelId": "chat",
"queueId": "queue1",
"offerId": "525fec06-ab81-4e60-b780-f364ed96ade1",
"offerTimeUtc": "2021-06-23T02:43:30.3847144Z",
"expiryTimeUtc": "2021-06-23T02:44:30.3847674Z",
"jobPriority": 1,
"jobLabels": {
"name": "John"
}
}
The OfferIssued Event includes details about the job, worker, how long the offer is valid and the offerId
that you need to accept or decline the job.
Note
The maximum lifetime of a job is 90 days, after which it'll automatically expire.
Worker Deregistration
If a worker would like to stop receiving offers, it can be deregistered by setting AvailableForOffers
to false
when updating the worker and you receive a RouterWorkerDeregistered event from Event Grid. Any existing offers for the worker are revoked and you receive a RouterWorkerOfferRevoked event for each offer.
worker.AvailableForOffers = false;
worker = await client.UpdateWorkerAsync(worker);
worker = await client.path("/routing/workers/{workerId}", worker.body.id).patch({
body: { availableForOffers: false },
contentType: "application/merge-patch+json"
});
worker = client.upsert_worker(worker_id = worker.id, available_for_offers = False)
client.updateWorker(worker.getId(), BinaryData.fromObject(worker.setAvailableForOffers(false)), null);
Note
If a worker is registered and idle for more than 7 days, it'll be automatically deregistered. Once deregistered, the worker's status is draining
if one or more jobs are still assigned, or inactive
if no jobs are assigned.