Job Creator Port
Overview
The Job Creator Port defines the contract for creating and enqueueing background jobs. It serves as a dependency inversion interface allowing job triggers (adapters) to create jobs without depending on the concrete JobService implementation.
Purpose: Enable job creation while maintaining hexagonal architecture - adapters depend on this port, not on the service implementation.
Domain: Background job processing, task queuing, asynchronous operations
Key Capabilities:
- Job Creation: Create jobs with type, payload, and priority
- Queue Selection: Route jobs to specific named queues
- Priority Management: Set job priority for execution ordering
- Scheduled Jobs: Create jobs scheduled for future execution
- Extensible: Accepts implementation-specific kwargs (max_retries, created_by, metadata)
- Hexagonal Architecture: Adapters depend on ports, not kits/services
Port Type: Provider (Factory pattern)
When to Use:
- Job triggers (adapters) that create background jobs
- External systems scheduling tasks (webhooks, cron, API endpoints)
- Event handlers that need to enqueue work
- Systems requiring dependency inversion between adapters and services
- Any code that creates jobs but shouldn't depend on JobService directly
When NOT to Use:
- Direct job execution (use job handlers/workers)
- Job status queries (use JobService or job repository)
- Job cancellation or management operations
- If you're implementing the job service itself (implement the interface instead)
Domain Models
Job
Domain model representing a background job (defined in portico.ports.job).
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
id |
UUID |
Yes | - | Unique job identifier |
queue_name |
str |
Yes | - | Queue name for job routing |
job_type |
str |
Yes | - | Job type identifier (e.g., "email.send", "document.process") |
payload |
dict[str, Any] |
Yes | - | Job payload data (must be JSON-serializable) |
priority |
int |
No | 0 |
Job priority (higher = more urgent) |
max_retries |
int |
No | 3 |
Maximum retry attempts on failure |
retry_count |
int |
No | 0 |
Current retry attempt count |
scheduled_at |
Optional[datetime] |
No | None |
Scheduled execution time (None = immediate) |
started_at |
Optional[datetime] |
No | None |
Job start timestamp |
completed_at |
Optional[datetime] |
No | None |
Job completion timestamp |
failed_at |
Optional[datetime] |
No | None |
Job failure timestamp |
status |
JobStatus |
No | PENDING |
Current job status |
error_message |
Optional[str] |
No | None |
Error message if failed |
created_by |
Optional[UUID] |
No | None |
User ID that created the job |
created_at |
Optional[datetime] |
No | None |
Job creation timestamp |
metadata |
dict[str, Any] |
No | {} |
Additional job metadata |
Example:
from portico.ports.job import Job, JobStatus
from datetime import datetime
from uuid import uuid4
job = Job(
id=uuid4(),
queue_name="emails",
job_type="email.send",
payload={
"to": "user@example.com",
"subject": "Welcome!",
"template": "welcome_email",
},
priority=5,
max_retries=3,
status=JobStatus.PENDING,
created_by=uuid4(),
created_at=datetime.now(),
metadata={"campaign": "onboarding"},
)
Enumerations
JobStatus
Job execution status (defined in portico.ports.job).
| Value | Description |
|---|---|
PENDING |
Job created and waiting to be processed |
SCHEDULED |
Job scheduled for future execution |
RUNNING |
Job currently executing |
COMPLETED |
Job completed successfully |
FAILED |
Job failed (may retry) |
RETRYING |
Job failed and is being retried |
DEAD_LETTER |
Job failed after all retries exhausted |
Example:
from portico.ports.job import JobStatus
# Check job status
if job.status == JobStatus.PENDING:
print("Job waiting to execute")
elif job.status == JobStatus.COMPLETED:
print("Job finished successfully")
elif job.status == JobStatus.FAILED:
print(f"Job failed: {job.error_message}")
Port Interface
JobCreator
The JobCreator abstract base class defines the contract for creating jobs. Job triggers (adapters) use this interface to create jobs without depending on the concrete JobService implementation, maintaining hexagonal architecture.
Location: portico.ports.job_creator.JobCreator
Architectural Pattern:
┌─────────────────────────────────┐
│ Job Triggers (Adapters) │
│ - ScheduleTrigger │
│ - WebhookTrigger │
│ - EventTrigger │
└────────────┬────────────────────┘
│ depends on
↓
┌─────────────────────────────────┐
│ JobCreator (Port/Interface) │ ← Dependency inversion
│ - create_job() │
└────────────┬────────────────────┘
↑ implements
│
┌─────────────────────────────────┐
│ JobService (Kit/Implementation) │
│ - create_job() │
│ - get_job() │
│ - process_jobs() │
└─────────────────────────────────┘
Key Method
create_job
async def create_job(
job_type: str,
payload: Dict[str, Any],
queue_name: str = "default",
priority: int = 0,
**kwargs: Any,
) -> Job
Create a job and enqueue it for processing.
Parameters:
job_type: Job type identifier (must have registered handler, e.g., "email.send", "document.process")payload: Job payload data (must be JSON-serializable dict)queue_name: Queue name to submit job to (default: "default")priority: Job priority for execution ordering (higher = more urgent, default: 0)**kwargs: Implementation-specific options:scheduled_at(datetime): Schedule job for future executionmax_retries(int): Override default retry limitcreated_by(UUID): User ID that created the jobmetadata(dict): Additional job metadata
Returns: Created Job object with generated ID and PENDING/SCHEDULED status
Raises:
- ValueError: If job_type is not registered
- ValidationError: If payload is not JSON-serializable
Example:
from portico.ports.job_creator import JobCreator
from datetime import datetime, timedelta
from uuid import uuid4
# Basic job creation
job = await job_creator.create_job(
job_type="email.send",
payload={
"to": "user@example.com",
"subject": "Order Confirmation",
"template": "order_confirmation",
"order_id": "12345",
},
)
# High-priority job with custom queue
job = await job_creator.create_job(
job_type="payment.process",
payload={"transaction_id": "txn_789", "amount": 99.99},
queue_name="payments",
priority=10,
)
# Scheduled job with metadata
job = await job_creator.create_job(
job_type="report.generate",
payload={"report_type": "monthly", "month": "2025-03"},
scheduled_at=datetime.now() + timedelta(days=1),
created_by=uuid4(),
metadata={"department": "finance"},
)
# Job with custom retry limit
job = await job_creator.create_job(
job_type="api.webhook",
payload={"url": "https://api.example.com/webhook", "event": "user.created"},
max_retries=5,
)
Common Patterns
Job Trigger Using JobCreator
from portico.ports.job_creator import JobCreator
from portico.utils.logging import get_logger
logger = get_logger(__name__)
class ScheduleTrigger:
"""Job trigger that creates scheduled jobs.
This adapter depends on JobCreator port, not JobService directly.
This maintains hexagonal architecture.
"""
def __init__(self, job_creator: JobCreator, schedules: list):
self.job_creator = job_creator
self.schedules = schedules
async def trigger_daily_report(self):
"""Trigger daily report generation job."""
job = await self.job_creator.create_job(
job_type="report.daily",
payload={
"date": datetime.now().isoformat(),
"report_type": "sales",
},
queue_name="reports",
priority=5,
)
logger.info(
"daily_report_scheduled",
job_id=str(job.id),
scheduled_for=job.scheduled_at,
)
return job
async def trigger_user_cleanup(self):
"""Trigger user cleanup job."""
job = await self.job_creator.create_job(
job_type="user.cleanup",
payload={"inactive_days": 90},
queue_name="maintenance",
metadata={"trigger": "schedule", "frequency": "weekly"},
)
logger.info("cleanup_job_created", job_id=str(job.id))
return job
Webhook Handler Creating Jobs
from fastapi import APIRouter, HTTPException
from portico.ports.job_creator import JobCreator
from pydantic import BaseModel
class WebhookPayload(BaseModel):
event: str
data: dict
def create_webhook_router(job_creator: JobCreator) -> APIRouter:
"""Create webhook router that enqueues jobs."""
router = APIRouter()
@router.post("/webhooks/external")
async def handle_external_webhook(payload: WebhookPayload):
"""Handle incoming webhook by creating a job."""
try:
# Create job to process webhook asynchronously
job = await job_creator.create_job(
job_type=f"webhook.{payload.event}",
payload=payload.data,
queue_name="webhooks",
priority=7,
metadata={
"source": "external_api",
"event": payload.event,
},
)
return {
"status": "accepted",
"job_id": str(job.id),
"message": "Webhook queued for processing",
}
except ValueError as e:
# Job type not registered
raise HTTPException(status_code=400, detail=str(e))
return router
Event Handler Creating Background Jobs
from portico.ports.job_creator import JobCreator
from portico.events import EventBus, Event
class UserEventHandler:
"""Event handler that creates background jobs for user events."""
def __init__(self, job_creator: JobCreator, event_bus: EventBus):
self.job_creator = job_creator
self.event_bus = event_bus
# Subscribe to events
event_bus.subscribe("user.created", self.on_user_created)
event_bus.subscribe("user.deleted", self.on_user_deleted)
async def on_user_created(self, event: Event):
"""Handle user creation by enqueuing welcome email."""
user_id = event.data["user_id"]
user_email = event.data["email"]
# Create welcome email job
await self.job_creator.create_job(
job_type="email.send",
payload={
"to": user_email,
"template": "welcome_email",
"user_id": str(user_id),
},
queue_name="emails",
priority=5,
created_by=user_id,
)
# Create onboarding tasks job
await self.job_creator.create_job(
job_type="onboarding.create_tasks",
payload={"user_id": str(user_id)},
queue_name="default",
metadata={"trigger": "user_created"},
)
async def on_user_deleted(self, event: Event):
"""Handle user deletion by enqueuing cleanup."""
user_id = event.data["user_id"]
await self.job_creator.create_job(
job_type="user.cleanup_data",
payload={"user_id": str(user_id)},
queue_name="maintenance",
priority=3,
metadata={"trigger": "user_deleted"},
)
Mock JobCreator for Testing
from portico.ports.job_creator import JobCreator
from portico.ports.job import Job, JobStatus
from uuid import uuid4
class MockJobCreator(JobCreator):
"""Mock JobCreator for testing adapters without real job service."""
def __init__(self):
self.created_jobs = []
async def create_job(
self,
job_type: str,
payload: dict,
queue_name: str = "default",
priority: int = 0,
**kwargs,
) -> Job:
"""Mock job creation that tracks all created jobs."""
job_id = uuid4()
job = Job(
id=job_id,
queue_name=queue_name,
job_type=job_type,
payload=payload,
priority=priority,
status=JobStatus.PENDING,
**kwargs,
)
self.created_jobs.append(job)
return job
def get_created_jobs(self, job_type: str = None) -> list[Job]:
"""Get all created jobs, optionally filtered by type."""
if job_type:
return [j for j in self.created_jobs if j.job_type == job_type]
return self.created_jobs
def reset(self):
"""Reset tracked jobs."""
self.created_jobs.clear()
# Usage in tests
@pytest.mark.asyncio
async def test_webhook_handler_creates_job():
"""Test webhook handler creates correct job."""
mock_creator = MockJobCreator()
handler = WebhookHandler(job_creator=mock_creator)
await handler.process_webhook({"event": "user.created", "data": {...}})
# Verify job was created
assert len(mock_creator.created_jobs) == 1
job = mock_creator.created_jobs[0]
assert job.job_type == "webhook.user.created"
assert job.queue_name == "webhooks"
Integration with Kits
The JobCreator Port is implemented by the Job Service (JobService kit).
from portico.kits.job import JobService
from portico.adapters.job_queue import MemoryJobQueueAdapter
# JobService implements JobCreator interface
job_queue = MemoryJobQueueAdapter()
job_service = JobService(job_queue=job_queue)
# JobService is a JobCreator
assert isinstance(job_service, JobCreator)
# Use as JobCreator
job = await job_service.create_job(
job_type="email.send",
payload={"to": "user@example.com"},
)
# Job triggers receive JobCreator, not JobService
# This maintains dependency inversion
trigger = ScheduleTrigger(job_creator=job_service, schedules=[...])
Hexagonal Architecture Benefits:
# ✅ GOOD - Adapter depends on port
class MyTrigger:
def __init__(self, job_creator: JobCreator): # Port interface
self.job_creator = job_creator
async def trigger(self):
await self.job_creator.create_job(...)
# ❌ BAD - Adapter depends on concrete implementation
class MyTrigger:
def __init__(self, job_service: JobService): # Concrete kit
self.job_service = job_service
async def trigger(self):
await self.job_service.create_job(...)
Best Practices
- Use JobCreator Interface in Adapters: Depend on the port, not the service implementation
# ✅ GOOD - Depends on port interface
class WebhookAdapter:
def __init__(self, job_creator: JobCreator):
self.job_creator = job_creator
async def handle_webhook(self, payload):
await self.job_creator.create_job("webhook.process", payload)
# ❌ BAD - Depends on concrete service
from portico.kits.job import JobService
class WebhookAdapter:
def __init__(self, job_service: JobService):
self.job_service = job_service
- Use Descriptive Job Types: Use namespaced job type identifiers
# ✅ GOOD - Clear, namespaced job types
await job_creator.create_job(
job_type="email.send",
payload={...},
)
await job_creator.create_job(
job_type="document.process",
payload={...},
)
# ❌ BAD - Generic, unclear job types
await job_creator.create_job(
job_type="send",
payload={...},
)
- Ensure Payload is JSON-Serializable: Only include serializable data
# ✅ GOOD - Serializable payload
await job_creator.create_job(
job_type="user.notify",
payload={
"user_id": str(user.id), # Convert UUID to string
"timestamp": datetime.now().isoformat(), # ISO format
"metadata": {"key": "value"},
},
)
# ❌ BAD - Non-serializable payload
await job_creator.create_job(
job_type="user.notify",
payload={
"user_id": user.id, # UUID object
"timestamp": datetime.now(), # datetime object
"user": user, # Complex object
},
)
- Use Appropriate Priorities: Reserve high priorities for critical jobs
# ✅ GOOD - Priorities match importance
# Critical payment processing
await job_creator.create_job(
job_type="payment.process",
payload={...},
priority=10,
)
# Normal notification
await job_creator.create_job(
job_type="email.send",
payload={...},
priority=5,
)
# Low priority cleanup
await job_creator.create_job(
job_type="data.cleanup",
payload={...},
priority=1,
)
# ❌ BAD - Everything high priority
await job_creator.create_job(
job_type="email.send",
payload={...},
priority=10, # Not actually critical
)
- Route Jobs to Appropriate Queues: Use dedicated queues for different job types
# ✅ GOOD - Jobs routed to appropriate queues
await job_creator.create_job(
job_type="email.send",
payload={...},
queue_name="emails",
)
await job_creator.create_job(
job_type="report.generate",
payload={...},
queue_name="reports",
)
# ❌ BAD - Everything in default queue
await job_creator.create_job(
job_type="email.send",
payload={...},
# Uses default queue - may block other jobs
)
- Add Metadata for Tracking: Include metadata for debugging and analytics
# ✅ GOOD - Rich metadata
await job_creator.create_job(
job_type="webhook.process",
payload={...},
created_by=current_user.id,
metadata={
"source": "external_api",
"webhook_id": "wh_123",
"retry_strategy": "exponential",
},
)
# ❌ BAD - No context
await job_creator.create_job(
job_type="webhook.process",
payload={...},
)
- Use MockJobCreator for Testing: Test adapters without real job infrastructure
# ✅ GOOD - Mock for testing
@pytest.mark.asyncio
async def test_trigger_creates_job():
mock_creator = MockJobCreator()
trigger = MyTrigger(job_creator=mock_creator)
await trigger.execute()
# Verify job creation
assert len(mock_creator.created_jobs) == 1
assert mock_creator.created_jobs[0].job_type == "expected.type"
# ❌ BAD - Requires real job service
async def test_trigger_creates_job():
job_service = JobService(...) # Complex setup
trigger = MyTrigger(job_creator=job_service)
FAQs
What's the difference between JobCreator and JobService?
JobCreator is a port (interface) with a single method create_job(), while JobService is the full service implementation (kit) that includes job creation, querying, processing, and management.
- JobCreator: Port interface for creating jobs (used by adapters)
- JobService: Complete job service implementation (implements JobCreator + more)
Adapters should depend on JobCreator to maintain hexagonal architecture - they don't need the full service functionality.
Why use JobCreator instead of calling JobService directly?
This maintains hexagonal architecture and dependency inversion:
- Adapters (outer layer) depend on Ports (interfaces), not Kits (services)
- Makes adapters testable with mock implementations
- Reduces coupling between layers
- Follows SOLID principles (Dependency Inversion Principle)
# ✅ Hexagonal Architecture
class MyAdapter:
def __init__(self, job_creator: JobCreator): # Depends on port
pass
# ❌ Violates Hexagonal Architecture
class MyAdapter:
def __init__(self, job_service: JobService): # Depends on kit
pass
How do I schedule a job for future execution?
Use the scheduled_at kwarg with a future datetime:
from datetime import datetime, timedelta
# Schedule for 1 hour from now
job = await job_creator.create_job(
job_type="reminder.send",
payload={"user_id": "123", "message": "Don't forget!"},
scheduled_at=datetime.now() + timedelta(hours=1),
)
# Job status will be SCHEDULED (not PENDING)
assert job.status == JobStatus.SCHEDULED
What kwargs does create_job() accept?
The base interface requires job_type, payload, queue_name, and priority. Implementations may accept additional kwargs:
scheduled_at(datetime): Schedule for future executionmax_retries(int): Override retry limitcreated_by(UUID): User who created the jobmetadata(dict): Additional tracking information
Check your JobService implementation documentation for supported kwargs.
How do I ensure my payload is JSON-serializable?
Convert all non-serializable types before creating the job:
from datetime import datetime
from uuid import UUID
# Convert types to serializable formats
job = await job_creator.create_job(
job_type="user.process",
payload={
"user_id": str(user_id), # UUID → str
"timestamp": datetime.now().isoformat(), # datetime → ISO string
"amount": float(amount), # Decimal → float
"data": dict(data), # Custom object → dict
},
)
What happens if I create a job with an unregistered job_type?
The implementation will raise ValueError if the job type doesn't have a registered handler:
try:
job = await job_creator.create_job(
job_type="nonexistent.job",
payload={...},
)
except ValueError as e:
print(f"Job type not registered: {e}")
# Handle error - maybe log and skip, or notify admin
Register job handlers before creating jobs of that type.
How do I test code that uses JobCreator?
Use MockJobCreator to test without real job infrastructure:
from portico.ports.job_creator import JobCreator
from portico.ports.job import Job, JobStatus
class MockJobCreator(JobCreator):
def __init__(self):
self.created_jobs = []
async def create_job(self, job_type, payload, **kwargs):
job = Job(id=uuid4(), job_type=job_type, payload=payload, ...)
self.created_jobs.append(job)
return job
# Use in tests
@pytest.mark.asyncio
async def test_my_adapter():
mock_creator = MockJobCreator()
adapter = MyAdapter(job_creator=mock_creator)
await adapter.do_something()
# Verify job creation
assert len(mock_creator.created_jobs) == 1
assert mock_creator.created_jobs[0].job_type == "expected.type"
Can I create jobs synchronously?
No, create_job() is an async method and must be awaited. Job creation involves I/O operations (database, queue):
# ✅ CORRECT - Async usage
job = await job_creator.create_job(...)
# ❌ WRONG - Synchronous attempt
job = job_creator.create_job(...) # Returns coroutine, doesn't execute
If you need to create jobs from synchronous code, you'll need to run it in an async context: