Job Handler Port
Overview
The Job Handler Port defines the contract for implementing business logic that processes background jobs. This is the interface that application developers implement to execute specific job types.
Purpose: Enable custom business logic to process background jobs while maintaining separation between job orchestration (WorkerManager) and job execution (handler implementations).
Domain: Background job processing and asynchronous task execution
Key Capabilities:
- Process background jobs with custom business logic
- Handle job failures with custom cleanup/notification logic
- Type-safe job type identification
- Integration with WorkerManager for job orchestration
Port Type: Handler
When to Use:
- When implementing business logic to process specific job types (email sending, report generation, data import, etc.)
- When you need custom failure handling (cleanup, notifications, dead letter processing)
- When building asynchronous task processing systems
- When integrating with job queue systems
Domain Models
The Job Handler Port uses domain models from portico.ports.job:
- Job - Represents a background job with type, payload, status, retry configuration
- JobResult - Result of job execution (success/failure, result data, error information)
- JobStatus - Enumeration of job statuses (PENDING, SCHEDULED, RUNNING, COMPLETED, FAILED, RETRYING, DEAD_LETTER)
These models are shared across the job processing system and are defined in portico.ports.job.
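For orientation, the shapes of these models look roughly like the sketch below. It uses only the fields referenced on this page; defaults, enum values, and exact types are assumptions, so see portico.ports.job for the real definitions.
```python
# Illustrative sketch only -- see portico.ports.job for the actual definitions.
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
from uuid import UUID


class JobStatus(Enum):
    # String values are assumptions; the enum members come from this page
    PENDING = "pending"
    SCHEDULED = "scheduled"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"
    DEAD_LETTER = "dead_letter"


@dataclass
class Job:
    id: UUID
    queue_name: str
    job_type: str
    payload: dict[str, Any]
    metadata: dict[str, Any] = field(default_factory=dict)
    status: JobStatus = JobStatus.PENDING
    retry_count: int = 0
    max_retries: int = 3          # default is an assumption
    error_message: Optional[str] = None


@dataclass
class JobResult:
    success: bool
    result_data: dict[str, Any] = field(default_factory=dict)
    error: Optional[Exception] = None
```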
Port Interface
JobHandler
The JobHandler abstract base class defines the contract for job processing business logic.
Location: portico.ports.job_handler.JobHandler
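In outline, the contract looks like this (a sketch consistent with the members documented below; treating `on_failure` as an optional hook with a no-op default is an assumption, so check the actual base class):
```python
from abc import ABC, abstractmethod

from portico.ports.job import Job, JobResult


class JobHandler(ABC):
    @property
    @abstractmethod
    def job_type(self) -> str:
        """Job type this handler processes, e.g. "email.send"."""

    @abstractmethod
    async def handle(self, job: Job) -> JobResult:
        """Process the job and return a JobResult."""

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Hook called when the job fails; sketched here as a no-op default."""
```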
Properties
job_type
Returns the job type this handler processes (e.g., "email.send", "report.generate").
Returns: Job type string identifier
Purpose: Allows the WorkerManager to route jobs to the appropriate handler.
Example:
```python
from portico.ports.job_handler import JobHandler


class EmailJobHandler(JobHandler):
    @property
    def job_type(self) -> str:
        return "email.send"
```
Methods
handle
Process the job and return the result.
Parameters:
- `job` (`Job`): Job to process with type, payload, and metadata
Returns: JobResult indicating success/failure with optional result data
Raises: Exceptions are caught by WorkerManager and trigger retry logic
Purpose: Contains the core business logic for processing a specific job type.
Example:
```python
from datetime import datetime  # needed for the timestamp below


async def handle(self, job: Job) -> JobResult:
    """Send email based on job payload."""
    try:
        recipient = job.payload["recipient"]
        subject = job.payload["subject"]
        body = job.payload["body"]
        await self.email_service.send_email(
            to=recipient,
            subject=subject,
            body=body,
        )
        return JobResult(
            success=True,
            result_data={"sent_at": datetime.now().isoformat()},
        )
    except Exception as e:
        logger.error("email_send_failed", error=str(e))
        return JobResult(
            success=False,
            error=e,
            result_data={},
        )
```
on_failure
Called when a job fails (for cleanup, notifications, dead letter processing, etc.).
Parameters:
- `job` (`Job`): Job that failed
- `error` (`Exception`): Exception that caused the failure
Returns: None
Purpose: Perform cleanup actions, send notifications, or handle dead letter processing when a job fails.
Important Notes:
- Called on every failure, including retries
- Called even if the job will be retried
- Exceptions raised in `on_failure` are logged but don't affect job retry logic
- Useful for alerting, cleanup, or moving failed jobs to a dead letter queue
Example:
```python
async def on_failure(self, job: Job, error: Exception) -> None:
    """Log failure and send alert if job exhausted retries."""
    logger.error(
        "job_failed",
        job_id=str(job.id),
        job_type=job.job_type,
        error=str(error),
        retry_count=job.retry_count,
        max_retries=job.max_retries,
    )
    # Send alert if job exhausted all retries
    if job.retry_count >= job.max_retries:
        await self.alert_service.send_alert(
            level="error",
            message=f"Job {job.id} failed after {job.max_retries} retries",
            context={
                "job_type": job.job_type,
                "error": str(error),
                "payload": job.payload,
            },
        )
```
Integration with WorkerManager
The WorkerManager orchestrates job processing by routing jobs to registered handlers:
```python
from portico.kits.job.worker_manager import WorkerManager

# Create handlers
email_handler = EmailJobHandler(email_service)
report_handler = ReportJobHandler(report_service)

# Register handlers with WorkerManager
manager = WorkerManager(
    job_queue=job_queue_adapter,
    handlers={
        "email.send": email_handler,
        "report.generate": report_handler,
    },
    concurrency=10,
    queues=["default", "high_priority"],
)

# Start processing jobs
await manager.start()

# ... application runs ...

# Gracefully shut down workers
await manager.stop()
```
How it works:
- WorkerManager dequeues jobs from configured queues
- Looks up the handler for the job's `job_type`
- Calls `handler.handle(job)` to process the job
- If successful, acknowledges the job
- If failed, calls `handler.on_failure(job, error)` and applies retry logic (sketched below)
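The loop can be pictured roughly as follows. This is a simplified, hypothetical sketch: method names like `dequeue`, `acknowledge`, and `retry_or_dead_letter` are illustrative, not the actual JobQueue API, and the real WorkerManager also manages concurrency, scheduling, and graceful shutdown.
```python
# Hypothetical sketch of one iteration of the WorkerManager loop.
async def process_next(job_queue, handlers):
    job = await job_queue.dequeue(["default", "high_priority"])
    handler = handlers[job.job_type]  # route by job_type
    try:
        result = await handler.handle(job)
    except Exception as error:
        # Unhandled exceptions count as failures and trigger retry logic
        await handler.on_failure(job, error)
        await job_queue.retry_or_dead_letter(job, error)
        return
    if result.success:
        await job_queue.acknowledge(job)
    else:
        await handler.on_failure(job, result.error)
        await job_queue.retry_or_dead_letter(job, result.error)
```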
Common Patterns
1. Email Sending Handler
```python
from datetime import datetime
from typing import Any

from portico.ports.job import Job, JobResult
from portico.ports.job_handler import JobHandler
from portico.utils.logging import get_logger

logger = get_logger(__name__)


class EmailJobHandler(JobHandler):
    """Handler for sending emails asynchronously."""

    def __init__(self, email_service: Any):
        self.email_service = email_service

    @property
    def job_type(self) -> str:
        return "email.send"

    async def handle(self, job: Job) -> JobResult:
        """Send email from job payload."""
        try:
            recipient = job.payload["recipient"]
            subject = job.payload["subject"]
            body = job.payload["body"]
            template_id = job.payload.get("template_id")
            logger.info(
                "sending_email",
                job_id=str(job.id),
                recipient=recipient,
            )
            if template_id:
                # Use template
                await self.email_service.send_templated_email(
                    to=recipient,
                    template_id=template_id,
                    variables=job.payload.get("variables", {}),
                )
            else:
                # Direct email
                await self.email_service.send_email(
                    to=recipient,
                    subject=subject,
                    body=body,
                )
            return JobResult(
                success=True,
                result_data={
                    "sent_at": datetime.now().isoformat(),
                    "recipient": recipient,
                },
            )
        except Exception as e:
            logger.error(
                "email_send_failed",
                job_id=str(job.id),
                error=str(e),
            )
            return JobResult(
                success=False,
                error=e,
                result_data={},
            )

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Log email send failures."""
        logger.error(
            "email_job_failed",
            job_id=str(job.id),
            recipient=job.payload.get("recipient"),
            error=str(error),
            retry_count=job.retry_count,
        )
        # Alert if exhausted retries
        if job.retry_count >= job.max_retries:
            await self.email_service.send_admin_alert(
                subject="Email Job Failed",
                body=f"Failed to send email after {job.max_retries} retries",
            )
```
2. Data Processing Handler with Validation
```python
from datetime import datetime
from typing import Any

from portico.exceptions import ValidationError
from portico.ports.job import Job, JobResult
from portico.ports.job_handler import JobHandler
from portico.utils.logging import get_logger

logger = get_logger(__name__)


class DataImportJobHandler(JobHandler):
    """Handler for importing data from external sources."""

    def __init__(self, import_service: Any, storage_service: Any):
        self.import_service = import_service
        self.storage_service = storage_service

    @property
    def job_type(self) -> str:
        return "data.import"

    async def handle(self, job: Job) -> JobResult:
        """Import data from source URL."""
        try:
            # Validate payload
            source_url = job.payload.get("source_url")
            if not source_url:
                raise ValidationError("source_url is required")
            import_format = job.payload.get("format", "csv")
            user_id = job.payload.get("user_id")
            logger.info(
                "starting_data_import",
                job_id=str(job.id),
                source_url=source_url,
                format=import_format,
            )
            # Download and parse data
            data = await self.import_service.fetch_data(source_url)
            parsed = await self.import_service.parse_data(data, import_format)
            # Store results
            result = await self.storage_service.store_import(
                data=parsed,
                user_id=user_id,
                metadata=job.metadata,
            )
            logger.info(
                "data_import_completed",
                job_id=str(job.id),
                records_imported=len(parsed),
            )
            return JobResult(
                success=True,
                result_data={
                    "records_imported": len(parsed),
                    "import_id": str(result.id),
                    "completed_at": datetime.now().isoformat(),
                },
            )
        except ValidationError as e:
            # Don't retry validation errors
            logger.error("validation_error", job_id=str(job.id), error=str(e))
            return JobResult(success=False, error=e, result_data={})
        except Exception as e:
            # Retry other errors
            logger.error("import_error", job_id=str(job.id), error=str(e))
            return JobResult(success=False, error=e, result_data={})

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Clean up and notify on import failure."""
        # Clean up partial imports
        if "import_id" in job.metadata:
            await self.storage_service.cleanup_failed_import(
                job.metadata["import_id"]
            )
        # Notify user if specified (notify_import_failure is a helper
        # assumed to be defined elsewhere on this handler)
        if "user_id" in job.payload:
            await self.notify_import_failure(
                job.payload["user_id"],
                job.payload.get("source_url"),
                error,
            )
```
3. Report Generation Handler with Progress Tracking
```python
from datetime import datetime
from typing import Any

from portico.ports.job import Job, JobResult
from portico.ports.job_handler import JobHandler
from portico.utils.logging import get_logger

logger = get_logger(__name__)


class ReportJobHandler(JobHandler):
    """Handler for generating reports."""

    def __init__(
        self,
        report_service: Any,
        file_storage: Any,
        notification_service: Any,  # injected for on_failure notifications
    ):
        self.report_service = report_service
        self.file_storage = file_storage
        self.notification_service = notification_service

    @property
    def job_type(self) -> str:
        return "report.generate"

    async def handle(self, job: Job) -> JobResult:
        """Generate report and store file."""
        try:
            report_type = job.payload["report_type"]
            start_date = job.payload["start_date"]
            end_date = job.payload["end_date"]
            user_id = job.payload["user_id"]
            logger.info(
                "generating_report",
                job_id=str(job.id),
                report_type=report_type,
                date_range=f"{start_date} to {end_date}",
            )
            # Generate report (could take several minutes)
            report_data = await self.report_service.generate_report(
                report_type=report_type,
                start_date=start_date,
                end_date=end_date,
                filters=job.payload.get("filters", {}),
            )
            # Store report file
            file_path = await self.file_storage.store_report(
                data=report_data,
                filename=f"{report_type}_{start_date}_{end_date}.pdf",
                user_id=user_id,
            )
            logger.info(
                "report_generated",
                job_id=str(job.id),
                file_path=file_path,
            )
            return JobResult(
                success=True,
                result_data={
                    "file_path": file_path,
                    "report_type": report_type,
                    "generated_at": datetime.now().isoformat(),
                    "size_bytes": len(report_data),
                },
            )
        except Exception as e:
            logger.error(
                "report_generation_failed",
                job_id=str(job.id),
                error=str(e),
            )
            return JobResult(success=False, error=e, result_data={})

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Notify user of report generation failure."""
        user_id = job.payload.get("user_id")
        if user_id and job.retry_count >= job.max_retries:
            # Send notification about failed report
            await self.notification_service.send_notification(
                user_id=user_id,
                type="report_failed",
                message=f"Report generation failed: {error}",
                metadata={"job_id": str(job.id)},
            )
```
4. Handler with External Service Retry Logic
```python
import asyncio
from typing import Any

from portico.ports.job import Job, JobResult
from portico.ports.job_handler import JobHandler
from portico.utils.logging import get_logger

logger = get_logger(__name__)


class WebhookJobHandler(JobHandler):
    """Handler for sending webhooks to external services."""

    def __init__(self, http_client: Any):
        self.http_client = http_client

    @property
    def job_type(self) -> str:
        return "webhook.send"

    async def handle(self, job: Job) -> JobResult:
        """Send webhook with exponential backoff."""
        url = job.payload["url"]
        payload = job.payload["payload"]
        max_attempts = 3
        for attempt in range(max_attempts):
            try:
                logger.info(
                    "sending_webhook",
                    job_id=str(job.id),
                    url=url,
                    attempt=attempt + 1,
                )
                response = await self.http_client.post(
                    url,
                    json=payload,
                    timeout=30,
                )
                if response.status_code < 500:
                    # Success or client error (don't retry)
                    return JobResult(
                        success=response.status_code < 400,
                        result_data={
                            "status_code": response.status_code,
                            "response": response.text[:1000],
                        },
                    )
                # Server error - retry with backoff
                if attempt < max_attempts - 1:
                    await asyncio.sleep(2 ** attempt)  # 1s, then 2s
            except Exception as e:
                logger.warning(
                    "webhook_attempt_failed",
                    job_id=str(job.id),
                    attempt=attempt + 1,
                    error=str(e),
                )
                if attempt < max_attempts - 1:
                    await asyncio.sleep(2 ** attempt)
        # All attempts failed
        return JobResult(
            success=False,
            error=Exception("Webhook failed after all attempts"),
            result_data={"attempts": max_attempts},
        )

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Log webhook failures for monitoring."""
        logger.error(
            "webhook_permanently_failed",
            job_id=str(job.id),
            url=job.payload.get("url"),
            retry_count=job.retry_count,
            error=str(error),
        )
```
Best Practices
1. Keep Handlers Stateless
Handlers should not maintain state between job executions. All necessary data should come from the job payload.
```python
# ✅ GOOD - Handler is stateless
class EmailHandler(JobHandler):
    def __init__(self, email_service: Any):
        self.email_service = email_service  # Dependency injection

    async def handle(self, job: Job) -> JobResult:
        # All data from job payload
        recipient = job.payload["recipient"]
        await self.email_service.send(recipient)
        return JobResult(success=True)


# ❌ BAD - Handler maintains state
class EmailHandler(JobHandler):
    def __init__(self):
        self.sent_count = 0  # ❌ State

    async def handle(self, job: Job) -> JobResult:
        self.sent_count += 1  # ❌ Won't work with multiple workers
        return JobResult(success=True)
```
2. Validate Payload Early
Validate job payload at the start of handle() to catch errors before doing expensive work.
```python
# ✅ GOOD - Validate immediately
async def handle(self, job: Job) -> JobResult:
    # Validate required fields
    if not job.payload.get("user_id"):
        return JobResult(
            success=False,
            error=ValidationError("user_id required"),
        )
    if not job.payload.get("amount") or job.payload["amount"] <= 0:
        return JobResult(
            success=False,
            error=ValidationError("amount must be positive"),
        )
    # Now proceed with expensive operations
    result = await self.process_payment(job.payload)
    return JobResult(success=True, result_data=result)


# ❌ BAD - Validation after expensive work
async def handle(self, job: Job) -> JobResult:
    # Do expensive work first
    data = await self.fetch_large_dataset()
    processed = await self.process_data(data)
    # Validate late ❌
    if not job.payload.get("user_id"):
        raise ValidationError("user_id required")
```
3. Use Structured Logging
Log key events with structured context for observability.
```python
# ✅ GOOD - Structured logging with context
async def handle(self, job: Job) -> JobResult:
    logger.info(
        "processing_job",
        job_id=str(job.id),
        job_type=job.job_type,
        retry_count=job.retry_count,
        user_id=job.payload.get("user_id"),
    )
    try:
        result = await self.process(job)
        logger.info(
            "job_completed",
            job_id=str(job.id),
            duration_ms=result.duration,
        )
        return JobResult(success=True, result_data=result)
    except Exception as e:
        logger.error(
            "job_failed",
            job_id=str(job.id),
            error=str(e),
            exc_info=True,
        )
        return JobResult(success=False, error=e)


# ❌ BAD - Unstructured logging
async def handle(self, job: Job) -> JobResult:
    print(f"Processing job {job.id}")  # ❌ Not structured
    result = await self.process(job)
    return JobResult(success=True)
```
4. Return JobResult, Don't Raise Exceptions (Usually)
Return JobResult with success=False for expected failures. Only raise exceptions for unexpected errors.
```python
# ✅ GOOD - Return JobResult for expected failures
async def handle(self, job: Job) -> JobResult:
    try:
        user = await self.get_user(job.payload["user_id"])
        if not user:
            # Expected failure - return JobResult
            return JobResult(
                success=False,
                error=ResourceNotFoundError("User not found"),
                result_data={},
            )
        if user.balance < job.payload["amount"]:
            # Expected failure - return JobResult
            return JobResult(
                success=False,
                error=ValidationError("Insufficient balance"),
                result_data={},
            )
        # Process payment
        result = await self.process_payment(user, job.payload["amount"])
        return JobResult(success=True, result_data=result)
    except DatabaseError:
        # Unexpected error - re-raise for retry
        raise


# ❌ BAD - Raising for expected conditions
async def handle(self, job: Job) -> JobResult:
    user = await self.get_user(job.payload["user_id"])
    if not user:
        raise ResourceNotFoundError("User not found")  # ❌ Will retry unnecessarily
    if user.balance < job.payload["amount"]:
        raise ValidationError("Insufficient balance")  # ❌ Will retry unnecessarily
```
5. Use on_failure for Cleanup
Use on_failure for cleanup, alerting, and dead letter processing, not for core business logic.
```python
# ✅ GOOD - on_failure for cleanup
async def on_failure(self, job: Job, error: Exception) -> None:
    # Clean up resources
    if "temp_file" in job.metadata:
        await self.cleanup_temp_file(job.metadata["temp_file"])
    # Send alerts if exhausted retries
    if job.retry_count >= job.max_retries:
        await self.alert_service.send_alert(
            level="error",
            message=f"Job {job.id} failed permanently",
            context={"error": str(error)},
        )
    # Log for monitoring
    logger.error(
        "job_failed",
        job_id=str(job.id),
        error=str(error),
        will_retry=job.retry_count < job.max_retries,
    )


# ❌ BAD - on_failure for business logic
async def on_failure(self, job: Job, error: Exception) -> None:
    # Don't do business logic in on_failure ❌
    await self.refund_payment(job.payload["payment_id"])
    # This runs on EVERY failure, including retries!
    # Could refund the same payment multiple times
```
6. Make Handlers Idempotent When Possible
Design handlers to safely retry without side effects.
```python
# ✅ GOOD - Idempotent handler
async def handle(self, job: Job) -> JobResult:
    payment_id = job.payload["payment_id"]
    # Check if already processed
    existing = await self.payment_service.get_payment(payment_id)
    if existing and existing.status == "completed":
        logger.info("payment_already_processed", payment_id=payment_id)
        return JobResult(
            success=True,
            result_data={"payment_id": payment_id, "already_processed": True},
        )
    # Process payment with idempotency key
    result = await self.payment_service.process_payment(
        payment_id=payment_id,
        idempotency_key=str(job.id),  # Use job ID
    )
    return JobResult(success=True, result_data=result)


# ❌ BAD - Not idempotent
async def handle(self, job: Job) -> JobResult:
    # Always creates new payment, even on retry ❌
    payment = await self.payment_service.create_payment(
        amount=job.payload["amount"]
    )
    return JobResult(success=True)
```
7. Use Dependency Injection
Inject dependencies through the constructor for testability.
```python
# ✅ GOOD - Dependencies injected
class EmailHandler(JobHandler):
    def __init__(
        self,
        email_service: Any,
        template_service: Any,
        audit_service: Any,
    ):
        self.email_service = email_service
        self.template_service = template_service
        self.audit_service = audit_service

    async def handle(self, job: Job) -> JobResult:
        # Use injected services
        template = await self.template_service.get(job.payload["template_id"])
        await self.email_service.send(template)
        await self.audit_service.log("email_sent")
        return JobResult(success=True)


# ❌ BAD - Creating dependencies inside handler
class EmailHandler(JobHandler):
    async def handle(self, job: Job) -> JobResult:
        # Creating service instances ❌
        email_service = EmailService()
        template_service = TemplateService()
        # Hard to test, tight coupling
        template = await template_service.get(job.payload["template_id"])
        await email_service.send(template)
```
FAQs
When should I raise an exception vs return JobResult with success=False?
Return JobResult(success=False) for:
- Expected failures (validation errors, business rule violations)
- Failures that shouldn't be retried
- Client errors (400-level equivalents)
Raise exceptions for:
- Unexpected errors (database connection failures, external service timeouts)
- Errors that should trigger retries
- Infrastructure/transient errors (500-level equivalents)
```python
async def handle(self, job: Job) -> JobResult:
    # Expected failure - return JobResult
    if job.payload["amount"] < 0:
        return JobResult(
            success=False,
            error=ValidationError("Amount must be positive"),
        )
    try:
        # Process payment
        result = await self.payment_api.charge(job.payload["amount"])
        return JobResult(success=True, result_data=result)
    except ConnectionError:
        # Transient error - raise for retry
        raise
    except PaymentDeclined as e:
        # Expected failure - return JobResult
        return JobResult(success=False, error=e)
```
How do I pass context to handlers (database sessions, user context, etc.)?
Use dependency injection in the handler constructor. Pass services/adapters that provide the context you need.
```python
# Handler with database session factory
class DataHandler(JobHandler):
    def __init__(self, session_factory: Any):
        self.session_factory = session_factory

    async def handle(self, job: Job) -> JobResult:
        async with self.session_factory() as session:
            # Use session for database operations
            result = await session.execute(...)
            await session.commit()
            return JobResult(success=True)


# Handler with service that provides user context
class UserActionHandler(JobHandler):
    def __init__(self, user_service: Any, action_service: Any):
        self.user_service = user_service
        self.action_service = action_service

    async def handle(self, job: Job) -> JobResult:
        user = await self.user_service.get_user(job.payload["user_id"])
        result = await self.action_service.perform_action(user, job.payload["action"])
        return JobResult(success=True, result_data=result)
```
What happens if on_failure raises an exception?
The WorkerManager logs the exception but continues with job retry logic. The on_failure exception doesn't affect whether the job is retried or moved to the dead letter queue.
```python
async def on_failure(self, job: Job, error: Exception) -> None:
    try:
        # Attempt cleanup
        await self.cleanup_service.cleanup(job.id)
    except Exception as e:
        # Exception is logged but doesn't affect retry logic
        logger.error("cleanup_failed", error=str(e))
    # Job will still be retried or moved to dead letter based on retry count
```
How do I test handlers in isolation?
Mock the dependencies injected into the handler constructor. The handler's business logic can be tested independently of the job queue.
```python
from uuid import uuid4
from unittest.mock import AsyncMock

import pytest

from portico.ports.job import Job, JobResult

# EmailJobHandler as defined in the Common Patterns section above


@pytest.mark.asyncio
async def test_email_handler_success():
    # Mock dependencies
    email_service = AsyncMock()
    email_service.send_email.return_value = {"sent": True}
    # Create handler with mocks
    handler = EmailJobHandler(email_service)
    # Create test job
    job = Job(
        id=uuid4(),
        queue_name="default",
        job_type="email.send",
        payload={
            "recipient": "test@example.com",
            "subject": "Test",
            "body": "Test email",
        },
    )
    # Test handle method
    result = await handler.handle(job)
    # Verify
    assert result.success
    assert "sent_at" in result.result_data
    email_service.send_email.assert_called_once()


@pytest.mark.asyncio
async def test_email_handler_failure():
    # Mock service to raise exception
    email_service = AsyncMock()
    email_service.send_email.side_effect = Exception("SMTP error")
    handler = EmailJobHandler(email_service)
    job = Job(
        id=uuid4(),
        queue_name="default",
        job_type="email.send",
        payload={"recipient": "test@example.com"},
    )
    # Test failure handling
    result = await handler.handle(job)
    assert not result.success
    assert result.error is not None
```
How do I implement different handlers for the same job type?
You typically don't. Each job type should have one handler. If you need different processing logic, use different job types or use the job payload to control behavior.
```python
# ✅ GOOD - Different job types
class EmailJobHandler(JobHandler):
    @property
    def job_type(self) -> str:
        return "email.send"


class SmsJobHandler(JobHandler):
    @property
    def job_type(self) -> str:
        return "sms.send"


# Register both
manager = WorkerManager(
    job_queue=queue,
    handlers={
        "email.send": EmailJobHandler(email_service),
        "sms.send": SmsJobHandler(sms_service),
    },
)


# ✅ GOOD - Use payload to control behavior
class NotificationHandler(JobHandler):
    @property
    def job_type(self) -> str:
        return "notification.send"

    async def handle(self, job: Job) -> JobResult:
        notification_type = job.payload["type"]
        if notification_type == "email":
            return await self.send_email(job.payload)
        elif notification_type == "sms":
            return await self.send_sms(job.payload)
        elif notification_type == "push":
            return await self.send_push(job.payload)
        # Always return a JobResult, even for unknown types
        return JobResult(
            success=False,
            error=ValueError(f"Unknown notification type: {notification_type}"),
        )
```
How do I handle long-running jobs that might exceed worker timeouts?
Break long jobs into smaller chunks, use progress tracking, or implement checkpoint/resume logic.
```python
# ✅ GOOD - Chunked processing
class DataProcessingHandler(JobHandler):
    @property
    def job_type(self) -> str:
        return "data.process_batch"

    async def handle(self, job: Job) -> JobResult:
        batch_id = job.payload["batch_id"]
        offset = job.payload.get("offset", 0)
        chunk_size = 1000
        # Process one chunk
        records = await self.fetch_records(batch_id, offset, chunk_size)
        processed = await self.process_records(records)
        # If more records exist, create next job
        if len(records) == chunk_size:
            await self.job_creator.create_job(
                job_type="data.process_batch",
                payload={
                    "batch_id": batch_id,
                    "offset": offset + chunk_size,
                },
            )
        return JobResult(
            success=True,
            result_data={
                "processed": len(processed),
                "offset": offset,
            },
        )
```
How do I implement priority-based job processing?
Use different queues for different priorities and configure WorkerManager with multiple queues.
```python
# Create jobs with different queues
await job_service.create_job(
    job_type="email.send",
    payload={"recipient": "user@example.com"},
    queue_name="high_priority",  # High priority queue
    priority=10,
)

await job_service.create_job(
    job_type="report.generate",
    payload={"report_id": "123"},
    queue_name="low_priority",  # Low priority queue
    priority=1,
)

# Configure WorkerManager to process queues in order
manager = WorkerManager(
    job_queue=queue,
    handlers=handlers,
    queues=["high_priority", "default", "low_priority"],  # Order matters
    concurrency=10,
)

# Workers will check high_priority first, then default, then low_priority
```
How do I implement a dead letter queue handler?
Monitor dead letter jobs and create a handler to reprocess or analyze them.
```python
class DeadLetterHandler(JobHandler):
    """Handler for processing dead letter jobs."""

    @property
    def job_type(self) -> str:
        return "system.deadletter.process"

    async def handle(self, job: Job) -> JobResult:
        """Analyze and potentially requeue dead letter jobs."""
        dead_job_id = job.payload["dead_job_id"]
        # Get the dead letter job
        dead_job = await self.job_queue.get_job(dead_job_id)
        if not dead_job:
            return JobResult(success=True)  # Already processed
        # Analyze why it failed
        error_type = self._analyze_error(dead_job.error_message)
        # Log to monitoring system
        await self.monitoring_service.log_dead_letter(
            job_type=dead_job.job_type,
            error_type=error_type,
            payload=dead_job.payload,
        )
        # Optionally requeue if error was transient
        if error_type == "transient":
            await self.job_queue.enqueue(dead_job)
        return JobResult(
            success=True,
            result_data={"analyzed": True, "requeued": error_type == "transient"},
        )

    async def on_failure(self, job: Job, error: Exception) -> None:
        """Alert if dead letter processing fails."""
        await self.alert_service.send_critical_alert(
            "Dead letter handler failed",
            context={"job_id": str(job.id), "error": str(error)},
        )
```
Related Ports
- Job Creator Port (`portico.ports.job_creator`) - Interface for creating jobs (implemented by JobService)
- Job Queue Port (`portico.ports.job_queue`) - Interface for queue adapters (memory, database, Redis)
- Audit Port - Audit logging for job creation and execution
Related Kits
- JobService (`portico.kits.job`) - Implements JobCreator, manages job lifecycle
- WorkerManager (`portico.kits.job.worker_manager`) - Orchestrates workers and routes jobs to handlers
Architecture Notes
The Job Handler Port follows hexagonal architecture principles:
- Handlers are adapters: They adapt business logic to the job processing system
- Dependency inversion: WorkerManager depends on the JobHandler interface, not concrete implementations
- Separation of concerns: Job orchestration (WorkerManager) is separated from job execution (handlers)
- Testability: Handlers can be tested in isolation with mock dependencies
This pattern enables:
- Multiple handler implementations for different job types
- Easy testing of business logic without running workers
- Flexible job routing and processing strategies
- Clean separation between infrastructure and domain logic