Build a Production-Ready Huey Task Queue with SQLite
Step-by-step code to create a self-contained background task system using Huey + SQLite: handle retries, priorities, pipelines, locking, scheduling, and monitoring—all runnable in a Colab notebook without Redis.
Configure Lightweight SQLite Huey for Production Tasks
Huey provides a lighter, Celery-like task queue, using SQLite as a file-based broker for zero-dependency setups. Start by installing huey and initializing SqliteHuey:
```python
!pip -q install -U huey

import os
from huey import SqliteHuey

DB_PATH = "/content/huey_demo.db"
if os.path.exists(DB_PATH):
    os.remove(DB_PATH)

huey = SqliteHuey(
    name="colab-huey",
    filename=DB_PATH,
    results=True,      # store task results
    store_none=False,  # skip storing None results
    utc=True,          # schedule in UTC
)
```
This creates a persistent queue in huey_demo.db. Key principle: SQLite handles scheduling, results, and locking atomically, making it suitable for single-node production without Redis. Trade-off: Not distributed; scale via multiple consumers on shared DB (with WAL mode for concurrency). Assumes basic Python; fits early in async workflows before heavy infra.
Enable observability early with a global signal handler logging task events:
```python
from datetime import datetime

EVENT_LOG = []

@huey.signal()
def _log_all_signals(signal, task, exc=None):
    # Record every lifecycle signal with enough context to debug later.
    EVENT_LOG.append({
        "ts": datetime.utcnow().isoformat() + "Z",
        "signal": str(signal),
        "task": getattr(task, "name", None),
        "id": getattr(task, "id", None),
        "args": getattr(task, "args", None),
        "kwargs": getattr(task, "kwargs", None),
        "exc": repr(exc) if exc else None,
    })

def print_latest_events(n=10):
    # Show the n most recent events, oldest first.
    for event in EVENT_LOG[-n:]:
        print(event)
```
Signals fire at each execution phase (e.g., enqueued, executing, complete, error, retrying). This captures IDs, args, and exceptions for debugging; critical in production, where logs reveal retry loops or deadlocks.
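As a sketch of the post-hoc analysis this log enables, entries can be aggregated with plain Python (the sample records and summarize_events helper are illustrative; the entry shape follows the handler above):

```python
from collections import Counter

def summarize_events(events):
    # Count (task, signal) pairs to spot retry loops or error-heavy tasks.
    return dict(Counter((e["task"], e["signal"]) for e in events))

sample = [
    {"ts": "2024-01-01T00:00:00Z", "signal": "retrying", "task": "flaky_network_call", "id": "a1"},
    {"ts": "2024-01-01T00:00:01Z", "signal": "retrying", "task": "flaky_network_call", "id": "a1"},
    {"ts": "2024-01-01T00:00:02Z", "signal": "complete", "task": "flaky_network_call", "id": "a1"},
]
print(summarize_events(sample))
# {('flaky_network_call', 'retrying'): 2, ('flaky_network_call', 'complete'): 1}
```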
Design Tasks with Retries, Priorities, and Context Awareness
Tasks are decorated with @huey.task() and configured for real workloads. Priority is an integer where higher values run first, so urgent jobs like error alerts jump ahead of batch processing. Retries handle flakiness:
```python
import time
import random

@huey.task(priority=50)
def quick_add(a, b):
    return a + b

@huey.task(priority=10)
def slow_io(seconds=1.0):
    time.sleep(seconds)
    return f"slept={seconds}"

@huey.task(retries=3, retry_delay=1, priority=100)
def flaky_network_call(p_fail=0.6):
    # Fails ~60% of the time to exercise the retry machinery.
    if random.random() < p_fail:
        raise RuntimeError("Transient failure")
    return "OK"

@huey.task(context=True, priority=60)
def cpu_pi_estimate(samples=200_000, task=None):
    # Monte Carlo pi estimate: fraction of random points inside the
    # unit quarter-circle, times 4.
    inside = sum(
        1 for _ in range(samples)
        if random.random() ** 2 + random.random() ** 2 <= 1
    )
    est = 4.0 * inside / samples
    return {"task_id": task.id if task else None, "pi_estimate": est}
```
Principles: Assign high priority plus retries to unreliable external calls (APIs, DB writes). Use context=True to inject the task object for metadata like the task ID, avoiding a re-fetch from storage. Common mistake: forgetting utc=True leads to timezone bugs in scheduling. Test by blocking on the result handle, e.g. result(blocking=True, timeout=5), to simulate synchronous calls.
Before: naive functions crash on failure. After: each attempt of the flaky call succeeds 40% of the time, so three retries lift the eventual success rate to roughly 87%, and priorities order mixed queues correctly.
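A quick check of the arithmetic behind that success rate, with one initial attempt plus retries=3 at p_fail=0.6:

```python
def eventual_success(p_fail, retries):
    # Probability that at least one of (1 + retries) independent
    # attempts succeeds: 1 minus the chance that all of them fail.
    return 1 - p_fail ** (1 + retries)

print(round(eventual_success(0.6, 3), 4))  # 0.8704
```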
Prevent Races with Locks and Orchestrate Pipelines
Locks serialize critical sections, e.g., daily syncs:
```python
@huey.task()
@huey.lock_task("demo:daily-sync")  # task decorator goes outermost
def locked_sync_job(tag="sync"):
    time.sleep(1.0)
    return f"locked-job-done:{tag}:{datetime.utcnow().isoformat()}Z"
```
Key: the lock key ("demo:daily-sync") is global across workers; while one instance holds it, a concurrent run fails with TaskLockedException rather than waiting. The lock is released when the task finishes, whether it succeeds or fails.
Pipelines chain tasks dependently:
```python
@huey.task()
def fetch(seed):
    return random.randint(1, 100)

@huey.task()
def transform(x, scale):
    return x * scale

@huey.task()
def store(x):
    return {"stored": x}

# Each step's return value is passed as the first argument of the next.
pipeline = fetch.s(7).then(transform, 3).then(store)
result_group = huey.enqueue(pipeline)
```
.s() creates a signature; .then(task, *args) adds a step that receives the previous step's return value as its first argument, followed by *args. Principle: use pipelines for ETL (extract-transform-load); they fail fast if an upstream step errors. Mistake: mutable shared state breaks isolation; pass data explicitly. Quality check: the pipeline's result group holds the final output, and intermediates are queryable by task ID.
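The data flow can be traced without a broker: each step's return value becomes the first argument of the next, which a plain-Python sketch makes explicit (run_pipeline is an illustrative stand-in for what Huey does when executing a pipeline; fetch here is seeded for determinism):

```python
import random

def fetch(seed):
    # First step: receives only its own arguments, no upstream result.
    random.seed(seed)
    return random.randint(1, 100)

def transform(x, scale):
    return x * scale

def store(x):
    return {"stored": x}

def run_pipeline(first, first_args, rest):
    # Mirror Huey's pipeline semantics: the previous return value is
    # prepended to each subsequent step's arguments.
    result = first(*first_args)
    for fn, extra in rest:
        result = fn(result, *extra)
    return result

out = run_pipeline(fetch, (7,), [(transform, (3,)), (store, ())])
print(out)
```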
Schedule One-Offs, Periodic Jobs, and Heartbeats
Delay execution with fn.schedule(args=(...), delay=3) or an absolute eta=datetime. Revoke a pending task via its result handle's .revoke() before it runs.
Periodic via crontab:
```python
from huey import crontab

@huey.periodic_task(crontab(minute="*"))
def heartbeat_minutely():
    print("Minute tick")
```
Sub-minute simulation with timer (not native Huey):
```python
import threading

TICK = {"count": 0}
RUNNING = {"flag": True}  # clear to stop the timer loop

@huey.task()
def heartbeat():
    TICK["count"] += 1
    print(f"tick={TICK['count']}")

def start_seconds_heartbeat(interval=15):
    # Re-enqueue the heartbeat task every `interval` seconds.
    def _tick():
        if RUNNING["flag"]:
            huey.enqueue(heartbeat.s())
            threading.Timer(interval, _tick).start()
    _tick()
```
Principle: crontab for cron-like reliability; timers only for demos. The consumer must run with periodic=True for periodic tasks to fire. Trade-off: SQLite polling is cheap, but high-frequency schedules contend for the single write lock.
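For one-off scheduling with eta=, a UTC datetime for the next minute boundary can be computed with the stdlib (next_minute_eta is illustrative; whether Huey expects naive or aware datetimes depends on the version and the utc setting):

```python
from datetime import datetime, timedelta, timezone

def next_minute_eta(now=None):
    # UTC datetime at the top of the next minute: the kind of value
    # you would pass as schedule(eta=...) with utc=True.
    now = now or datetime.now(timezone.utc)
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)

eta = next_minute_eta(datetime(2024, 1, 1, 12, 30, 45, tzinfo=timezone.utc))
print(eta.isoformat())  # 2024-01-01T12:31:00+00:00
```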
Run Multi-Worker Consumer and Validate Full System
Launch threaded consumer (Colab-friendly):
```python
import threading

consumer = huey.create_consumer(
    workers=4,
    worker_type="thread",      # threads suit I/O-bound Colab demos
    periodic=True,             # enable crontab scheduling
    initial_delay=0.1,
    backoff=1.15,
    max_delay=2.0,
    scheduler_interval=1,
    check_worker_health=True,
    health_check_interval=10,
)
consumer_thread = threading.Thread(target=consumer.run, daemon=True)
consumer_thread.start()
```
Demos enqueue mixed tasks, block for results, test retries (the flaky call usually succeeds within 3 retries), locks (3 jobs serialize), pipelines (fetch -> transform(x3) -> store), and schedules (delay + revoke). Printing the event log reveals enqueued, executing, complete, and retrying signals.
Shutdown: consumer.stop(graceful=True) drains the queue. Mistake: an abrupt kill loses in-flight tasks; a graceful stop waits for them to complete.
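The drain-before-exit behavior can be illustrated with a stdlib queue sketch (drain_then_stop is a stand-in, not the consumer's actual implementation):

```python
import queue
import threading

def drain_then_stop(tasks):
    # Graceful-shutdown pattern: workers finish every queued task
    # before exiting, rather than abandoning in-flight work.
    q = queue.Queue()
    done = []
    for t in tasks:
        q.put(t)
    def worker():
        while True:
            try:
                t = q.get_nowait()
            except queue.Empty:
                return
            done.append(t())
            q.task_done()
    threads = [threading.Thread(target=worker) for _ in range(2)]
    for th in threads:
        th.start()
    q.join()  # block until every queued task has completed
    for th in threads:
        th.join()
    return done

results = drain_then_stop([lambda i=i: i * i for i in range(5)])
print(sorted(results))  # [0, 1, 4, 9, 16]
```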
"We start a threaded consumer inside the notebook to process tasks asynchronously. We enqueue tasks, test retries, demonstrate scheduling and revocation, execute pipelines, and observe logged signals."
Quality: Events log confirms ordering, retries; results match expectations (pi ~3.14, locked tags sequential).
Scale to Production: From Notebook to Deployment
The notebook proves the concepts self-contained. For production: run the consumer as a service (Docker, systemd), share the SQLite file with WAL enabled (PRAGMA journal_mode=WAL;), and monitor DB size and growth. Extend: multiple DBs per app, or migrate to RedisHuey (or Huey's SQL storage on Postgres) when you need multi-node distribution. Fits indie/SaaS backends that need async email and reports without Redis ops overhead.
"Through this approach, we gained a clear understanding of how to use Huey to manage background workloads efficiently and extend this architecture to real-world production deployments."
Prerequisites: Python threading knowledge and basic database familiarity. Practice: copy the notebook, add your own tasks, scale to workers=8, and measure throughput.
Key Takeaways
- Initialize SqliteHuey with results=True, utc=True for persistent, timezone-safe queues; no Redis needed.
- Always attach @huey.signal() handlers for full lifecycle logging; query EVENT_LOG to debug races and retries.
- Set priority, retries=3, retry_delay=1 on flaky tasks; higher priority pulls them forward in mixed queues.
- Use @huey.lock_task(unique_key) for mutexes; chain dependent workflows into pipelines with .s() and .then().
- Schedule via delay/crontab; revoke pending tasks; run a threaded consumer with workers=4, periodic=True.
- Gracefully stop consumers; test blocking result calls with timeouts to validate end-to-end.
- Common pitfall: Shared mutable state—pass args explicitly; monitor DB locks under load.
- Production tip: WAL mode for SQLite concurrency; start with notebook, deploy via supervisor.
"By doing this, we establish a lightweight yet production-style task queue setup without external dependencies."
"We track execution details, including task IDs, arguments, and exceptions, to improve observability."
Full notebook: https://github.com/Marktechpost/AI-Agents-Projects-Tutorials/blob/main/Distributed%20Systems/huey_async_tasks_Marktechpost.ipynb