Job Trigger Port
Overview
The Job Trigger Port defines the contract for mechanisms that create jobs from external events. Triggers are adapters that respond to various event sources (HTTP webhooks, cron schedules, file system changes, message queues, etc.) and create background jobs using the JobCreator port.
Purpose: Enable pluggable event sources for job creation while maintaining hexagonal architecture by depending on the JobCreator port interface rather than concrete service implementations.
Domain: Background job automation, event-driven job creation, and scheduled task execution
Key Capabilities:
- Create jobs from external event sources (webhooks, schedules, queues, file systems)
- Lifecycle management (start/stop triggers)
- Decoupled from job execution (only creates jobs, doesn't process them)
- Integration with JobCreator port for dependency inversion
Port Type: Adapter
When to Use:
- When implementing event-driven job creation (webhooks, API callbacks)
- When building scheduled task systems (cron-like job scheduling)
- When integrating with external systems that should trigger job processing
- When you need automated job creation from various sources
Port Interface
JobTrigger
The JobTrigger abstract base class defines the contract for trigger implementations.
Location: portico.ports.job_trigger.JobTrigger
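The examples on this page imply the following shape for the contract. A minimal sketch, assuming these exact signatures (check portico.ports.job_trigger for the authoritative definition):

from abc import ABC, abstractmethod

class JobTrigger(ABC):
    """Sketch of the trigger contract described below (signatures assumed)."""

    @abstractmethod
    async def start(self) -> None:
        """Begin listening for events and creating jobs."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop listening and release resources."""

    @property
    @abstractmethod
    def is_running(self) -> bool:
        """Whether the trigger is currently active."""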
Methods
start
Start the trigger to begin creating jobs from events.
Returns: None
Purpose: Initialize the trigger and begin listening for events. Depending on the implementation, this might start:
- An HTTP server listening for webhooks
- A cron scheduler polling for scheduled times
- A file system watcher monitoring directories
- A message queue consumer reading from topics
Example:
from portico.adapters.job_trigger.webhook_trigger import WebhookTrigger
# Create trigger
trigger = WebhookTrigger(job_creator=job_service)
# Start listening for webhook events
await trigger.start()
# Trigger is now running and creating jobs from webhooks
stop
Stop the trigger gracefully, ceasing job creation from events.
Returns: None
Purpose: Shutdown the trigger and stop listening for events. Should gracefully cleanup resources (close connections, unregister listeners, stop schedulers).
Example:
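# Stop the trigger during application shutdown (mirrors the start example above)
await trigger.stop()
# The trigger has released its resources and no longer creates jobs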
is_running
Check if trigger is currently running.
Returns: bool - True if trigger is active and creating jobs from events
Purpose: Query trigger status for monitoring, health checks, or preventing duplicate starts.
Example:
if trigger.is_running:
    print("Trigger is active and processing events")
else:
    print("Trigger is stopped")
Architectural Pattern
Job Triggers follow hexagonal architecture by depending on the JobCreator port rather than JobService directly:
┌────────────────────────────────┐
│ Job Triggers (Adapters)        │
│ - WebhookTrigger               │
│ - ScheduleTrigger              │
│ - FileTrigger                  │
│ - MessageQueueTrigger          │
└────────────────────────────────┘
              ↓ depends on
┌────────────────────────────────┐
│ JobCreator Port (Interface)    │
│ - create_job()                 │
└────────────────────────────────┘
              ↑ implemented by
┌────────────────────────────────┐
│ JobService (Kit)               │
│ - Implements JobCreator        │
│ - Uses JobQueue adapter        │
└────────────────────────────────┘
Why this matters:
- ✅ Triggers don't depend on the concrete JobService implementation
- ✅ Triggers can be tested with a mock JobCreator
- ✅ Maintains clean separation of concerns
- ✅ Follows the dependency inversion principle
Common Patterns
1. Webhook Trigger for API Integration
from fastapi import FastAPI
from portico.adapters.job_trigger.webhook_trigger import (
    WebhookTrigger,
    WebhookConfig
)
from portico.kits.job.job_service import JobService

# Create job service (implements JobCreator)
job_service = JobService(job_queue=job_queue_adapter)

# Create webhook trigger
webhook_config = WebhookConfig(
    prefix="/webhooks",
    allowed_job_types=["email.send", "report.generate", "data.import"]
)
webhook_trigger = WebhookTrigger(
    job_creator=job_service,
    config=webhook_config
)

# Integrate with FastAPI
app = FastAPI()
app.include_router(webhook_trigger.router)

# Start trigger
await webhook_trigger.start()

# External systems can now create jobs via HTTP POST
# POST /webhooks/jobs
# {
#   "job_type": "email.send",
#   "payload": {"recipient": "user@example.com"},
#   "queue_name": "emails",
#   "priority": 5
# }
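For reference, this is how an external system might call the endpoint above. A minimal sketch using httpx; the base URL and the response body shape are assumptions about your deployment:

import httpx

# Base URL is hypothetical; point it at wherever the FastAPI app is served
response = httpx.post(
    "http://localhost:8000/webhooks/jobs",
    json={
        "job_type": "email.send",
        "payload": {"recipient": "user@example.com"},
        "queue_name": "emails",
        "priority": 5
    }
)
print(response.status_code, response.json())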
2. Scheduled Trigger for Cron Jobs
from portico.adapters.job_trigger.schedule_trigger import (
    ScheduleTrigger,
    ScheduleConfig
)

# Define schedule configurations
schedules = [
    # Daily report at 9 AM
    ScheduleConfig(
        cron="0 9 * * *",
        job_type="report.daily",
        payload={"report_type": "daily_summary"},
        queue_name="reports",
        priority=5
    ),
    # Cleanup every hour
    ScheduleConfig(
        cron="0 * * * *",
        job_type="cleanup.temp_files",
        payload={"max_age_hours": 24},
        queue_name="maintenance",
        priority=1
    ),
    # Weekly backup on Sundays at 2 AM
    ScheduleConfig(
        cron="0 2 * * 0",
        job_type="backup.full",
        payload={"backup_type": "weekly"},
        queue_name="backups",
        priority=10
    )
]

# Create schedule trigger
schedule_trigger = ScheduleTrigger(
    job_creator=job_service,
    schedules=schedules
)

# Start scheduler
await schedule_trigger.start()

# Jobs will now be created automatically on schedule
3. File System Trigger for Processing Uploads
import asyncio
from pathlib import Path

from portico.ports.job_creator import JobCreator
from portico.ports.job_trigger import JobTrigger
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler


class FileTrigger(JobTrigger):
    """Trigger that creates jobs when files are added to a directory."""

    def __init__(
        self,
        job_creator: JobCreator,
        watch_path: Path,
        job_type: str,
        queue_name: str = "file_processing"
    ):
        self.job_creator = job_creator
        self.watch_path = watch_path
        self.job_type = job_type
        self.queue_name = queue_name
        self._observer = None
        self._running = False

    async def start(self) -> None:
        """Start watching the directory."""
        if self._running:
            return
        self._running = True
        event_handler = FileCreatedHandler(
            self.job_creator,
            self.job_type,
            self.queue_name,
            loop=asyncio.get_running_loop()
        )
        self._observer = Observer()
        self._observer.schedule(
            event_handler,
            str(self.watch_path),
            recursive=True
        )
        self._observer.start()

    async def stop(self) -> None:
        """Stop watching the directory."""
        if not self._running:
            return
        self._running = False
        if self._observer:
            self._observer.stop()
            self._observer.join()

    @property
    def is_running(self) -> bool:
        return self._running


class FileCreatedHandler(FileSystemEventHandler):
    """Handler that creates jobs for new files."""

    def __init__(
        self,
        job_creator: JobCreator,
        job_type: str,
        queue_name: str,
        loop: asyncio.AbstractEventLoop
    ):
        self.job_creator = job_creator
        self.job_type = job_type
        self.queue_name = queue_name
        self.loop = loop

    def on_created(self, event):
        """Called when a file is created.

        Watchdog invokes this on its observer thread, where no event loop
        is running, so schedule the coroutine onto the trigger's loop
        instead of calling asyncio.create_task.
        """
        if not event.is_directory:
            asyncio.run_coroutine_threadsafe(
                self.job_creator.create_job(
                    job_type=self.job_type,
                    payload={"file_path": event.src_path},
                    queue_name=self.queue_name
                ),
                self.loop
            )


# Usage
file_trigger = FileTrigger(
    job_creator=job_service,
    watch_path=Path("/uploads"),
    job_type="file.process",
    queue_name="file_processing"
)

await file_trigger.start()

# Jobs will be created automatically when files are uploaded to /uploads
4. Message Queue Trigger for Event-Driven Processing
import json

import aio_pika
import structlog

from portico.ports.job_creator import JobCreator
from portico.ports.job_trigger import JobTrigger

logger = structlog.get_logger()  # structlog-style logger; adjust to your logging setup


class RabbitMQTrigger(JobTrigger):
    """Trigger that creates jobs from RabbitMQ messages."""

    def __init__(
        self,
        job_creator: JobCreator,
        rabbitmq_url: str,
        queue_name: str,
        job_type_mapping: dict[str, str]
    ):
        self.job_creator = job_creator
        self.rabbitmq_url = rabbitmq_url
        self.queue_name = queue_name
        self.job_type_mapping = job_type_mapping
        self._connection = None
        self._channel = None
        self._running = False

    async def start(self) -> None:
        """Start consuming messages from RabbitMQ."""
        if self._running:
            return
        self._running = True

        # Connect to RabbitMQ
        self._connection = await aio_pika.connect_robust(self.rabbitmq_url)
        self._channel = await self._connection.channel()

        # Declare queue
        queue = await self._channel.declare_queue(
            self.queue_name,
            durable=True
        )

        # Start consuming
        await queue.consume(self._on_message)

    async def stop(self) -> None:
        """Stop consuming messages."""
        if not self._running:
            return
        self._running = False
        if self._connection:
            await self._connection.close()

    @property
    def is_running(self) -> bool:
        return self._running

    async def _on_message(self, message):
        """Handle an incoming message by creating a job."""
        async with message.process():
            try:
                # Parse message
                data = json.loads(message.body.decode())

                # Map event type to job type
                event_type = data.get("event_type")
                job_type = self.job_type_mapping.get(event_type)

                if job_type:
                    # Create job from message
                    await self.job_creator.create_job(
                        job_type=job_type,
                        payload=data.get("payload", {}),
                        queue_name=data.get("queue", "default"),
                        priority=data.get("priority", 0)
                    )
            except Exception as e:
                logger.error("error_processing_message", error=str(e))


# Usage
mq_trigger = RabbitMQTrigger(
    job_creator=job_service,
    rabbitmq_url="amqp://localhost",
    queue_name="job_events",
    job_type_mapping={
        "user.created": "email.welcome",
        "order.placed": "order.process",
        "payment.received": "invoice.send"
    }
)

await mq_trigger.start()

# Jobs will be created from RabbitMQ messages
5. Multiple Triggers with Application Lifecycle
from contextlib import asynccontextmanager
from pathlib import Path

import structlog
from fastapi import FastAPI

from portico.ports.job_trigger import JobTrigger

logger = structlog.get_logger()

# Create multiple triggers
webhook_trigger = WebhookTrigger(job_creator=job_service)
schedule_trigger = ScheduleTrigger(job_creator=job_service, schedules=schedules)
file_trigger = FileTrigger(
    job_creator=job_service,
    watch_path=Path("/uploads"),
    job_type="file.process"
)

# Manage trigger lifecycle with the application
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager."""
    # Startup: Start all triggers
    triggers: list[JobTrigger] = [webhook_trigger, schedule_trigger, file_trigger]
    for trigger in triggers:
        await trigger.start()
        logger.info(
            "trigger_started",
            trigger_type=type(trigger).__name__,
            is_running=trigger.is_running
        )

    yield  # Application runs

    # Shutdown: Stop all triggers
    for trigger in triggers:
        await trigger.stop()
        logger.info(
            "trigger_stopped",
            trigger_type=type(trigger).__name__,
            is_running=trigger.is_running
        )

app = FastAPI(lifespan=lifespan)
app.include_router(webhook_trigger.router)

# All triggers start with the app and stop gracefully on shutdown
6. Custom Database Change Trigger
import asyncio
from datetime import datetime, timedelta
from typing import Any

import structlog
from sqlalchemy import select, update

from portico.ports.job_creator import JobCreator
from portico.ports.job_trigger import JobTrigger

logger = structlog.get_logger()

# ItemModel is an application-specific SQLAlchemy model (not shown here)


class DatabasePollingTrigger(JobTrigger):
    """Trigger that polls the database for new records and creates jobs."""

    def __init__(
        self,
        job_creator: JobCreator,
        database: Any,
        poll_interval_seconds: int = 60
    ):
        self.job_creator = job_creator
        self.database = database
        self.poll_interval = poll_interval_seconds
        self._task = None
        self._running = False

    async def start(self) -> None:
        """Start polling the database."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._poll_loop())

    async def stop(self) -> None:
        """Stop polling the database."""
        if not self._running:
            return
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass

    @property
    def is_running(self) -> bool:
        return self._running

    async def _poll_loop(self) -> None:
        """Poll the database for new pending items."""
        while self._running:
            try:
                # Query for pending items
                pending_items = await self._get_pending_items()

                for item in pending_items:
                    # Create a job for each pending item
                    await self.job_creator.create_job(
                        job_type="item.process",
                        payload={
                            "item_id": str(item.id),
                            "data": item.data
                        },
                        queue_name="processing"
                    )
                    # Mark as queued
                    await self._mark_as_queued(item.id)

                # Wait before next poll
                await asyncio.sleep(self.poll_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("polling_error", error=str(e))
                await asyncio.sleep(self.poll_interval)

    async def _get_pending_items(self):
        """Get items that need processing."""
        async with self.database.session() as session:
            stmt = select(ItemModel).where(
                ItemModel.status == "pending",
                ItemModel.created_at > datetime.now() - timedelta(hours=1)
            )
            result = await session.execute(stmt)
            return result.scalars().all()

    async def _mark_as_queued(self, item_id):
        """Mark an item as queued for processing."""
        async with self.database.session() as session:
            stmt = (
                update(ItemModel)
                .where(ItemModel.id == item_id)
                .values(status="queued")
            )
            await session.execute(stmt)
            await session.commit()


# Usage
db_trigger = DatabasePollingTrigger(
    job_creator=job_service,
    database=database,
    poll_interval_seconds=30
)

await db_trigger.start()

# Polls the database every 30 seconds and creates jobs for pending items
Best Practices
1. Depend on JobCreator Port, Not JobService
Triggers should depend on the JobCreator interface for testability and flexibility.
# ✅ GOOD - Depends on port interface
class WebhookTrigger(JobTrigger):
    def __init__(self, job_creator: JobCreator):
        self.job_creator = job_creator  # Interface dependency

    async def _create_job_endpoint(self, request):
        job = await self.job_creator.create_job(  # Uses interface
            job_type=request.job_type,
            payload=request.payload
        )

# ❌ BAD - Depends on concrete implementation
class WebhookTrigger(JobTrigger):
    def __init__(self, job_service: JobService):  # ❌ Concrete dependency
        self.job_service = job_service

    async def _create_job_endpoint(self, request):
        job = await self.job_service.create_job(...)  # Tight coupling
2. Implement Graceful Shutdown
Properly cleanup resources in stop() method.
# ✅ GOOD - Graceful shutdown
class ScheduleTrigger(JobTrigger):
    async def stop(self) -> None:
        if not self._running:
            return

        logger.info("stopping_schedule_trigger")

        # Shutdown scheduler gracefully
        self._scheduler.shutdown(wait=True)  # Wait for running jobs

        self._running = False
        logger.info("schedule_trigger_stopped")

# ❌ BAD - Abrupt shutdown
class ScheduleTrigger(JobTrigger):
    async def stop(self) -> None:
        self._scheduler.shutdown(wait=False)  # ❌ Doesn't wait
        self._running = False  # ❌ No cleanup
3. Prevent Duplicate Starts
Check if already running before starting.
# ✅ GOOD - Idempotent start
async def start(self) -> None:
    if self._running:
        logger.warning("trigger_already_running")
        return  # Don't start again

    logger.info("starting_trigger")
    self._running = True
    # Start logic...

# ❌ BAD - Can start multiple times
async def start(self) -> None:
    # ❌ No check, could start multiple schedulers/listeners
    self._running = True
    self._scheduler.start()
4. Handle Errors in Event Processing
Catch and log errors when creating jobs from events.
# ✅ GOOD - Error handling
async def _on_webhook_request(self, request):
    try:
        job = await self.job_creator.create_job(
            job_type=request.job_type,
            payload=request.payload
        )
        logger.info("job_created_from_webhook", job_id=str(job.id))
        return {"job_id": str(job.id)}
    except ValidationError as e:
        logger.warning("invalid_webhook_request", error=str(e))
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(
            "webhook_job_creation_failed",
            error=str(e),
            exc_info=True
        )
        raise HTTPException(status_code=500, detail="Job creation failed")

# ❌ BAD - No error handling
async def _on_webhook_request(self, request):
    job = await self.job_creator.create_job(...)  # ❌ Unhandled errors
    return {"job_id": str(job.id)}
5. Use Structured Logging
Log trigger lifecycle and job creation events with context.
# ✅ GOOD - Structured logging
async def start(self) -> None:
    logger.info(
        "starting_webhook_trigger",
        prefix=self.config.prefix,
        allowed_types=self.config.allowed_job_types
    )
    self._running = True
    logger.info("webhook_trigger_started")

async def _create_job(self, config):
    try:
        job = await self.job_creator.create_job(...)
        logger.info(
            "job_created_from_schedule",
            job_id=str(job.id),
            job_type=config.job_type,
            cron=config.cron
        )
    except Exception as e:
        logger.error(
            "scheduled_job_creation_failed",
            job_type=config.job_type,
            error=str(e),
            exc_info=True
        )

# ❌ BAD - Unstructured logging
async def start(self) -> None:
    print("Starting trigger")  # ❌ Not structured
    self._running = True
6. Validate Input from External Sources
Validate data from webhooks, messages, files before creating jobs.
# ✅ GOOD - Input validation
async def _on_webhook_request(self, request: WebhookJobRequest):
    # Validate job type allowlist
    if self.config.allowed_job_types:
        if request.job_type not in self.config.allowed_job_types:
            raise HTTPException(
                status_code=403,
                detail=f"Job type '{request.job_type}' not allowed"
            )

    # Validate payload is JSON-serializable
    try:
        json.dumps(request.payload)
    except TypeError:
        raise HTTPException(
            status_code=400,
            detail="Payload must be JSON-serializable"
        )

    # Create job
    job = await self.job_creator.create_job(...)

# ❌ BAD - No validation
async def _on_webhook_request(self, request):
    # ❌ Accepts any job type
    # ❌ Doesn't validate payload
    job = await self.job_creator.create_job(
        job_type=request.job_type,
        payload=request.payload  # Might not be serializable
    )
7. Use Dependency Injection for Testing
Accept JobCreator in constructor for easy testing.
# ✅ GOOD - Dependency injection
class WebhookTrigger(JobTrigger):
    def __init__(self, job_creator: JobCreator):
        self.job_creator = job_creator

# Testing
from unittest.mock import AsyncMock

mock_creator = AsyncMock(spec=JobCreator)
trigger = WebhookTrigger(job_creator=mock_creator)

# Can verify job_creator.create_job was called
await trigger._create_job_endpoint(request)
mock_creator.create_job.assert_called_once()

# ❌ BAD - Creates dependencies internally
class WebhookTrigger(JobTrigger):
    def __init__(self, job_queue_url: str):
        # ❌ Hard to test
        self.job_service = JobService(JobQueueAdapter(job_queue_url))
FAQs
What's the difference between JobTrigger and JobHandler?
JobTrigger (creates jobs):
- Input: External events (webhooks, schedules, messages)
- Output: Jobs in queue
- Purpose: Trigger job creation from external sources
- Depends on: JobCreator port
- Examples: WebhookTrigger, ScheduleTrigger

JobHandler (processes jobs):
- Input: Jobs from queue
- Output: Job results
- Purpose: Execute business logic for job types
- Depends on: Business services
- Examples: EmailHandler, ReportHandler
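To make the contrast concrete, a minimal sketch; the class and method names here (on_event, EmailHandler.handle, email_service) are illustrative assumptions, not portico's actual APIs:

from portico.ports.job_creator import JobCreator

# Trigger side: turns an external event into a queued job
class WebhookEventSource:
    def __init__(self, job_creator: JobCreator):
        self.job_creator = job_creator

    async def on_event(self, event: dict) -> None:
        await self.job_creator.create_job(
            job_type=event["job_type"],
            payload=event["payload"]
        )

# Handler side: consumes a queued job and runs business logic
class EmailHandler:
    def __init__(self, email_service):
        self.email_service = email_service

    async def handle(self, job) -> None:
        await self.email_service.send(recipient=job.payload["recipient"])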
How do I test job triggers in isolation?
Use a mock JobCreator to verify trigger behavior:
import pytest
from unittest.mock import AsyncMock
from uuid import uuid4

from portico.ports.job_creator import JobCreator
from portico.adapters.job_trigger.webhook_trigger import WebhookTrigger
# Job and WebhookJobRequest are assumed importable from your portico modules


@pytest.mark.asyncio
async def test_webhook_trigger_creates_job():
    # Mock JobCreator
    mock_creator = AsyncMock(spec=JobCreator)
    mock_creator.create_job.return_value = Job(
        id=uuid4(),
        job_type="test.job",
        queue_name="default",
        payload={}
    )

    # Create trigger with mock
    trigger = WebhookTrigger(job_creator=mock_creator)

    # Simulate webhook request
    await trigger._create_job_endpoint(
        WebhookJobRequest(
            job_type="test.job",
            payload={"data": "test"}
        )
    )

    # Verify create_job was called
    mock_creator.create_job.assert_called_once_with(
        job_type="test.job",
        payload={"data": "test"},
        queue_name="default",
        priority=0
    )
How do I prevent duplicate scheduled jobs?
Use idempotency keys or check for existing jobs before creating:
from datetime import datetime
from uuid import NAMESPACE_DNS, uuid5


class ScheduleTrigger(JobTrigger):
    async def _create_scheduled_job(self, config: ScheduleConfig):
        # Option 1: Use a deterministic job ID
        job_id = uuid5(NAMESPACE_DNS, f"{config.job_type}:{config.cron}")

        # Check if the job already exists (assumes your JobCreator also
        # exposes a get_job lookup alongside create_job)
        existing = await self.job_creator.get_job(job_id)
        if existing and existing.status in [JobStatus.PENDING, JobStatus.SCHEDULED]:
            logger.debug("scheduled_job_already_exists", job_id=str(job_id))
            return

        # Create job with deterministic ID
        await self.job_creator.create_job(
            job_type=config.job_type,
            payload={**config.payload, "scheduled_run": datetime.now().isoformat()},
            metadata={"idempotency_key": str(job_id)}
        )
Can triggers create jobs with different priorities?
Yes, triggers can set priority when creating jobs:
# Webhook trigger with dynamic priority
async def _create_job_endpoint(self, request: WebhookJobRequest):
    # Map job type to priority
    priority_map = {
        "payment.process": 10,  # High priority
        "email.send": 5,        # Medium priority
        "cleanup.logs": 1       # Low priority
    }
    priority = priority_map.get(request.job_type, request.priority)

    job = await self.job_creator.create_job(
        job_type=request.job_type,
        payload=request.payload,
        priority=priority
    )

# Schedule trigger with configured priorities
schedules = [
    ScheduleConfig(
        cron="*/5 * * * *",
        job_type="health.check",
        payload={},
        priority=1  # Low priority
    ),
    ScheduleConfig(
        cron="0 9 * * *",
        job_type="report.daily",
        payload={},
        priority=10  # High priority
    )
]
How do I handle rate limiting in triggers?
Implement rate limiting before creating jobs:
from asyncio import Semaphore
from collections import defaultdict
from datetime import datetime, timedelta

from fastapi import HTTPException


class RateLimitedWebhookTrigger(JobTrigger):
    def __init__(self, job_creator: JobCreator, max_requests_per_minute: int = 60):
        self.job_creator = job_creator
        self.max_requests = max_requests_per_minute
        self.request_counts = defaultdict(list)
        # Also caps concurrent create_job calls (separate from the per-minute window)
        self._semaphore = Semaphore(max_requests_per_minute)

    async def _create_job_endpoint(self, request: WebhookJobRequest):
        # Check rate limit
        now = datetime.now()
        one_minute_ago = now - timedelta(minutes=1)

        # Drop requests older than the one-minute window
        self.request_counts[request.job_type] = [
            ts for ts in self.request_counts[request.job_type]
            if ts > one_minute_ago
        ]

        # Check limit
        if len(self.request_counts[request.job_type]) >= self.max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded for {request.job_type}"
            )

        # Record request
        self.request_counts[request.job_type].append(now)

        # Create job
        async with self._semaphore:
            return await self.job_creator.create_job(
                job_type=request.job_type,
                payload=request.payload
            )
Should triggers validate job payloads?
Yes, triggers should validate inputs from external sources:
from fastapi import HTTPException
from pydantic import BaseModel, ValidationError, validator


class EmailJobPayload(BaseModel):
    recipient: str
    subject: str
    body: str

    @validator('recipient')
    def validate_email(cls, v):
        if '@' not in v:
            raise ValueError('Invalid email address')
        return v


class WebhookTrigger(JobTrigger):
    def __init__(self, job_creator: JobCreator, payload_schemas: dict):
        self.job_creator = job_creator
        self.payload_schemas = payload_schemas  # job_type -> Pydantic model

    async def _create_job_endpoint(self, request: WebhookJobRequest):
        # Validate payload against schema
        schema = self.payload_schemas.get(request.job_type)
        if schema:
            try:
                validated = schema(**request.payload)
                payload = validated.dict()
            except ValidationError as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid payload: {e}"
                )
        else:
            payload = request.payload

        # Create job with validated payload
        return await self.job_creator.create_job(
            job_type=request.job_type,
            payload=payload
        )


# Usage
trigger = WebhookTrigger(
    job_creator=job_service,
    payload_schemas={
        "email.send": EmailJobPayload,
        "report.generate": ReportJobPayload
    }
)
How do I monitor trigger health?
Implement health checks and metrics:
from dataclasses import dataclass
from datetime import datetime


@dataclass
class TriggerHealth:
    is_running: bool
    last_job_created: datetime | None
    jobs_created_count: int
    errors_count: int


class MonitoredWebhookTrigger(JobTrigger):
    def __init__(self, job_creator: JobCreator):
        self.job_creator = job_creator
        self._running = False
        self._last_job_created = None
        self._jobs_created = 0
        self._errors = 0

    async def _create_job_endpoint(self, request: WebhookJobRequest):
        try:
            job = await self.job_creator.create_job(...)

            # Update metrics
            self._last_job_created = datetime.now()
            self._jobs_created += 1

            logger.info(
                "job_created",
                total_jobs=self._jobs_created,
                total_errors=self._errors
            )
            return job
        except Exception as e:
            self._errors += 1
            logger.error("job_creation_error", total_errors=self._errors)
            raise

    def get_health(self) -> TriggerHealth:
        """Get trigger health status."""
        return TriggerHealth(
            is_running=self._running,
            last_job_created=self._last_job_created,
            jobs_created_count=self._jobs_created,
            errors_count=self._errors
        )


# Health check endpoint
@app.get("/health/triggers")
async def trigger_health():
    return {
        "webhook": webhook_trigger.get_health(),
        "schedule": schedule_trigger.get_health()
    }
How do I handle trigger failures?
Implement retry logic and alerting:
class ResilientScheduleTrigger(JobTrigger):
    async def _create_scheduled_job(self, config: ScheduleConfig):
        max_retries = 3

        for attempt in range(max_retries):
            try:
                job = await self.job_creator.create_job(
                    job_type=config.job_type,
                    payload=config.payload
                )
                logger.info(
                    "scheduled_job_created",
                    job_id=str(job.id),
                    attempt=attempt + 1
                )
                return
            except Exception as e:
                logger.warning(
                    "scheduled_job_creation_failed",
                    job_type=config.job_type,
                    attempt=attempt + 1,
                    error=str(e)
                )
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                else:
                    # All retries exhausted, send alert
                    logger.error(
                        "scheduled_job_creation_failed_permanently",
                        job_type=config.job_type,
                        error=str(e),
                        exc_info=True
                    )
                    await self.alert_service.send_alert(
                        level="error",
                        message=f"Failed to create scheduled job: {config.job_type}",
                        context={"error": str(e)}
                    )
Related Ports
- Job Creator Port (portico.ports.job_creator) - Interface for creating jobs (used by triggers)
- Job Handler Port (portico.ports.job_handler) - Interface for processing jobs
- Job Queue Port (portico.ports.job_queue) - Interface for queue adapters
Related Kits
- JobService (portico.kits.job) - Implements JobCreator, used by triggers to create jobs
Adapters
Available implementations:
- WebhookTrigger (portico.adapters.job_trigger.webhook_trigger) - HTTP webhook-based trigger
- ScheduleTrigger (portico.adapters.job_trigger.schedule_trigger) - Cron-style scheduled trigger
Architecture Notes
The Job Trigger Port follows hexagonal architecture principles:
- Triggers are adapters: They adapt external event sources to job creation
- Dependency inversion: Triggers depend on JobCreator port, not JobService
- Separation of concerns: Job creation (triggers) is separated from job execution (handlers)
- Testability: Triggers can be tested with mock JobCreator
This pattern enables:
- Multiple event sources for job creation (webhooks, schedules, files, queues)
- Easy testing without running actual schedulers or servers
- Flexible job creation logic independent of execution
- Clean separation between infrastructure and domain logic