Worker Task Assignment (Review Session)
Date: 2026-03-25 Category: Data Processing Parts Completed: 4/4 (review) Language: Python
Problem Summary
Task scheduling system with load balancing, specialty matching, account affinity, and worker availability (offline events).
Key Bugs Found on Review
1. Scanning all offline events every iteration — O(t × e)
# Bug: loops ALL events for EVERY task
for event in offlineEvents:
if task_index == afterTaskIndex:
workers.remove(worker)
# Fix: pre-process into dict for O(1) lookup
offline_at = defaultdict(list)
for event in offlineEvents:
offline_at[event["afterTaskIndex"]].append(event["worker"])
# In loop:
if task_index in offline_at:
for worker in offline_at[task_index]:
available.discard(worker)2. list.remove() is O(n) — use set
# Bug: O(n) per removal
workers.remove(worker)
# Fix: O(1) with set
available = set(worker["name"] for worker in self.workers)
available.discard(worker)3. Implicit tiebreaker
# Bug: relies on min() silently picking first item — fragile
min_worker = min(eligible, key=lambda x: self.worker_to_workload[x])
# Fix: explicit secondary sort key
worker_index = {w["name"]: i for i, w in enumerate(self.workers)}
min_worker = min(eligible, key=lambda x: (self.worker_to_workload[x], worker_index[x]))Final Clean Code
from collections import defaultdict
class TaskScheduler:
def __init__(self, workers, tasks, offlineEvents):
self.workers = workers
self.tasks = tasks
self.worker_to_workload = {worker["name"]: 0 for worker in self.workers}
self.worker_to_specialities = {worker["name"]: worker["specialty"] for worker in self.workers}
self.worker_to_account_history = defaultdict(set)
self.worker_index = {w["name"]: i for i, w in enumerate(self.workers)}
self.available = set(worker["name"] for worker in self.workers)
self.offline_at = defaultdict(list)
for event in offlineEvents:
self.offline_at[event["afterTaskIndex"]].append(event["worker"])
def assign_tasks(self):
result = []
for task_index, task in enumerate(self.tasks):
task_id = task["id"]
duration = task["duration"]
required_specialties = task["requiredSpecialties"]
accountId = task["accountId"]
eligible_workers = [
w for w in self.available
if self.worker_to_specialities[w] in required_specialties
]
if not eligible_workers:
continue
with_affinity = [w for w in eligible_workers if accountId in self.worker_to_account_history[w]]
candidates = with_affinity if with_affinity else eligible_workers
min_worker = min(candidates, key=lambda x: (self.worker_to_workload[x], self.worker_index[x]))
self.worker_to_workload[min_worker] += duration
self.worker_to_account_history[min_worker].add(accountId)
result.append({"taskId": task_id, "worker": min_worker})
if task_index in self.offline_at:
for worker in self.offline_at[task_index]:
self.available.discard(worker)
return resultKey Learnings
- Pre-process event lookups into dicts — avoid O(n) scans inside loops
- Use sets for membership checks and removal — O(1) vs O(n) for lists
- Make tiebreakers explicit in the sort key — don’t rely on implementation details of min()
- Separate static lookups (specialty map) from dynamic state (workload, availability)