Implementation Plan: Local/Remote Task Execution & Credits System
ARCHIVE — This document is historical reference only. It may contain outdated information. See docs/status.md for current project state.
Feature: Local/Remote Task Execution Tracking & Credits System
Timeline: ~6-8 weeks (13 steps)
Status: Approved – Ready to implement
Overview Map
```text
┌─────────────────────────────────────────────────────────────┐
│                     TaskRouter Service                      │
│  Evaluates: capability + execution_preference + quota       │
│  Returns: location decision + estimated cost                │
└──────────────────────┬──────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        ▼              ▼              ▼
   ┌─────────┐   ┌────────────┐  ┌──────────────┐
   │  LOCAL  │   │   REMOTE   │  │    HYBRID    │
   │ (Tauri) │   │  (Python)  │  │    (Both)    │
   └─────────┘   └────────────┘  └──────────────┘
        │              │              │
        └──────────────┼──────────────┘
                       │
        ┌──────────────┴──────────────┐
        ▼                             ▼
┌──────────────────┐        ┌────────────────────┐
│  TaskProcessor   │        │ TaskCreditsService │
│    (Executes)    │        │ (Track + Enforce)  │
└──────────────────┘        └────────────────────┘
        │                             │
        └──────────────┬──────────────┘
                       │
        ┌──────────────┴──────────────┐
        ▼                             ▼
┌──────────────┐            ┌──────────────────────┐
│ task_credits │            │  space_quotas table  │
│   (audit)    │            │  (monthly + weekly)  │
└──────────────┘            └──────────────────────┘
        │
        └─────► TaskStore ABC ─────► PostgresTaskStore
                      │
                      ├─────► (Future) RedisTaskStore
                      └─────► (Future) S3TaskStore
```
Step-by-Step Implementation
STEP 1: Database Migrations (2 days)
Files: supabase/migrations/019_task_execution_credits.sql
Scope: M
Dependencies: None
Create/Modify:
```sql
-- Add execution tracking columns to tasks table
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_preference TEXT DEFAULT 'auto'
  CHECK (execution_preference IN ('local', 'remote', 'auto', 'cost_optimized', 'performance_optimized'));
-- No NULL inside the IN list: NULL rows (not yet routed) already pass a CHECK,
-- and a NULL list entry would make the check pass for every value
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_location TEXT
  CHECK (execution_location IN ('local', 'remote', 'hybrid'));
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS executor_id TEXT; -- Which worker ran it (tauri:device-id or python:backend-1)
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS estimated_credits NUMERIC(10,2);
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS actual_credits NUMERIC(10,2);
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_metadata JSONB DEFAULT '{}';

-- New table: task_credits (audit trail)
CREATE TABLE IF NOT EXISTS task_credits (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
  space_id UUID NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
  group_id UUID NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
  interface_id UUID,                    -- which integration charged
  action_name TEXT,                     -- which action (send, read, chat, etc.)
  base_credit NUMERIC(10,3) NOT NULL,
  multiplier NUMERIC(5,2) DEFAULT 1.0,  -- location surcharge or token-based
  final_credit NUMERIC(10,3) NOT NULL,
  reason TEXT,                          -- "execution", "retry", "api_call", etc.
  created_at TIMESTAMPTZ DEFAULT NOW(),
  CONSTRAINT positive_credits CHECK (final_credit >= 0)
);

-- New table: space_quotas (billing limits)
CREATE TABLE IF NOT EXISTS space_quotas (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  space_id UUID NOT NULL UNIQUE REFERENCES spaces(id) ON DELETE CASCADE,
  group_id UUID NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
  -- Hard limit (monthly)
  monthly_limit NUMERIC(10,2) NOT NULL DEFAULT 1000.0,
  monthly_used NUMERIC(10,2) NOT NULL DEFAULT 0.0,
  monthly_reset_date DATE NOT NULL DEFAULT CURRENT_DATE,
  -- Soft limit (weekly)
  weekly_limit NUMERIC(10,2) NOT NULL DEFAULT 250.0,
  weekly_used NUMERIC(10,2) NOT NULL DEFAULT 0.0,
  weekly_reset_date DATE NOT NULL DEFAULT CURRENT_DATE,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ DEFAULT NOW(),
  CONSTRAINT positive_limits CHECK (monthly_limit > 0 AND weekly_limit > 0)
);

-- Indexes (space_quotas.space_id needs none: its UNIQUE constraint already creates one)
CREATE INDEX IF NOT EXISTS idx_task_credits_task ON task_credits(task_id);
CREATE INDEX IF NOT EXISTS idx_task_credits_space ON task_credits(space_id);
CREATE INDEX IF NOT EXISTS idx_task_credits_interface ON task_credits(interface_id);
CREATE INDEX IF NOT EXISTS idx_task_credits_created ON task_credits(created_at DESC);
CREATE INDEX IF NOT EXISTS idx_tasks_execution_location ON tasks(execution_location);
CREATE INDEX IF NOT EXISTS idx_tasks_execution_preference ON tasks(execution_preference);
```
Tests: Migration test (verify columns exist, indexes created)
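The migration test can also cover a SQL subtlety in the `execution_location` CHECK. PostgreSQL and SQLite both treat a CHECK that evaluates to NULL as satisfied. Also, `x IN (..., NULL)` evaluates to NULL when `x` matches no other entry. As a result, a NULL inside the IN list lets any value through. The sketch below uses Python's built-in `sqlite3` in place of Postgres so it can run without a database server. The helper `accepts` is hypothetical.

```python
import sqlite3
from typing import Optional


# Hypothetical helper: SQLite stands in for Postgres; both treat a NULL CHECK result as "passes".
def accepts(check_expr: str, value: Optional[str]) -> bool:
    """Return True if inserting `value` satisfies the given CHECK expression."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(f"CREATE TABLE tasks (execution_location TEXT CHECK ({check_expr}))")
        conn.execute("INSERT INTO tasks VALUES (?)", (value,))
        return True
    except sqlite3.IntegrityError:  # raised for "CHECK constraint failed"
        return False
    finally:
        conn.close()


WITH_NULL = "execution_location IN ('local', 'remote', 'hybrid', NULL)"
WITHOUT_NULL = "execution_location IN ('local', 'remote', 'hybrid')"

print(accepts(WITH_NULL, "bogus"))     # True: the NULL entry makes the check unknown, so it passes
print(accepts(WITHOUT_NULL, "bogus"))  # False: rejected as intended
print(accepts(WITHOUT_NULL, None))     # True: NULL column values are still allowed
```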
STEP 2: Pydantic Models (1 day)
Files:
- backend/tasks/models.py (extend existing)
- backend/tasks/credits.py (new file)
Scope: S
Add to TaskBase/Task:
```python
# models.py: ExecutionPreference / ExecutionLocation (below) must be defined above these classes
from typing import Any, Optional

from pydantic import BaseModel, Field


class TaskBase(BaseModel):
    # ... existing fields ...
    execution_preference: ExecutionPreference = ExecutionPreference.AUTO  # NEW


class Task(TaskBase):
    # ... existing fields ...
    execution_location: Optional[ExecutionLocation] = None  # NEW
    executor_id: Optional[str] = None  # NEW (e.g., "tauri:macbook-pro-seb", "python:worker-1")
    estimated_credits: Optional[float] = None  # NEW
    actual_credits: Optional[float] = None  # NEW
    execution_metadata: dict[str, Any] = Field(default_factory=dict)  # NEW
```
New Enums:
```python
from enum import Enum


class ExecutionPreference(str, Enum):
    """How the user prefers the task to run"""
    LOCAL = "local"  # Force local
    REMOTE = "remote"  # Force remote
    AUTO = "auto"  # Router decides
    COST_OPTIMIZED = "cost_optimized"  # Prefer cheap (local)
    PERFORMANCE_OPTIMIZED = "performance_optimized"  # Prefer fast (remote)


class ExecutionLocation(str, Enum):
    """Where the task actually runs"""
    LOCAL = "local"
    REMOTE = "remote"
    HYBRID = "hybrid"  # Split (e.g., local embedding + remote LLM)
```
New Models:
```python
# credits.py
from datetime import date
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, computed_field


class TaskCreditRecord(BaseModel):
    """Audit record of credits used"""
    task_id: UUID
    space_id: UUID
    group_id: UUID  # task_credits.group_id is NOT NULL
    interface_id: Optional[UUID] = None
    action_name: str
    base_credit: float
    multiplier: float = 1.0
    final_credit: float
    reason: str  # "execution", "api_call", etc.


class SpaceQuota(BaseModel):
    """Billing quota for a space"""
    space_id: UUID
    monthly_limit: float
    monthly_used: float
    monthly_reset_date: date
    weekly_limit: float
    weekly_used: float
    weekly_reset_date: date

    # computed_field includes these in the serialized JSON the frontend reads (Step 10)
    @computed_field
    @property
    def monthly_remaining(self) -> float:
        return max(0.0, self.monthly_limit - self.monthly_used)

    @computed_field
    @property
    def weekly_remaining(self) -> float:
        return max(0.0, self.weekly_limit - self.weekly_used)
```
STEP 3: TaskStore Abstract Base Class (2 days)
Files:
backend/tasks/store.py (new file)
Scope: S
Key Classes:
```python
from abc import ABC, abstractmethod
from typing import List, Optional
from uuid import UUID

from tasks.models import Task, TaskStatus, TaskUpdate


class TaskStore(ABC):
    """Abstract task storage backend"""

    # typing.List, not list[...]: the `list` method below shadows the builtin in this class body

    @abstractmethod
    async def create(self, task: Task) -> Task:
        """Persist a new task"""

    @abstractmethod
    async def get(self, task_id: UUID) -> Optional[Task]:
        """Retrieve task by ID"""

    @abstractmethod
    async def list(
        self,
        space_id: Optional[UUID] = None,
        status: Optional[TaskStatus] = None,
        limit: int = 50,
    ) -> List[Task]:
        """List tasks with optional filters"""

    @abstractmethod
    async def update(self, task_id: UUID, updates: TaskUpdate) -> Task:
        """Update task fields"""

    @abstractmethod
    async def delete(self, task_id: UUID) -> bool:
        """Delete task"""

    @abstractmethod
    async def list_pending(self, limit: int = 10) -> List[Task]:
        """List pending tasks for processing"""


class PostgresTaskStore(TaskStore):
    """PostgreSQL implementation"""

    def __init__(self, db):
        self.db = db

    async def create(self, task: Task) -> Task:
        # ... implement using existing TaskService code ...
        raise NotImplementedError

    # ... implement all abstract methods ...
```
Benefits:
- Clean separation of storage from business logic
- Easy to add RedisTaskStore later without changing TaskService/TaskProcessor
- Future-proof for migration to other backends
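TaskService and TaskProcessor see only the `TaskStore` interface. That means a dict-backed store is enough for unit tests, and it also shows what any new backend has to provide. This is a minimal sketch with these assumptions:
- `Task` and `TaskStatus` are trimmed stand-ins (the real models are Pydantic).
- A plain `dict` takes the place of `TaskUpdate`.
- `InMemoryTaskStore` is a hypothetical name, not part of the plan.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, replace
from enum import Enum
from typing import Any, Dict, List, Optional


class TaskStatus(str, Enum):  # trimmed stand-in for the real enum
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class Task:  # trimmed stand-in for the Pydantic Task model
    space_id: uuid.UUID
    status: TaskStatus = TaskStatus.PENDING
    id: uuid.UUID = field(default_factory=uuid.uuid4)


class TaskStore(ABC):
    # typing.List is used because the `list` method shadows the builtin in the class body
    @abstractmethod
    async def create(self, task: Task) -> Task: ...
    @abstractmethod
    async def get(self, task_id: uuid.UUID) -> Optional[Task]: ...
    @abstractmethod
    async def list(self, space_id: Optional[uuid.UUID] = None,
                   status: Optional[TaskStatus] = None, limit: int = 50) -> List[Task]: ...
    @abstractmethod
    async def update(self, task_id: uuid.UUID, updates: Dict[str, Any]) -> Task: ...
    @abstractmethod
    async def delete(self, task_id: uuid.UUID) -> bool: ...
    @abstractmethod
    async def list_pending(self, limit: int = 10) -> List[Task]: ...


class InMemoryTaskStore(TaskStore):
    """Dict-backed store for unit tests; not shared across processes."""

    def __init__(self) -> None:
        self._tasks: Dict[uuid.UUID, Task] = {}

    async def create(self, task: Task) -> Task:
        self._tasks[task.id] = task
        return task

    async def get(self, task_id: uuid.UUID) -> Optional[Task]:
        return self._tasks.get(task_id)

    async def list(self, space_id: Optional[uuid.UUID] = None,
                   status: Optional[TaskStatus] = None, limit: int = 50) -> List[Task]:
        matches = [t for t in self._tasks.values()
                   if (space_id is None or t.space_id == space_id)
                   and (status is None or t.status == status)]
        return matches[:limit]

    async def update(self, task_id: uuid.UUID, updates: Dict[str, Any]) -> Task:
        updated = replace(self._tasks[task_id], **updates)  # KeyError for unknown ids
        self._tasks[task_id] = updated
        return updated

    async def delete(self, task_id: uuid.UUID) -> bool:
        return self._tasks.pop(task_id, None) is not None

    async def list_pending(self, limit: int = 10) -> List[Task]:
        return await self.list(status=TaskStatus.PENDING, limit=limit)


async def demo() -> None:
    store = InMemoryTaskStore()
    space = uuid.uuid4()
    first = await store.create(Task(space_id=space))
    await store.create(Task(space_id=space))
    await store.update(first.id, {"status": TaskStatus.RUNNING})
    print(len(await store.list_pending()))  # 1
    print(await store.delete(first.id))      # True
    print(await store.get(first.id))         # None


asyncio.run(demo())
```

Tests can pass an instance of this store into `TaskService(store=...)`. No database setup is required.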
STEP 4: TaskRouter Service (3 days)
Files:
backend/tasks/router.py (new file)
Scope: M
Key Logic:
```python
class TaskRouter:
    """
    Decides where a task should run (local vs. remote).
    Takes into account: capability + user preference + quota.
    """

    def __init__(self, interface_manager, credits_service):
        self.interface_manager = interface_manager
        self.credits_service = credits_service

    async def route_task(
        self,
        task: Task,
        device_is_tauri: bool,
    ) -> RoutingDecision:
        """
        Returns a RoutingDecision (see structure below): execution_location,
        estimated_credits, estimated_duration_ms, executor_id, rationale,
        quota_status, quota_message.

        Rules:
        1. If execution_preference='local' and device is Tauri → LOCAL
        2. If execution_preference='remote' → REMOTE
        3. If execution_preference='auto':
           - Check if interface has LOCAL handler
           - If yes: → LOCAL (cheaper)
           - If no: → REMOTE
        4. If execution_preference='cost_optimized':
           - Prefer LOCAL, then REMOTE (pick cheapest capable)
        5. If execution_preference='performance_optimized':
           - Prefer REMOTE, then LOCAL (pick fastest)
        6. Check quota (credits_service.check_quota):
           - If monthly quota would be exceeded → BLOCKED (don't execute)
           - If weekly soft limit approached → WARNING (but allow)
        """
        ...
```
Routing Decision Structure:
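```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional

from tasks.models import ExecutionLocation


class QuotaStatus(str, Enum):
    OK = "ok"
    WARNING = "warning"
    BLOCKED = "blocked"


@dataclass
class RoutingDecision:
    execution_location: ExecutionLocation
    estimated_credits: float
    estimated_duration_ms: float
    executor_id: Optional[str]  # "tauri:device-id" or "python:worker-1"
    rationale: str  # Human-readable explanation
    quota_status: QuotaStatus  # OK / WARNING / BLOCKED
    quota_message: str  # "120 credits remaining this week"
```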
Example Flow:
Task: LLM chat (input_tokens=500, output_tokens=300)
Interface: LLM (supports both local + remote)
Device: Desktop with Tauri
User preference: cost_optimized
Router decision:
1. Has LOCAL handler? YES (can use local LLM)
2. cost_optimized preference? → Pick LOCAL (free)
3. Estimated credits:
- Local: 0 credits (free)
- Remote: 0.01 * (500+300) / 1000 = 0.008 credits
4. Pick LOCAL (cheaper)
5. Check quota: Space has used 500 of 1000 monthly credits → OK
6. Return: RoutingDecision(
     execution_location=LOCAL,
     estimated_credits=0,
     estimated_duration_ms=~2000,
     rationale="local handler available + cost_optimized preference",
     quota_status=OK
   )
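Rules 1-5 can be expressed as a small pure function. This sketch makes the following assumptions:
- LOCAL requires both a Tauri device and a local handler.
- Every preference falls back to REMOTE when LOCAL is not possible.
- REMOTE counts as the faster option.

`route` and `Decision` are hypothetical simplifications of `TaskRouter.route_task` and `RoutingDecision`. The quota check (rule 6) is left to `TaskCreditsService`.

```python
from dataclasses import dataclass
from enum import Enum


class ExecutionLocation(str, Enum):
    LOCAL = "local"
    REMOTE = "remote"


@dataclass
class Decision:  # trimmed stand-in for RoutingDecision
    execution_location: ExecutionLocation
    estimated_credits: float
    rationale: str


def route(preference: str, *, device_is_tauri: bool, has_local_handler: bool,
          local_cost: float, remote_cost: float) -> Decision:
    """Apply routing rules 1-5; the quota check (rule 6) is omitted here."""
    local_ok = device_is_tauri and has_local_handler  # assumption: LOCAL needs both
    if preference == "remote":
        loc, why = ExecutionLocation.REMOTE, "remote preference"
    elif not local_ok:
        # Assumption: any preference falls back to REMOTE when LOCAL is impossible.
        loc, why = ExecutionLocation.REMOTE, "no local handler on this device"
    elif preference in ("local", "auto"):
        loc, why = ExecutionLocation.LOCAL, f"local handler available + {preference} preference"
    elif preference == "cost_optimized":
        loc = ExecutionLocation.LOCAL if local_cost <= remote_cost else ExecutionLocation.REMOTE
        why = f"{loc.value} is cheapest + cost_optimized preference"
    elif preference == "performance_optimized":
        # Assumption: REMOTE is treated as the faster option.
        loc, why = ExecutionLocation.REMOTE, "performance_optimized preference"
    else:
        raise ValueError(f"unknown execution_preference: {preference!r}")
    cost = local_cost if loc is ExecutionLocation.LOCAL else remote_cost
    return Decision(loc, cost, why)


# Worked example from above: LLM chat, 500 input + 300 output tokens.
remote_cost = 0.01 * (500 + 300) / 1000  # 0.008 credits
decision = route("cost_optimized", device_is_tauri=True, has_local_handler=True,
                 local_cost=0.0, remote_cost=remote_cost)
print(decision.execution_location.value, decision.estimated_credits)  # local 0.0
```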
STEP 5: TaskCreditsService (3 days)
Files:
backend/tasks/credits_service.py (new file)
Scope: M
Key Methods:
```python
from datetime import datetime
from typing import Optional
from uuid import UUID

from tasks.credits import SpaceQuota
from tasks.models import ExecutionLocation
from tasks.router import QuotaStatus


class TaskCreditsService:
    """
    Tracks and enforces credit usage.
    Charges the estimate before execution, reconciles after completion.
    """

    def __init__(self, db, interface_manager):
        self.db = db
        self.interface_manager = interface_manager

    async def estimate_credits(
        self,
        interface_id: UUID,
        action_name: str,
        execution_location: ExecutionLocation,
        params: dict,  # includes token count for LLM
    ) -> float:
        """
        Calculate estimated credits for a task.
        LLM: 0.01 per 1000 tokens (base) * multiplier (location)
        Other: base_credit per action * multiplier
        """
        ...

    async def charge_credits(
        self,
        task_id: UUID,
        space_id: UUID,
        interface_id: Optional[UUID],
        action_name: str,
        credits: float,
        reason: str = "execution",
    ) -> None:
        """Record credit debit and update space quota"""
        # Insert into task_credits (group_id looked up from the space; the column is NOT NULL)
        # Update space_quotas (monthly_used, weekly_used) in the same transaction
        # Emit event for dashboard
        ...

    async def refund_credits(self, task_id: UUID, credits: float) -> None:
        """Return over-estimated credits after completion (used by TaskProcessor)"""
        # Insert a task_credits row with reason="refund" (final_credit stays >= 0)
        # Decrease space_quotas.monthly_used / weekly_used by `credits`
        ...

    async def check_quota(
        self,
        space_id: UUID,
        estimated_credits: float,
    ) -> QuotaStatus:
        """
        Returns: OK | WARNING | BLOCKED
        BLOCKED if: monthly_used + estimated >= monthly_limit
        WARNING if: weekly_used + estimated >= weekly_limit
        OK otherwise
        """
        ...

    async def get_quota(self, space_id: UUID) -> SpaceQuota:
        """Get current quota for a space"""
        ...

    async def reset_quotas(self) -> None:
        """
        Cron job (daily):
        - If monthly_reset_date <= today → set monthly_used=0 and advance
          monthly_reset_date by one month
        - If weekly_reset_date <= today → set weekly_used=0 and advance
          weekly_reset_date by one week
        (<= rather than == so a missed run still resets)
        """
        ...

    async def override_quota(
        self,
        space_id: UUID,
        reason: str,
        override_until: datetime,
    ) -> None:
        """Admin override quota temporarily"""
        ...
```
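When the daily reset job fires, it has to move each reset date forward. If it does not, `monthly_reset_date == today` will never match again. The sketch below shows the date arithmetic under three assumptions:
- Weekly periods are 7 days.
- Monthly resets keep the original day of the month, clamped for short months.
- A `<=` comparison is used so that a missed cron run still resets.

The helper names are hypothetical, and a quota row is represented as a plain dict.

```python
import calendar
from datetime import date, timedelta
from typing import Callable


def add_month(d: date) -> date:
    """Same day next month, clamped to that month's last day (e.g. Jan 31 -> Feb 28).

    Clamping drifts (Jan 31 -> Feb 28 -> Mar 28); storing the anchor day separately avoids that.
    """
    year, month = (d.year + 1, 1) if d.month == 12 else (d.year, d.month + 1)
    last_day = calendar.monthrange(year, month)[1]
    return date(year, month, min(d.day, last_day))


def add_week(d: date) -> date:
    return d + timedelta(days=7)


def next_reset(reset_date: date, today: date, step: Callable[[date], date]) -> date:
    """Advance reset_date until it lies after today (catches up after missed runs)."""
    while reset_date <= today:
        reset_date = step(reset_date)
    return reset_date


def reset_if_due(quota: dict, today: date) -> dict:
    """Return a copy of one space_quotas row with any due resets applied."""
    row = dict(quota)
    if row["monthly_reset_date"] <= today:
        row["monthly_used"] = 0.0
        row["monthly_reset_date"] = next_reset(row["monthly_reset_date"], today, add_month)
    if row["weekly_reset_date"] <= today:
        row["weekly_used"] = 0.0
        row["weekly_reset_date"] = next_reset(row["weekly_reset_date"], today, add_week)
    return row


row = {"monthly_used": 640.0, "monthly_reset_date": date(2026, 1, 31),
       "weekly_used": 90.0, "weekly_reset_date": date(2026, 2, 16)}
updated = reset_if_due(row, today=date(2026, 2, 1))
print(updated["monthly_used"], updated["monthly_reset_date"])  # 0.0 2026-02-28
print(updated["weekly_used"], updated["weekly_reset_date"])    # 90.0 2026-02-16
```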
Credit Config (yaml or env):
```yaml
# config/credits.yaml
integrations:
  gmail:
    send: 1.0
    read: 0.5
    draft: 0.3
  llm:
    chat: 0.01        # per 1000 tokens
    embed: 0.005      # per 1000 tokens
    complete: 0.01    # per 1000 tokens
  local_embedding:
    embed: 0          # free (runs locally)
  google_calendar:
    list_events: 0.1
    create_event: 0.2
  slack:
    send_message: 0.5
    read_messages: 0.3

# Execution location surcharge (multiplier applied to the base rate)
execution_surcharge:
  local: 0            # 0x = free (runs on the user's device)
  remote: 1.0         # 1x base
  peak_hours: 1.2     # 1.2x during busy times (future)
```
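The sketch below shows how `estimate_credits` and `check_quota` could read this config. To avoid needing a YAML parser, the config is mirrored as a Python dict. It rests on the following assumptions, none of which the plan fixes:
- Only the `llm` integration is priced per 1000 tokens.
- The `execution_surcharge` entry for the chosen location is the multiplier.
- Floats are used for brevity.

The function signatures are simplified and hypothetical.

```python
from enum import Enum

# config/credits.yaml mirrored as a dict (rates copied from the config above)
CREDITS = {
    "integrations": {
        "gmail": {"send": 1.0, "read": 0.5, "draft": 0.3},
        "llm": {"chat": 0.01, "embed": 0.005, "complete": 0.01},
        "local_embedding": {"embed": 0},
        "google_calendar": {"list_events": 0.1, "create_event": 0.2},
        "slack": {"send_message": 0.5, "read_messages": 0.3},
    },
    "execution_surcharge": {"local": 0, "remote": 1.0},
}
TOKEN_PRICED = {"llm"}  # assumption: only these rates are per 1000 tokens


class QuotaStatus(str, Enum):
    OK = "ok"
    WARNING = "warning"
    BLOCKED = "blocked"


def estimate_credits(integration: str, action: str, location: str, tokens: int = 0) -> float:
    """Base rate (scaled by tokens for LLM calls) times the location multiplier."""
    base = CREDITS["integrations"][integration][action]
    if integration in TOKEN_PRICED:
        base = base * tokens / 1000
    return base * CREDITS["execution_surcharge"][location]


def check_quota(estimated: float, monthly_used: float, monthly_limit: float,
                weekly_used: float, weekly_limit: float) -> QuotaStatus:
    """Monthly limit is hard (BLOCKED); weekly limit is soft (WARNING)."""
    if monthly_used + estimated >= monthly_limit:
        return QuotaStatus.BLOCKED
    if weekly_used + estimated >= weekly_limit:
        return QuotaStatus.WARNING
    return QuotaStatus.OK


print(round(estimate_credits("llm", "chat", "remote", tokens=800), 6))  # 0.008
print(estimate_credits("gmail", "send", "local"))                         # 0.0
print(check_quota(1.0, 500, 1000, 249.5, 250).value)                      # warning
```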
STEP 6: Refactor TaskService to Use TaskStore (3 days)
Files:
- backend/tasks/service.py (modify)
- backend/tasks/store.py (implement PostgresTaskStore)
Scope: M
Dependencies: STEP 1, 2, 3
Change:
```python
# BEFORE
class TaskService:
    def __init__(self):
        self.db = get_db()


# AFTER
from typing import Optional


class TaskService:
    def __init__(self, store: Optional[TaskStore] = None):
        self.db = get_db()  # For backward compat: queries not yet routed through the store
        self.store = store or PostgresTaskStore(self.db)

    async def create_task(self, task_data: TaskCreate) -> Task:
        task = Task(**task_data.model_dump())
        created = await self.store.create(task)
        # Emit event, etc.
        return created

    # Similar for get_task, list_tasks, update_task, delete_task
```
Key: Keep existing TaskService methods, just delegate to store.
STEP 7: Update TaskProcessor to Use Router + Credits (3 days)
Files:
backend/tasks/processor.py (modify)
Scope: M
Dependencies: STEP 4, 5
Change Flow:
```python
# processor.py (module-level import)
from datetime import datetime, timezone


# TaskProcessor method
async def process_task(self, task: Task) -> None:
    task_id = task.id
    try:
        # Phase 1: Route task (decide local vs. remote)
        routing_decision = await self.task_router.route_task(
            task=task,
            device_is_tauri=...,  # from frontend session
        )

        # Phase 2: Check quota (the quota row supplies the numbers reported below)
        quota = await self.credits_service.get_quota(task.space_id)
        if routing_decision.quota_status == QuotaStatus.BLOCKED:
            await self.task_service.update_task_status(
                task_id,
                TaskStatus.BLOCKED,
                blocked_reason="quota_exceeded_monthly",
                blocked_data={
                    "monthly_limit": quota.monthly_limit,
                    "monthly_used": quota.monthly_used,
                    "estimated_credits": routing_decision.estimated_credits,
                },
            )
            return
        if routing_decision.quota_status == QuotaStatus.WARNING:
            # Emit warning event to frontend (show banner)
            await self.event_bus.emit("quota.warning", {
                "space_id": task.space_id,
                "weekly_used": quota.weekly_used,
                "weekly_limit": quota.weekly_limit,
            })

        # Phase 3: Update task with routing decision
        await self.task_service.update_task(
            task_id,
            TaskUpdate(
                status=TaskStatus.RUNNING,
                execution_location=routing_decision.execution_location,
                executor_id=routing_decision.executor_id,
                estimated_credits=routing_decision.estimated_credits,
                execution_metadata={
                    "routing_rationale": routing_decision.rationale,
                    "routed_at": datetime.now(timezone.utc).isoformat(),
                },
            ),
        )

        # Phase 4: Charge estimated credits upfront
        await self.credits_service.charge_credits(
            task_id=task_id,
            space_id=task.space_id,
            interface_id=task.assigned_interface_id,
            action_name=task.type,
            credits=routing_decision.estimated_credits,
            reason="task_execution",
        )

        # Phase 5: Execute task (same as before)
        # ... existing execution logic ...

        # Phase 6: Record actual credits (e.g., the LLM used fewer tokens than estimated)
        actual_credits = calculate_actual_credits(...)
        await self.task_service.update_task(
            task_id,
            TaskUpdate(actual_credits=actual_credits, status=TaskStatus.COMPLETED),
        )

        # Reconcile the upfront charge in either direction
        estimated = routing_decision.estimated_credits
        if actual_credits < estimated:
            await self.credits_service.refund_credits(task_id, estimated - actual_credits)
        elif actual_credits > estimated:
            await self.credits_service.charge_credits(
                task_id=task_id,
                space_id=task.space_id,
                interface_id=task.assigned_interface_id,
                action_name=task.type,
                credits=actual_credits - estimated,
                reason="adjustment",
            )
    except Exception as e:
        # ... existing error handling ...
        # If Phase 4 already charged credits, refund or reconcile them here.
        ...
```
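Once execution finishes, the upfront charge must be reconciled. If the actual cost is lower, refund the difference. If it is higher, charge the difference. Otherwise, do nothing. The sketch below shows that decision. It assumes amounts are handled as `Decimal` values rounded to three places, matching the `NUMERIC(10,3)` columns in `task_credits`. With floats, a sum such as `0.1 + 0.2` would otherwise trigger spurious adjustments. `settle` and `to_credits` are hypothetical helper names.

```python
from decimal import ROUND_HALF_UP, Decimal
from typing import Tuple

CREDIT_PLACES = Decimal("0.001")  # three decimals, like NUMERIC(10,3) in task_credits


def to_credits(value) -> Decimal:
    """Convert an int/float/str amount to a Decimal rounded to 3 places."""
    return Decimal(str(value)).quantize(CREDIT_PLACES, rounding=ROUND_HALF_UP)


def settle(estimated, actual) -> Tuple[str, Decimal]:
    """Return ("refund" | "charge" | "none", amount) for the post-execution adjustment."""
    est, act = to_credits(estimated), to_credits(actual)
    if act < est:
        return "refund", est - act
    if act > est:
        return "charge", act - est
    return "none", Decimal("0.000")


print(settle(0.8, 0.5))        # ('refund', Decimal('0.300'))
print(settle(1, 1.25))         # ('charge', Decimal('0.250'))
print(settle(0.1 + 0.2, 0.3))  # ('none', Decimal('0.000'))
```

A "refund" result would map to `refund_credits` and a "charge" result to a further `charge_credits` call.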
STEP 8: Integrate Router + Credits into Startup (1 day)
Files:
- backend/main.py (modify)
- backend/config.py (modify)
Scope: S
Dependencies: STEP 4, 5
Add:
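```python
# main.py (get_db, TaskProcessor: existing imports)
from contextlib import asynccontextmanager

from fastapi import FastAPI

from tasks.credits_service import TaskCreditsService
from tasks.router import TaskRouter


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup (app.state.interface_manager is created earlier in startup)
    # credits_service must exist before TaskRouter, which takes it as a dependency
    app.state.credits_service = TaskCreditsService(get_db(), app.state.interface_manager)
    app.state.task_router = TaskRouter(app.state.interface_manager, app.state.credits_service)
    app.state.task_processor = TaskProcessor()  # needs task_router + credits_service (Step 7)
    # ... start task processor, cron for quota reset ...
    yield
    # Shutdown
    await app.state.task_processor.stop()
```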
STEP 9: Frontend: Task Card Updates (2 days)
Files:
- frontend/src/components/tasks/TaskDetail.tsx (modify)
- frontend/src/components/tasks/TaskCard.tsx (modify)
- frontend/src/hooks/useTasks.ts (modify)
Scope: S
Dependencies: STEP 1 (DB schema)
Add Display:
```tsx
// Show execution location
<div className="flex items-center gap-2">
  {task.execution_location === 'local' && (
    <>
      <Zap className="w-4 h-4 text-green-500" />
      <span className="text-sm">Running locally</span>
    </>
  )}
  {task.execution_location === 'remote' && (
    <>
      <Cloud className="w-4 h-4 text-blue-500" />
      <span className="text-sm">Running in cloud</span>
    </>
  )}
</div>

// Show credits (both fields stay null until the task is routed / completed)
<div className="text-xs text-gray-500">
  {task.estimated_credits != null && <>Estimated: {task.estimated_credits} credits</>}
  {task.actual_credits != null && (
    <> (Used: {task.actual_credits})</>
  )}
</div>
```
STEP 10: Frontend: Quota Indicator + Settings (2 days)
Files:
- frontend/src/components/settings/QuotaCard.tsx (new)
- frontend/src/pages/Settings.tsx (modify)
- frontend/src/store/settingsStore.ts (modify)
- frontend/src/hooks/useQuota.ts (new)
Scope: S
Show:
```tsx
// Weekly quota (soft limit)
<div>
  <h3>Weekly Credits</h3>
  <ProgressBar
    value={quota.weekly_used}
    max={quota.weekly_limit}
    warning={0.8}
  />
  <p>{quota.weekly_remaining} credits remaining</p>
</div>

// Monthly quota (hard limit)
<div>
  <h3>Monthly Credits</h3>
  <ProgressBar
    value={quota.monthly_used}
    max={quota.monthly_limit}
  />
  <p>{quota.monthly_remaining} credits remaining</p>
</div>
```
STEP 11: API Endpoint: POST /api/tasks/estimate (1 day)
Files:
backend/api/tasks.py (modify)
Scope: S
Dependencies: STEP 4, 5
New Route:
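```python
from fastapi import Depends, Request


@router.post("/tasks/estimate")
async def estimate_task_cost(
    request: Request,
    payload: TaskCreate,
    current_user: AuthUser = Depends(get_current_user),
) -> RoutingDecision:
    """
    Estimate credits and routing decision BEFORE executing (nothing is charged).
    Frontend calls this to show the user: "This will cost 5 credits and run in the cloud".
    """
    task = Task(**payload.model_dump())
    task_router = request.app.state.task_router  # created in lifespan (Step 8)
    routing = await task_router.route_task(
        task,
        device_is_tauri=...,  # from the client session, as in TaskProcessor
    )
    return routing
```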
Frontend calls before submitting:
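```typescript
async function estimateTaskCost(task: TaskCreate) {
  const response = await fetch('/api/tasks/estimate', {
    method: 'POST',
    // Without this header, fetch sends a string body as text/plain and FastAPI will not parse it as JSON
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(task),
  });
  if (!response.ok) {
    throw new Error(`Estimate failed: ${response.status}`);
  }
  return response.json(); // RoutingDecision
}
```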
STEP 12: Migration Script: Backfill Data (1 day)
Files:
backend/scripts/backfill_task_execution.py (new)
Scope: S
Script:
```python
async def backfill_execution_data():
    """
    One-time script to populate existing tasks with defaults
    - Set execution_preference='auto' for all existing tasks
    - Set execution_location='remote' (assumption: all ran remote before)
    - Set executor_id='python:legacy-processor'
    - Create default space quotas
    """
    # Insert default quota for each space
    # Update all tasks with defaults
    # Log results
```
Run: python -m backend.scripts.backfill_task_execution
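The backfill may be re-run, or it may be interrupted partway through. One way to make that safe is to have each statement touch only rows that are still unset. The sketch below uses these stand-ins and assumptions:
- An in-memory SQLite database replaces Postgres so the example runs standalone. `INSERT ... ON CONFLICT DO NOTHING` has the same meaning in both.
- The table definitions are trimmed, and `backfill` is a hypothetical helper.
- `spaces` is assumed to have a `group_id` column, since `space_quotas.group_id` is NOT NULL.

In Postgres, `ADD COLUMN ... DEFAULT 'auto'` already fills `execution_preference` on existing rows, so that column needs no UPDATE.

```python
import sqlite3

# Trimmed stand-ins for the real tables (only the columns the backfill touches)
SCHEMA = """
CREATE TABLE spaces (id TEXT PRIMARY KEY, group_id TEXT NOT NULL);
CREATE TABLE space_quotas (
    space_id TEXT NOT NULL UNIQUE,
    group_id TEXT NOT NULL,
    monthly_limit REAL NOT NULL DEFAULT 1000.0,
    weekly_limit REAL NOT NULL DEFAULT 250.0
);
CREATE TABLE tasks (id TEXT PRIMARY KEY, execution_location TEXT, executor_id TEXT);
"""

# Each statement only touches rows that are still unset, so re-running is harmless.
# In production, also restrict the UPDATEs to tasks created before the migration,
# so new tasks that are not yet routed are not labelled as legacy runs.
BACKFILL = [
    ("quotas", """INSERT INTO space_quotas (space_id, group_id)
                  SELECT id, group_id FROM spaces WHERE true
                  ON CONFLICT (space_id) DO NOTHING"""),
    ("location", """UPDATE tasks SET execution_location = 'remote'
                    WHERE execution_location IS NULL"""),
    ("executor", """UPDATE tasks SET executor_id = 'python:legacy-processor'
                    WHERE executor_id IS NULL"""),
]


def backfill(conn: sqlite3.Connection) -> dict:
    """Run all statements in one transaction and return rows changed per step."""
    counts = {}
    with conn:  # commits on success, rolls back on error
        for name, sql in BACKFILL:
            counts[name] = conn.execute(sql).rowcount
    return counts


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany("INSERT INTO spaces VALUES (?, ?)", [("s1", "g1"), ("s2", "g1")])
conn.executemany("INSERT INTO tasks VALUES (?, ?, ?)",
                 [("t1", None, None), ("t2", "local", "tauri:dev-1")])
print(backfill(conn))  # {'quotas': 2, 'location': 1, 'executor': 1}
print(backfill(conn))  # {'quotas': 0, 'location': 0, 'executor': 0}
```

The `WHERE true` is there for SQLite's parser, which needs a WHERE clause to tell the upsert's `ON CONFLICT` apart from a join condition. Postgres accepts it unchanged.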
STEP 13: Integration Tests + Documentation (2 days)
Files:
- backend/tests/test_task_router.py (new)
- backend/tests/test_credits_service.py (new)
- backend/tests/test_task_store.py (new)
- docs/api.md (update)
- docs/architecture.md (update)
- docs/features/2026-02-15-IMPLEMENTATION_PLAN.md (this file; mark complete)
Scope: M
Test Coverage:
# test_task_router.py
- Test routing decision for auto/cost_optimized/performance_optimized
- Test local handler detection
- Test quota blocking
- Test executor_id assignment
# test_credits_service.py
- Test credit estimation (base + multiplier)
- Test LLM token-based calculation
- Test quota enforcement (monthly hard, weekly soft)
- Test credit refund on actual vs. estimated
# test_task_store.py
- Test PostgresTaskStore CRUD
- Test list_pending, filtering
- Test data persistence
Docs:
- Add task routing section to architecture.md
- Add credits & quotas section to architecture.md
- Add /api/tasks/estimate endpoint to api.md
- Add quota management section to user guide
Effort Breakdown
| Step | Task | Effort | Duration |
|---|---|---|---|
| 1 | DB Migrations | M | 2 days |
| 2 | Pydantic Models | S | 1 day |
| 3 | TaskStore ABC | S | 2 days |
| 4 | TaskRouter Service | M | 3 days |
| 5 | TaskCreditsService | M | 3 days |
| 6 | Refactor TaskService | M | 3 days |
| 7 | Update TaskProcessor | M | 3 days |
| 8 | Integrate Router/Credits | S | 1 day |
| 9 | Frontend Task Cards | S | 2 days |
| 10 | Frontend Quota UI | S | 2 days |
| 11 | API Estimate Endpoint | S | 1 day |
| 12 | Migration Script | S | 1 day |
| 13 | Tests + Docs | M | 2 days |
| TOTAL | | | 26 days (~5-6 weeks) |
Testing Strategy
Unit Tests
- Router decision logic (all preferences, all scenarios)
- Credit calculations (base, multiplier, LLM tokens)
- Quota enforcement (hard, soft, refund)
- TaskStore CRUD operations
Integration Tests
- Task creation → routing → execution → credit charge → completion
- Quota exceeded scenario (task blocked)
- Quota warning scenario (task proceeds with warning)
- Router choosing local vs. remote based on capability
E2E Tests
- User creates task → sees estimate → confirms → watches execution badge change → sees credits deducted
- Quota warning banner appears when weekly limit approached
- Monthly quota resets on schedule
Performance Tests
- Router decision latency (<100ms)
- Credit calculation latency (<50ms)
- Quota queries indexed properly
Risks & Mitigation
| Risk | Mitigation |
|---|---|
| Credit calculation wrong | Start with simple flat-rate, validate with tests, iterate |
| Quota reset timing bugs | Cron job tested, manual reset available |
| TaskStore refactor breaks existing tests | Gradual refactoring, backwards-compat layer |
| Frontend quota UI confusing | User testing, clear copy + tooltips |
| Local routing decision fails | Fallback to remote if local execution fails |
Deliverables (at completion)
- ✅ Tasks track execution location (local/remote/hybrid)
- ✅ Tasks estimate and charge credits
- ✅ Spaces have monthly hard + weekly soft quotas
- ✅ User can choose execution preference (cost vs. performance)
- ✅ Frontend shows location + credits + quota status
- ✅ Storage abstraction ready for Redis/S3 backends
- ✅ Test coverage above 80%
- ✅ Updated architecture docs
Future Extensions (Deferred)
- Redis queue backend (Phase 4 distributed workers)
- S3 archive backend (Phase 4 long-term storage)
- Usage reports & analytics (Phase 3e.5 dashboard)
- Predictions API (estimate time + cost before execution)
- Crypto integration (Phase 3m marketplace, credits → MorphCoin)
- Alerts & notifications (quota approaching, daily summary)
- Time-based pricing (peak hour surcharges)
- Team/enterprise quotas (shared vs. per-seat)