Auth Kit
Overview
Purpose: Provide secure user authentication, password hashing with bcrypt, and session management for web applications.
Key Features:
- User authentication with username/email and password
- Secure password hashing using bcrypt
- Session creation and validation with configurable expiration
- Token-based session management
- Automatic session cleanup for expired sessions
- Event publishing for login/logout tracking
- Stateless or stateful session modes
Dependencies:
- Injected services: UserKit (for user data access)
- Port dependencies: None (uses UserRepository from UserKit)
- Note: Kits cannot directly import from other kits (enforced by import-linter contract #6). Dependencies are injected via constructor in
compose.py.
Quick Start
from portico import compose
# Basic configuration
app = compose.webapp(
database_url="postgresql://localhost/myapp",
kits=[
compose.user(password_min_length=8),
compose.auth(
session_secret="your-secret-key-minimum-32-characters-long",
session_timeout_minutes=60,
),
]
)
# Access the auth service
auth_service = app.kits["auth"].service
# Authenticate a user
from portico.exceptions import AuthenticationError
try:
result = await auth_service.authenticate(
username="user@example.com",
password="user_password"
)
# result.user, result.token, result.expires_at
except AuthenticationError:
# Invalid credentials
pass
Core Concepts
Authentication Flow
The Auth Kit provides a complete authentication workflow from credentials to session:
from portico.exceptions import AuthenticationError
# Step 1: Authenticate with credentials
try:
result = await auth_service.authenticate(
username="user@example.com", # Can be email or username
password="secure_password"
)
# result contains:
# - result.user: User domain model
# - result.token: Session token (opaque string)
# - result.expires_at: Expiration datetime
# Store token in cookie or response header
response.set_cookie("session_token", result.token, httponly=True)
except AuthenticationError as e:
# Credentials invalid - same error for user not found or wrong password
return {"error": str(e)}
The authentication process:
- Lookup: Finds user by email or username
- Verify: Checks password against bcrypt hash
- Create Session: Generates secure token and stores session
- Publish Event: Emits
UserLoggedInEventfor audit/tracking - Return Result: Provides user, token, and expiration
Password Hashing
Passwords are hashed using bcrypt with automatic salt generation:
# Hash a password (typically done by UserKit during registration)
hashed = AuthenticationService.hash_password("user_password")
# Returns: "$2b$12$..." (bcrypt hash with embedded salt)
# Verify a password (done automatically during authentication)
is_valid = await auth_service.verify_password(user_id, "user_password")
# Returns: True if password matches hash
Bcrypt properties:
- Adaptive: Configurable work factor (rounds) for future-proofing
- Salted: Unique salt per password prevents rainbow table attacks
- Slow: Intentionally slow to resist brute-force attacks
Session Management
Sessions store authenticated user state with expiration:
# Create a session (done automatically during authentication)
session = await auth_service.create_session(user_id)
# session.token, session.expires_at
# Validate a session (e.g., on each authenticated request)
user = await auth_service.validate_session(token)
if user:
# Session valid, user authenticated
pass
else:
# Session invalid, expired, or not found
pass
# Logout (invalidate session)
success = await auth_service.logout(token)
Sessions are:
- Time-limited: Expire after
session_timeout_minutes - Secure tokens: Generated with
secrets.token_urlsafe(32)(256 bits) - Database-backed: Stored in
auth_sessionstable (unless stateless mode) - User-scoped: Each session belongs to one user
Stateless vs Stateful Mode
The Auth Kit supports two session modes:
# Stateful (default) - sessions stored in database
compose.auth(
session_secret="...",
enable_session_storage=True # Default
)
# Stateless - no database storage (requires JWT or similar)
compose.auth(
session_secret="...",
enable_session_storage=False
)
Stateful mode (recommended):
- Sessions stored in
auth_sessionstable - Can validate and invalidate sessions
- Supports logout functionality
- Requires database queries for validation
Stateless mode:
- No database storage
- Cannot validate sessions (returns None)
- No logout support
- Requires external token system (JWT)
Configuration
Required Settings
| Setting | Type | Description | Example |
|---|---|---|---|
session_secret |
str |
Secret key for signing tokens (min 32 chars) | "your-secret-key-minimum-32-characters-long" |
Generate a secure secret:
Optional Settings
| Setting | Type | Default | Description |
|---|---|---|---|
session_timeout_minutes |
int |
30 |
Session expiration time in minutes |
enable_session_storage |
bool |
True |
Whether to store sessions in database |
Example:
from portico import compose
app = compose.webapp(
kits=[
compose.user(),
compose.auth(
session_secret="your-secret-key-minimum-32-characters-long",
session_timeout_minutes=1440, # 24 hours
enable_session_storage=True,
),
]
)
Usage Examples
Example 1: Login Endpoint
from fastapi import Form, Response
from portico.exceptions import AuthenticationError
@app.post("/login")
async def login(
response: Response,
email: str = Form(...),
password: str = Form(...),
):
auth_service = app.kits["auth"].service
try:
result = await auth_service.authenticate(
username=email, # Can be email or username
password=password
)
# Set session cookie
response.set_cookie(
key="session_token",
value=result.token,
httponly=True, # Prevent JavaScript access
secure=True, # HTTPS only
samesite="lax", # CSRF protection
max_age=60 * 60, # 1 hour
)
return {
"success": True,
"user_id": str(result.user.id),
"email": result.user.email,
"expires_at": result.expires_at.isoformat(),
}
except AuthenticationError as e:
return {"success": False, "error": str(e)}, 401
Example 2: Protected Endpoint with Session Validation
from fastapi import Cookie, HTTPException
@app.get("/profile")
async def get_profile(session_token: str = Cookie(None)):
if not session_token:
raise HTTPException(status_code=401, detail="Not authenticated")
auth_service = app.kits["auth"].service
user = await auth_service.validate_session(session_token)
if not user:
raise HTTPException(status_code=401, detail="Session invalid or expired")
return {
"user_id": str(user.id),
"email": user.email,
"username": user.username,
}
Example 3: Logout Endpoint
@app.post("/logout")
async def logout(
response: Response,
session_token: str = Cookie(None)
):
if session_token:
auth_service = app.kits["auth"].service
await auth_service.logout(session_token)
# Clear cookie
response.delete_cookie("session_token")
return {"success": True}
Example 4: Using FastAPI Dependency Injection
from portico.kits.fastapi import Dependencies
deps = Dependencies(app)
@app.get("/dashboard")
async def dashboard(current_user = deps.current_user):
# current_user is automatically validated from session
# Raises 401 if session invalid
return {
"welcome": f"Hello, {current_user.email}!",
"user_id": str(current_user.id),
}
@app.get("/public")
async def public_page(user = deps.optional_user):
# user is None if not authenticated, otherwise validated User
if user:
return {"message": f"Welcome back, {user.email}"}
else:
return {"message": "Welcome, guest"}
Domain Models
Session
Represents an active user session.
| Field | Type | Required | Default | Description |
|---|---|---|---|---|
id |
UUID |
Yes | Auto | Unique session identifier |
user_id |
UUID |
Yes | - | User who owns this session |
token |
str |
Yes | - | Session token (opaque string for client) |
expires_at |
datetime |
Yes | - | When the session expires (UTC) |
created_at |
datetime |
Yes | Auto | When the session was created (UTC) |
AuthResult
Result of successful authentication.
| Field | Type | Description |
|---|---|---|
user |
User |
Authenticated user (from UserKit) |
token |
str |
Session token to return to client |
expires_at |
datetime |
When the session expires (UTC) |
Database Models
SessionModel
Table: auth_sessions
Columns:
id: UUID, primary keyuser_id: UUID, foreign key tousers.id(cascade delete)token: String(255), unique, indexedexpires_at: DateTime with timezonecreated_at: DateTime with timezone
Indexes:
idx_sessions_token: Ontokencolumn for fast lookups
Relationships:
- Belongs to: User (via
user_id)
Events
This kit publishes the following events:
UserLoggedInEvent
Triggered when: A user successfully authenticates and creates a session.
Payload:
Listeners: Audit logging, security monitoring, usage analytics.
Example listener:
from portico.kits.auth.events import UserLoggedInEvent
async def log_login(event: UserLoggedInEvent):
print(f"User {event.user_id} logged in at {event.timestamp}")
events = app.events
await events.subscribe(UserLoggedInEvent, log_login)
UserLoggedOutEvent
Triggered when: A user explicitly logs out (session invalidated).
Payload:
Listeners: Audit logging, session cleanup handlers.
SessionExpiredEvent
Triggered when: A session expires (checked during validation).
Payload:
Listeners: Session cleanup, security monitoring.
Best Practices
1. Use Secure Session Cookies
Always set security flags on session cookies:
# ✅ GOOD - Secure cookie configuration
response.set_cookie(
key="session_token",
value=result.token,
httponly=True, # Prevent XSS attacks
secure=True, # HTTPS only
samesite="lax", # CSRF protection
max_age=3600, # 1 hour
)
# ❌ BAD - Insecure cookie
response.set_cookie("session_token", result.token)
# Vulnerable to XSS, works over HTTP, no CSRF protection
2. Store Session Secret in Environment Variables
Never hardcode session secrets in source code:
import os
# ✅ GOOD - Load from environment
app = compose.webapp(
kits=[
compose.auth(
session_secret=os.environ["SESSION_SECRET"],
session_timeout_minutes=60,
),
]
)
# ❌ BAD - Hardcoded secret
app = compose.webapp(
kits=[
compose.auth(
session_secret="my-secret-123", # In version control!
),
]
)
Generate and store securely:
# Generate secret
python -c "import secrets; print(secrets.token_urlsafe(32))"
# Add to .env file (never commit this file)
echo "SESSION_SECRET=<generated-secret>" >> .env
# Load in Python
from dotenv import load_dotenv
load_dotenv()
3. Implement Session Cleanup
Schedule periodic cleanup of expired sessions:
# ✅ GOOD - Scheduled cleanup
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
async def cleanup_expired_sessions():
session_repo = app.kits["auth"].repository
count = await session_repo.delete_expired_sessions()
print(f"Cleaned up {count} expired sessions")
# Run daily at 3 AM
scheduler.add_job(cleanup_expired_sessions, "cron", hour=3)
scheduler.start()
# ❌ BAD - No cleanup
# Expired sessions accumulate in database indefinitely
4. Use Consistent Error Messages
Avoid revealing whether username exists:
# ✅ GOOD - Generic error message
try:
result = await auth_service.authenticate(username, password)
except AuthenticationError:
return {"error": "Invalid username or password"}, 401
# Doesn't reveal if user exists
# ❌ BAD - Reveals user existence
user = await user_service.get_by_email(email)
if not user:
return {"error": "User not found"}, 404 # Information leak
if not verify_password(password):
return {"error": "Wrong password"}, 401
5. Set Appropriate Session Timeouts
Balance security and user experience:
# ✅ GOOD - Context-appropriate timeouts
# Banking app - short timeout
compose.auth(session_timeout_minutes=15)
# Internal tool - medium timeout
compose.auth(session_timeout_minutes=480) # 8 hours
# Public forum - longer timeout
compose.auth(session_timeout_minutes=10080) # 7 days
# ❌ BAD - One size fits all
compose.auth(session_timeout_minutes=30) # Same for all apps
6. Handle Session Validation Errors Gracefully
Provide clear feedback for expired sessions:
# ✅ GOOD - Clear error handling
@app.get("/api/data")
async def get_data(session_token: str = Cookie(None)):
if not session_token:
raise HTTPException(
status_code=401,
detail="Not authenticated",
headers={"WWW-Authenticate": "Cookie"}
)
user = await auth_service.validate_session(session_token)
if not user:
raise HTTPException(
status_code=401,
detail="Session expired or invalid"
)
return {"data": "..."}
# ❌ BAD - Unclear errors
@app.get("/api/data")
async def get_data(session_token: str = Cookie(None)):
user = await auth_service.validate_session(session_token)
if not user:
return {"error": "error"} # What error? Why?
7. Use Dependency Injection for Protected Routes
Leverage FastAPI dependencies for cleaner code:
from portico.kits.fastapi import Dependencies
deps = Dependencies(app)
# ✅ GOOD - Dependency injection
@app.get("/protected")
async def protected_route(current_user = deps.current_user):
# current_user automatically validated
return {"user": current_user.email}
# ❌ BAD - Manual validation in every route
@app.get("/protected")
async def protected_route(session_token: str = Cookie(None)):
if not session_token:
raise HTTPException(status_code=401)
user = await auth_service.validate_session(session_token)
if not user:
raise HTTPException(status_code=401)
# Repeated boilerplate
return {"user": user.email}
Security Considerations
Password Security
The Auth Kit uses bcrypt for password hashing:
- Never store plaintext passwords
- Never log passwords (even in debug mode)
- Use bcrypt's adaptive cost (automatically future-proof)
- Password hashing is CPU-intensive (intentional anti-brute-force)
Session Token Security
Session tokens are generated with cryptographically secure random bytes:
- 256-bit tokens via
secrets.token_urlsafe(32) - Opaque tokens (not JWTs - no embedded data)
- Single-use (invalidated on logout)
- Time-limited (automatic expiration)
Common Attack Mitigations
Brute Force:
- Implement rate limiting on login endpoints
- Use slow password hashing (bcrypt)
- Consider account lockout after N failures
Session Fixation:
- Generate new token on each login
- Don't accept tokens from query parameters
Session Hijacking:
- Use
httponlycookies (prevent XSS) - Use
securecookies (HTTPS only) - Consider binding sessions to IP/User-Agent
CSRF:
- Use
samesitecookie attribute - Implement CSRF tokens for state-changing operations
HTTPS Requirement
Always use HTTPS in production:
# Production configuration
response.set_cookie(
"session_token",
token,
secure=True, # Requires HTTPS
httponly=True,
samesite="lax"
)
FAQs
Q: How do I implement "Remember Me" functionality?
A: Create two session types with different timeouts:
# Standard session (30 minutes)
result = await auth_service.authenticate(username, password)
# If "remember me" checked, create longer session
if remember_me:
# Override config temporarily or create custom session
long_session = await auth_service.create_session(user.id)
# Manually set longer expiration
Alternatively, use a separate "remember me" token table with longer expiration.
Q: Can I use JWT tokens instead of database sessions?
A: Yes, use stateless mode and implement JWT separately:
compose.auth(
session_secret="...",
enable_session_storage=False # Stateless mode
)
# Implement JWT in your application layer
import jwt
def create_jwt(user_id: UUID) -> str:
payload = {"user_id": str(user_id), "exp": datetime.now(UTC) + timedelta(hours=1)}
return jwt.encode(payload, secret_key, algorithm="HS256")
Q: How do I implement "logout from all devices"?
A: Query all sessions for a user and delete them:
async def logout_all_devices(user_id: UUID):
# Custom query in SessionRepository
async with database.transaction() as session:
await session.execute(
delete(SessionModel).where(SessionModel.user_id == user_id)
)
Q: Should I validate sessions on every request?
A: Yes, for protected endpoints. Use dependency injection to minimize boilerplate:
deps = Dependencies(app)
@app.get("/protected")
async def protected(user = deps.current_user):
# Session validated automatically on each request
pass
For high-traffic APIs, consider caching session validation with Redis.
Q: How do I handle concurrent login sessions?
A: By default, multiple sessions are allowed (user can login from multiple devices). To enforce single session:
async def authenticate_single_session(username: str, password: str):
result = await auth_service.authenticate(username, password)
# Delete other sessions for this user
async with database.transaction() as session:
await session.execute(
delete(SessionModel).where(
SessionModel.user_id == result.user.id,
SessionModel.token != result.token
)
)
return result
Q: What's the performance impact of bcrypt?
A: Bcrypt is intentionally slow (anti-brute-force). Expect ~100-300ms per hash/verify on modern hardware. This is acceptable for login (infrequent) but too slow for per-request operations. Never hash passwords on every request - use session tokens instead.
Q: How do I test authentication in unit tests?
A: Create test sessions directly or mock the auth service:
# Create test user and session
user = await user_kit.service.create_user(CreateUserRequest(...))
session = await auth_kit.service.create_session(user.id)
# Use session token in test requests
response = client.get("/protected", cookies={"session_token": session.token})
Q: Can I customize the session token format?
A: Yes, modify create_session in your application:
# Custom token format (e.g., JWT)
import jwt
async def create_jwt_session(user_id: UUID):
payload = {"user_id": str(user_id), "exp": ...}
token = jwt.encode(payload, secret_key, algorithm="HS256")
# Still store in database for validation/logout
return await session_repo.create_session(user_id, token, expires_at)