Implementation Plan: Local/Remote Task Execution & Credits System

ARCHIVE — This document is historical reference only. It may contain outdated information. See docs/status.md for current project state.

Feature: Local/Remote Task Execution Tracking & Credits System
Timeline: ~6-8 weeks (13 steps)
Status: Approved – Ready to implement


Overview Map

┌─────────────────────────────────────────────────────────────┐
│  TaskRouter Service                                         │
│  Evaluates: capability + execution_preference + quota       │
│  Returns: location decision + estimated cost                │
└──────────────────────┬──────────────────────────────────────┘
                       │
        ┌──────────────┼──────────────┐
        ▼              ▼              ▼
   ┌─────────┐  ┌────────────┐ ┌──────────────┐
   │  LOCAL  │  │   REMOTE   │ │    HYBRID    │
   │ (Tauri) │  │  (Python)  │ │    (Both)    │
   └─────────┘  └────────────┘ └──────────────┘
        │              │              │
        └──────────────┼──────────────┘
                       │
        ┌──────────────┴──────────────┐
        ▼                             ▼
┌──────────────────┐       ┌────────────────────┐
│  TaskProcessor   │       │ TaskCreditsService │
│    (Executes)    │       │ (Track + Enforce)  │
└──────────────────┘       └────────────────────┘
        │                             │
        └─────────────┬───────────────┘
                      │
        ┌─────────────┴─────────────┐
        ▼                           ▼
┌──────────────┐        ┌──────────────────────┐
│ task_credits │        │  space_quotas table  │
│   (audit)    │        │  (monthly + weekly)  │
└──────────────┘        └──────────────────────┘
        │
        └─────► TaskStore ABC ─────► PostgresTaskStore
                    │
                    └─► (Future) RedisTaskStore
                                 S3TaskStore

Step-by-Step Implementation

STEP 1: Database Migrations (2 days)

Files: supabase/migrations/019_task_execution_credits.sql
Scope: M
Dependencies: None

Create/Modify:

-- Add execution tracking columns to tasks table
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_preference TEXT DEFAULT 'auto'
    CHECK (execution_preference IN ('local', 'remote', 'auto', 'cost_optimized', 'performance_optimized'));
-- NULL means "not routed yet"; a CHECK passes on NULL, so NULL is not listed
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_location TEXT
    CHECK (execution_location IN ('local', 'remote', 'hybrid'));
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS executor_id TEXT; -- Identifies which worker (tauri:device-id or python:worker-1)
-- Three decimals, matching task_credits, so small LLM costs (e.g. 0.008) are not rounded to 0.01
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS estimated_credits NUMERIC(10,3);
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS actual_credits NUMERIC(10,3);
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS execution_metadata JSONB DEFAULT '{}';

-- New table: task_credits (audit trail)
CREATE TABLE task_credits (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id UUID NOT NULL REFERENCES tasks(id) ON DELETE CASCADE,
    space_id UUID NOT NULL REFERENCES spaces(id) ON DELETE CASCADE,
    group_id UUID NOT NULL REFERENCES groups(id) ON DELETE CASCADE,
    interface_id UUID,                      -- which integration charged
    action_name TEXT,                       -- which action (send, read, chat, etc.)
    base_credit NUMERIC(10,3) NOT NULL,
    multiplier NUMERIC(5,2) DEFAULT 1.0,    -- location surcharge or token-based
    final_credit NUMERIC(10,3) NOT NULL,
    reason TEXT,                            -- "execution", "retry", "api_call", etc.
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT positive_credits CHECK (final_credit >= 0)
);

-- New table: space_quotas (billing limits)
CREATE TABLE space_quotas (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    space_id UUID NOT NULL UNIQUE REFERENCES spaces(id) ON DELETE CASCADE,
    group_id UUID NOT NULL REFERENCES groups(id) ON DELETE CASCADE,

    -- Hard limit (monthly)
    monthly_limit NUMERIC(10,2) NOT NULL DEFAULT 1000.0,
    monthly_used NUMERIC(10,3) NOT NULL DEFAULT 0.0,   -- 3 decimals: sums final_credit values
    monthly_reset_date DATE NOT NULL DEFAULT CURRENT_DATE,

    -- Soft limit (weekly)
    weekly_limit NUMERIC(10,2) NOT NULL DEFAULT 250.0,
    weekly_used NUMERIC(10,3) NOT NULL DEFAULT 0.0,    -- 3 decimals: sums final_credit values
    weekly_reset_date DATE NOT NULL DEFAULT CURRENT_DATE,

    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT positive_limits CHECK (monthly_limit > 0 AND weekly_limit > 0)
);

-- Indexes
CREATE INDEX idx_task_credits_task ON task_credits(task_id);
CREATE INDEX idx_task_credits_space ON task_credits(space_id);
CREATE INDEX idx_task_credits_interface ON task_credits(interface_id);
CREATE INDEX idx_task_credits_created ON task_credits(created_at DESC);
-- space_quotas(space_id) needs no separate index: its UNIQUE constraint already creates one
CREATE INDEX idx_tasks_execution_location ON tasks(execution_location);
CREATE INDEX idx_tasks_execution_preference ON tasks(execution_preference);

Tests: Migration test (verify columns exist, indexes created)


STEP 2: Pydantic Models (1 day)

Files:

  • backend/tasks/models.py (extend existing)
  • backend/tasks/credits.py (new file)

Scope: S

Add to TaskBase/Task:

class TaskBase(BaseModel):
    # ... existing fields ...
    execution_preference: ExecutionPreference = ExecutionPreference.AUTO  # NEW

class Task(TaskBase):
    # ... existing fields ...
    execution_location: Optional[ExecutionLocation] = None  # NEW
    executor_id: Optional[str] = None  # NEW (e.g., "tauri:macbook-pro-seb", "python:worker-1")
    estimated_credits: Optional[float] = None  # NEW
    actual_credits: Optional[float] = None  # NEW
    execution_metadata: dict[str, Any] = Field(default_factory=dict)  # NEW

New Enums:

class ExecutionPreference(str, Enum):
    """How user prefers task to run"""
    LOCAL = "local"                                   # Force local
    REMOTE = "remote"                                 # Force remote
    AUTO = "auto"                                     # Router decides
    COST_OPTIMIZED = "cost_optimized"                 # Prefer cheap (local)
    PERFORMANCE_OPTIMIZED = "performance_optimized"   # Prefer fast (remote)

class ExecutionLocation(str, Enum):
    """Where task actually runs"""
    LOCAL = "local"
    REMOTE = "remote"
    HYBRID = "hybrid"  # Split (e.g., local embedding + remote LLM)

New Models:

# credits.py
from datetime import date
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, computed_field

class TaskCreditRecord(BaseModel):
    """Audit record of credits used"""
    task_id: UUID
    space_id: UUID
    interface_id: Optional[UUID] = None  # nullable in task_credits; Pydantic v2 needs an explicit default
    action_name: str
    base_credit: float
    multiplier: float
    final_credit: float
    reason: str  # "execution", "api_call", etc.

class SpaceQuota(BaseModel):
    """Billing quota for a space"""
    space_id: UUID
    monthly_limit: float
    monthly_used: float
    monthly_reset_date: date
    weekly_limit: float
    weekly_used: float
    weekly_reset_date: date

    @computed_field  # serialized by model_dump(), so the frontend can read it (Pydantic v2)
    @property
    def monthly_remaining(self) -> float:
        return max(0.0, self.monthly_limit - self.monthly_used)

    @computed_field
    @property
    def weekly_remaining(self) -> float:
        return max(0.0, self.weekly_limit - self.weekly_used)

STEP 3: TaskStore Abstract Base Class (2 days)

Files:

  • backend/tasks/store.py (new file)

Scope: S

Key Classes:

from abc import ABC, abstractmethod
from typing import Optional, List
from uuid import UUID

# Task, TaskStatus and TaskUpdate are the existing task models

class TaskStore(ABC):
    """Abstract task storage backend"""

    @abstractmethod
    async def create(self, task: Task) -> Task:
        """Persist a new task"""
        pass

    @abstractmethod
    async def get(self, task_id: UUID) -> Optional[Task]:
        """Retrieve task by ID"""
        pass

    @abstractmethod
    async def list(
        self,
        space_id: Optional[UUID] = None,
        status: Optional[TaskStatus] = None,
        limit: int = 50
    ) -> List[Task]:
        """List tasks with optional filters"""
        pass

    @abstractmethod
    async def update(self, task_id: UUID, updates: TaskUpdate) -> Task:
        """Update task fields"""
        pass

    @abstractmethod
    async def delete(self, task_id: UUID) -> bool:
        """Delete task"""
        pass

    @abstractmethod
    async def list_pending(self, limit: int = 10) -> List[Task]:
        """List pending tasks for processing"""
        pass


class PostgresTaskStore(TaskStore):
    """PostgreSQL implementation"""

    def __init__(self, db):
        self.db = db

    async def create(self, task: Task) -> Task:
        # ... implement using existing TaskService code ...
        pass

    # ... implement all abstract methods ...

Benefits:

  • Clean separation of storage from business logic
  • Easy to add RedisTaskStore later without changing TaskService/TaskProcessor
  • Future-proof for migration to other backends
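One way to see the benefit: a test double can satisfy the same interface without a database. The sketch below is illustrative only. `SimpleTask`, `MiniTaskStore` and `InMemoryTaskStore` are hypothetical names that are not part of this plan, and the interface is trimmed to three methods to keep it short.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from uuid import UUID, uuid4


@dataclass
class SimpleTask:
    """Hypothetical stand-in for the real Task model."""
    title: str
    status: str = "pending"
    id: UUID = field(default_factory=uuid4)


class MiniTaskStore(ABC):
    """Trimmed version of the TaskStore interface."""

    @abstractmethod
    async def create(self, task: SimpleTask) -> SimpleTask: ...

    @abstractmethod
    async def get(self, task_id: UUID) -> Optional[SimpleTask]: ...

    @abstractmethod
    async def list_pending(self, limit: int = 10) -> List[SimpleTask]: ...


class InMemoryTaskStore(MiniTaskStore):
    """Dict-backed store, usable as a test double."""

    def __init__(self) -> None:
        self._tasks: Dict[UUID, SimpleTask] = {}

    async def create(self, task: SimpleTask) -> SimpleTask:
        self._tasks[task.id] = task
        return task

    async def get(self, task_id: UUID) -> Optional[SimpleTask]:
        return self._tasks.get(task_id)

    async def list_pending(self, limit: int = 10) -> List[SimpleTask]:
        pending = [t for t in self._tasks.values() if t.status == "pending"]
        return pending[:limit]


async def demo() -> int:
    """Create one pending and one completed task, then count the pending ones."""
    store: MiniTaskStore = InMemoryTaskStore()
    await store.create(SimpleTask(title="send email"))
    await store.create(SimpleTask(title="already done", status="completed"))
    return len(await store.list_pending())


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Code that depends only on the abstract type (here `MiniTaskStore`) does not change when the backing store does, which is the property STEP 6 relies on.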

STEP 4: TaskRouter Service (3 days)

Files:

  • backend/tasks/router.py (new file)

Scope: M

Key Logic:

class TaskRouter:
    """
    Decides where a task should run (local vs. remote)
    Takes into account: capability + user preference + quota
    """

    def __init__(self, interface_manager, credits_service):
        self.interface_manager = interface_manager
        self.credits_service = credits_service

    async def route_task(
        self,
        task: Task,
        device_is_tauri: bool
    ) -> RoutingDecision:
        """
        Returns: RoutingDecision(
            execution_location: ExecutionLocation,
            estimated_credits: float,
            estimated_duration_ms: float,
            rationale: str
        )

        Rules:
        1. If execution_preference='local' and device is Tauri → LOCAL
        2. If execution_preference='remote' → REMOTE
        3. If execution_preference='auto':
           - Check if interface has LOCAL handler
           - If yes: → LOCAL (cheaper)
           - If no: → REMOTE
        4. If execution_preference='cost_optimized':
           - Prefer LOCAL → REMOTE (pick cheapest capable)
        5. If execution_preference='performance_optimized':
           - Prefer REMOTE → LOCAL (pick fastest)
        6. Check quota:
           - If monthly quota exceeded → BLOCKED (don't execute)
           - If weekly soft limit approached → WARN (but allow)
        """
        pass

Routing Decision Structure:

@dataclass
class RoutingDecision:
    execution_location: ExecutionLocation
    estimated_credits: float
    estimated_duration_ms: float
    executor_id: Optional[str]   # "tauri:device-id" or "python:worker-1"
    rationale: str               # Human-readable explanation
    quota_status: QuotaStatus    # OK / WARNING / BLOCKED
    quota_message: str           # "120 credits remaining this week"

Example Flow:

Task: LLM chat (input_tokens=500, output_tokens=300)
Interface: LLM (supports both local + remote)
Device: Desktop with Tauri
User preference: cost_optimized

Router decision:
1. Has LOCAL handler? YES (can use local LLM)
2. cost_optimized preference? → Pick LOCAL (free)
3. Estimated credits:
   - Local: 0 credits (free)
   - Remote: 0.01 * (500+300) / 1000 = 0.008 credits
4. Pick LOCAL (cheaper)
5. Check quota: Space has 500/1000 monthly → OK
6. Return: RoutingDecision(
       location=LOCAL,
       credits=0,
       duration=~2000ms,
       rationale="local handler available + cost_optimized preference"
   )
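The preference rules (1-5) and the example flow above can be sketched as a pure function. This is a minimal sketch, not the planned `TaskRouter`: `Pref`, `Location`, `Decision` and `route` are hypothetical names, quota handling (rule 6) is left out, and the choices marked "Assumption" or "Simplification" fill in cases the rules do not spell out.

```python
from dataclasses import dataclass
from enum import Enum


class Pref(str, Enum):
    LOCAL = "local"
    REMOTE = "remote"
    AUTO = "auto"
    COST_OPTIMIZED = "cost_optimized"
    PERFORMANCE_OPTIMIZED = "performance_optimized"


class Location(str, Enum):
    LOCAL = "local"
    REMOTE = "remote"


@dataclass
class Decision:
    location: Location
    estimated_credits: float
    rationale: str


def route(pref: Pref, device_is_tauri: bool, has_local_handler: bool,
          local_credits: float, remote_credits: float) -> Decision:
    """Apply preference rules 1-5; the quota check (rule 6) is separate."""
    # Assumption: local execution needs both a Tauri device and a LOCAL handler.
    local_ok = device_is_tauri and has_local_handler

    if pref is Pref.REMOTE:
        return Decision(Location.REMOTE, remote_credits, "remote forced by preference")
    if pref is Pref.LOCAL:
        if device_is_tauri:
            return Decision(Location.LOCAL, local_credits, "local forced by preference")
        # Assumption: fall back to remote when the device is not Tauri.
        return Decision(Location.REMOTE, remote_credits, "local requested, device is not Tauri")
    if pref is Pref.PERFORMANCE_OPTIMIZED:
        # Simplification: remote is treated as the fastest option.
        return Decision(Location.REMOTE, remote_credits, "performance_optimized preference")
    if pref is Pref.COST_OPTIMIZED:
        if local_ok and local_credits <= remote_credits:
            return Decision(Location.LOCAL, local_credits,
                            "local handler available + cost_optimized preference")
        return Decision(Location.REMOTE, remote_credits, "remote is the cheapest capable executor")
    # Pref.AUTO: local if possible (cheaper), otherwise remote.
    if local_ok:
        return Decision(Location.LOCAL, local_credits, "local handler available (cheaper)")
    return Decision(Location.REMOTE, remote_credits, "local execution not available")


# The example flow: 500 input + 300 output tokens at 0.01 credits per 1000 tokens.
remote_cost = 0.01 * (500 + 300) / 1000
decision = route(Pref.COST_OPTIMIZED, True, True, 0.0, remote_cost)
print(decision.location.value, decision.estimated_credits, decision.rationale)
```

Keeping the rules in a side-effect-free function like this makes the "all preferences, all scenarios" unit tests listed under Testing Strategy straightforward to write.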

STEP 5: TaskCreditsService (3 days)

Files:

  • backend/tasks/credits_service.py (new file)

Scope: M

Key Methods:

class TaskCreditsService:
    """
    Tracks and enforces credit usage
    Charges before execution, updates after completion
    """

    def __init__(self, db, interface_manager):
        self.db = db
        self.interface_manager = interface_manager

    async def estimate_credits(
        self,
        interface_id: UUID,
        action_name: str,
        execution_location: ExecutionLocation,
        params: dict  # includes token count for LLM
    ) -> float:
        """
        Calculate estimated credits for a task
        LLM: 0.01 per 1000 tokens (base) * multiplier (location)
        Other: base_credit per action * multiplier
        """
        pass

    async def charge_credits(
        self,
        task_id: UUID,
        space_id: UUID,
        interface_id: Optional[UUID],
        action_name: str,
        credits: float,
        reason: str = "execution"
    ) -> None:
        """Record credit debit and update space quota"""
        # Insert into task_credits
        # Update space_quotas (monthly_used, weekly_used)
        # Emit event for dashboard
        pass

    async def refund_credits(self, task_id: UUID, credits: float) -> None:
        """Return credits when actual usage is below the estimate (called in STEP 7)"""
        pass

    async def check_quota(
        self,
        space_id: UUID,
        estimated_credits: float
    ) -> QuotaStatus:
        """
        Returns: OK | WARNING | BLOCKED

        BLOCKED if: monthly_used + estimated >= monthly_limit
        WARNING if: weekly_used + estimated >= weekly_limit
        OK otherwise
        """
        pass

    async def get_quota(self, space_id: UUID) -> SpaceQuota:
        """Get current quota for a space"""
        pass

    async def reset_quotas(self) -> None:
        """
        Cron job (daily):
        - Check if monthly_reset_date is today → reset monthly_used=0
        - Check if weekly_reset_date is today → reset weekly_used=0
        """
        pass

    async def override_quota(
        self,
        space_id: UUID,
        reason: str,
        override_until: datetime
    ) -> None:
        """Admin override quota temporarily"""
        pass
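The thresholds in the `check_quota` docstring, and the date logic `reset_quotas` depends on, can be sketched without a database. This is a minimal sketch under stated assumptions: the `QuotaStatus` string values, the `<=` comparison in `is_reset_due`, and rolling monthly quotas over on the 1st are not specified by the plan.

```python
from datetime import date
from enum import Enum


class QuotaStatus(str, Enum):
    OK = "ok"            # string values are an assumption
    WARNING = "warning"
    BLOCKED = "blocked"


def check_quota(monthly_used: float, monthly_limit: float,
                weekly_used: float, weekly_limit: float,
                estimated: float) -> QuotaStatus:
    """Monthly limit is hard (BLOCKED); weekly limit is soft (WARNING)."""
    if monthly_used + estimated >= monthly_limit:
        return QuotaStatus.BLOCKED
    if weekly_used + estimated >= weekly_limit:
        return QuotaStatus.WARNING
    return QuotaStatus.OK


def is_reset_due(reset_date: date, today: date) -> bool:
    """Assumption: '<=' rather than '==' so a missed cron run still resets."""
    return reset_date <= today


def next_month_start(today: date) -> date:
    """Assumption: the next monthly reset falls on the 1st of the next month."""
    if today.month == 12:
        return date(today.year + 1, 1, 1)
    return date(today.year, today.month + 1, 1)


# Space from the routing example: 500/1000 monthly used, small LLM task.
print(check_quota(500.0, 1000.0, 100.0, 250.0, 0.008).value)
```

Whatever rule is chosen, the reset job has to move `monthly_reset_date` and `weekly_reset_date` forward after zeroing the counters; otherwise the "is today" check matches only once.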

Credit Config (yaml or env):

# config/credits.yaml
integrations:
  gmail:
    send: 1.0
    read: 0.5
    draft: 0.3

  llm:
    chat: 0.01      # per 1000 tokens
    embed: 0.005    # per 1000 tokens
    complete: 0.01  # per 1000 tokens

  local_embedding:
    embed: 0        # free (runs locally)

  google_calendar:
    list_events: 0.1
    create_event: 0.2

  slack:
    send_message: 0.5
    read_messages: 0.3

# Execution location multiplier (applied to the base credit)
execution_surcharge:
  local: 0          # 0x = free (runs on the user's device)
  remote: 1.0       # 1x base
  peak_hours: 1.2   # 1.2x during busy times (future)
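Read together with the `estimate_credits` docstring, the config implies a simple formula: base rate times location multiplier, with LLM rates scaled by tokens per 1000. A minimal sketch follows. The dictionaries mirror part of the YAML above, while `TOKEN_PRICED` and the function signature are assumptions made for illustration.

```python
# Subset of config/credits.yaml, inlined so the example is self-contained.
CREDIT_RATES = {
    "gmail": {"send": 1.0, "read": 0.5, "draft": 0.3},
    "llm": {"chat": 0.01, "embed": 0.005, "complete": 0.01},
    "slack": {"send_message": 0.5, "read_messages": 0.3},
}
LOCATION_MULTIPLIER = {"local": 0.0, "remote": 1.0}

# Assumption: only these integrations are priced per 1000 tokens.
TOKEN_PRICED = {"llm"}


def estimate_credits(integration: str, action: str, location: str,
                     tokens: int = 0) -> float:
    """Base rate (per action, or per 1000 tokens) times the location multiplier."""
    base = CREDIT_RATES[integration][action]
    if integration in TOKEN_PRICED:
        base = base * tokens / 1000
    return base * LOCATION_MULTIPLIER[location]


print(estimate_credits("llm", "chat", "remote", tokens=800))
print(estimate_credits("gmail", "send", "remote"))
print(estimate_credits("llm", "chat", "local", tokens=800))
```

The first call reproduces the 0.008-credit remote estimate from the routing example in STEP 4.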

STEP 6: Refactor TaskService to Use TaskStore (3 days)

Files:

  • backend/tasks/service.py (modify)
  • backend/tasks/store.py (implement PostgresTaskStore)

Scope: M
Dependencies: STEP 1, 2, 3

Change:

# BEFORE
class TaskService:
    def __init__(self):
        self.db = get_db()

# AFTER
class TaskService:
    def __init__(self, store: Optional[TaskStore] = None):
        self.db = get_db()  # Kept for backward compat (queries not yet routed through the store)
        self.store = store or PostgresTaskStore(self.db)

    async def create_task(self, task_data: TaskCreate) -> Task:
        task = Task(**task_data.model_dump())
        created = await self.store.create(task)
        # Emit event, etc.
        return created

    # Similar for get_task, list_tasks, update_task, delete_task

Key: Keep existing TaskService methods, just delegate to store.


STEP 7: Update TaskProcessor to Use Router + Credits (3 days)

Files:

  • backend/tasks/processor.py (modify)

Scope: M
Dependencies: STEP 4, 5

Change Flow:

async def process_task(self, task: Task) -> None:
    task_id = task.id

    try:
        # Phase 1: Route task (decide local vs. remote)
        routing_decision = await self.task_router.route_task(
            task=task,
            device_is_tauri=...  # from frontend session
        )

        # Phase 2: Check quota (quota is loaded here for the event payloads below)
        quota = await self.credits_service.get_quota(task.space_id)
        if routing_decision.quota_status == QuotaStatus.BLOCKED:
            await self.task_service.update_task_status(
                task_id,
                TaskStatus.BLOCKED,
                blocked_reason="quota_exceeded_monthly",
                blocked_data={
                    "monthly_limit": quota.monthly_limit,
                    "monthly_used": quota.monthly_used,
                    "estimated_credits": routing_decision.estimated_credits
                }
            )
            return

        if routing_decision.quota_status == QuotaStatus.WARNING:
            # Emit warning event to frontend (show banner)
            await self.event_bus.emit("quota.warning", {
                "space_id": task.space_id,
                "weekly_used": quota.weekly_used,
                "weekly_limit": quota.weekly_limit
            })

        # Phase 3: Update task with routing decision
        await self.task_service.update_task(
            task.id,
            TaskUpdate(
                status=TaskStatus.RUNNING,
                execution_location=routing_decision.execution_location,
                executor_id=routing_decision.executor_id,
                estimated_credits=routing_decision.estimated_credits,
                execution_metadata={
                    "routing_rationale": routing_decision.rationale,
                    "routed_at": datetime.now(timezone.utc).isoformat()
                }
            )
        )

        # Phase 4: Charge estimated credits upfront
        await self.credits_service.charge_credits(
            task_id=task_id,
            space_id=task.space_id,
            interface_id=task.assigned_interface_id,
            action_name=task.type,
            credits=routing_decision.estimated_credits,
            reason="execution"
        )

        # Phase 5: Execute task (same as before)
        # ... existing execution logic ...

        # Phase 6: Update actual credits (if different from estimate)
        # E.g., LLM token count was lower than expected
        actual_credits = calculate_actual_credits(...)
        await self.task_service.update_task(
            task.id,
            TaskUpdate(
                actual_credits=actual_credits,
                status=TaskStatus.COMPLETED
            )
        )

        # Emit credit adjustment if actual < estimated
        if actual_credits < routing_decision.estimated_credits:
            refund = routing_decision.estimated_credits - actual_credits
            # Requires a refund_credits method on TaskCreditsService
            await self.credits_service.refund_credits(task_id, refund)

    except Exception as e:
        # ... existing error handling ...
        raise
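The settlement step at the end of this flow (charge the estimate upfront, then reconcile against actual usage) can be sketched as a small pure function. `settle` is a hypothetical helper, not part of the plan. The flow above only covers the refund case; charging the difference when actual usage exceeds the estimate is an assumption added here for symmetry.

```python
from typing import Tuple


def settle(estimated: float, actual: float) -> Tuple[str, float]:
    """Compare the upfront charge with actual usage.

    Returns ("refund", amount), ("charge", amount) or ("none", 0.0).
    """
    # Round to 3 decimals, the precision of task_credits.final_credit.
    diff = round(estimated - actual, 3)
    if diff > 0:
        return ("refund", diff)
    if diff < 0:
        # Assumption: the plan does not say how overruns are handled.
        return ("charge", -diff)
    return ("none", 0.0)


print(settle(0.010, 0.008))
print(settle(1.0, 1.5))
print(settle(1.0, 1.0))
```

Rounding before comparing avoids issuing refunds for floating-point noise such as `0.0000000000000002` credits.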

STEP 8: Integrate Router + Credits into Startup (1 day)

Files:

  • backend/main.py (modify)
  • backend/config.py (modify)

Scope: S
Dependencies: STEP 4, 5

Add:

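# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI

from tasks.router import TaskRouter
from tasks.credits_service import TaskCreditsService

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup (app.state.interface_manager must already be set at this point)
    # credits_service is created first because TaskRouter takes it as an argument
    app.state.credits_service = TaskCreditsService(get_db(), app.state.interface_manager)
    app.state.task_router = TaskRouter(app.state.interface_manager, app.state.credits_service)
    app.state.task_processor = TaskProcessor()  # needs access to task_router and credits_service (STEP 7)
    # ... start task processor, cron for quota reset ...

    yield

    # Shutdown
    await app.state.task_processor.stop()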

STEP 9: Frontend: Task Card Updates (2 days)

Files:

  • frontend/src/components/tasks/TaskDetail.tsx (modify)
  • frontend/src/components/tasks/TaskCard.tsx (modify)
  • frontend/src/hooks/useTasks.ts (modify)

Scope: S
Dependencies: STEP 1 (DB schema)

Add Display:

// Show execution location
<div className="flex items-center gap-2">
  {task.execution_location === 'local' && (
    <>
      <Zap className="w-4 h-4 text-green-500" />
      <span className="text-sm">Running locally</span>
    </>
  )}
  {task.execution_location === 'remote' && (
    <>
      <Cloud className="w-4 h-4 text-blue-500" />
      <span className="text-sm">Running in cloud</span>
    </>
  )}
</div>

// Show credits (!= null also covers undefined, e.g. for tasks created before the migration)
<div className="text-xs text-gray-500">
  Estimated: {task.estimated_credits} credits
  {task.actual_credits != null && (
    <> (Used: {task.actual_credits})</>
  )}
</div>

STEP 10: Frontend: Quota Indicator + Settings (2 days)

Files:

  • frontend/src/components/settings/QuotaCard.tsx (new)
  • frontend/src/pages/Settings.tsx (modify)
  • frontend/src/store/settingsStore.ts (modify)
  • frontend/src/hooks/useQuota.ts (new)

Scope: S

Show:

// Weekly quota (soft limit, warning at 80%)
<div>
  <h3>Weekly Credits</h3>
  <ProgressBar
    value={quota.weekly_used}
    max={quota.weekly_limit}
    warning={0.8}
  />
  <p>{quota.weekly_remaining} credits remaining</p>
</div>

// Monthly quota (hard limit)
<div>
  <h3>Monthly Credits</h3>
  <ProgressBar
    value={quota.monthly_used}
    max={quota.monthly_limit}
  />
  <p>{quota.monthly_remaining} credits remaining</p>
</div>

STEP 11: API Endpoint: POST /api/tasks/estimate (1 day)

Files:

  • backend/api/tasks.py (modify)

Scope: S
Dependencies: STEP 4, 5

New Route:

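@router.post("/tasks/estimate")
async def estimate_task_cost(
    task_data: TaskCreate,
    http_request: Request,  # fastapi.Request, used to reach app.state
    device_is_tauri: bool = False,  # assumption: reported by the client; the plan only says "from frontend session"
    current_user: AuthUser = Depends(get_current_user),
) -> RoutingDecision:
    """
    Estimate credits and routing decision BEFORE executing.
    Frontend calls this to show the user: "This will cost 5 credits and run in the cloud"
    """
    task = Task(**task_data.model_dump())
    task_router = http_request.app.state.task_router  # created at startup (STEP 8)
    return await task_router.route_task(task, device_is_tauri=device_is_tauri)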

Frontend calls before submitting:

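async function estimateTaskCost(task: TaskCreate): Promise<RoutingDecision> {
  const response = await fetch('/api/tasks/estimate', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' }, // needed for the JSON body to be parsed
    body: JSON.stringify(task)
  });
  if (!response.ok) {
    throw new Error(`Estimate failed: ${response.status}`);
  }
  return response.json(); // RoutingDecision
}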

STEP 12: Migration Script: Backfill Data (1 day)

Files:

  • backend/scripts/backfill_task_execution.py (new)

Scope: S

Script:

async def backfill_execution_data():
    """
    One-time script to populate existing tasks with defaults
    - Set execution_preference='auto' for all existing tasks
    - Set execution_location='remote' (assumption: all ran remote before)
    - Set executor_id='python:legacy-processor'
    - Create default space quotas
    """
    # Insert default quota for each space
    # Update all tasks with defaults
    # Log results

Run: python -m backend.scripts.backfill_task_execution
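A backfill like this is safest when it is idempotent, so a second run changes nothing. The sketch below shows that property using an in-memory SQLite database as a stand-in for Postgres. The tables are trimmed to the columns involved (the real `space_quotas` also requires `group_id`), and `make_demo_db` and `backfill` are hypothetical names.

```python
import sqlite3
from typing import Tuple


def make_demo_db() -> sqlite3.Connection:
    """Build a tiny stand-in schema: two spaces, three tasks, one already routed."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE spaces (id TEXT PRIMARY KEY);
        CREATE TABLE tasks (
            id TEXT PRIMARY KEY,
            space_id TEXT,
            execution_preference TEXT DEFAULT 'auto',
            execution_location TEXT,
            executor_id TEXT
        );
        CREATE TABLE space_quotas (
            space_id TEXT PRIMARY KEY,
            monthly_limit REAL DEFAULT 1000.0,
            weekly_limit REAL DEFAULT 250.0
        );
        INSERT INTO spaces VALUES ('s1'), ('s2');
        INSERT INTO tasks (id, space_id) VALUES ('t1', 's1'), ('t2', 's2');
        INSERT INTO tasks (id, space_id, execution_location, executor_id)
            VALUES ('t3', 's1', 'local', 'tauri:device-id');
    """)
    return conn


def backfill(conn: sqlite3.Connection) -> Tuple[int, int]:
    """Fill defaults only where missing; returns (tasks_updated, quotas_created)."""
    to_update = conn.execute(
        "SELECT COUNT(*) FROM tasks WHERE execution_location IS NULL"
    ).fetchone()[0]
    conn.execute(
        "UPDATE tasks SET execution_location = 'remote', "
        "executor_id = 'python:legacy-processor' "
        "WHERE execution_location IS NULL"
    )
    before = conn.execute("SELECT COUNT(*) FROM space_quotas").fetchone()[0]
    # SQLite syntax; skips spaces that already have a quota row.
    conn.execute(
        "INSERT OR IGNORE INTO space_quotas (space_id) SELECT id FROM spaces"
    )
    after = conn.execute("SELECT COUNT(*) FROM space_quotas").fetchone()[0]
    conn.commit()
    return to_update, after - before


if __name__ == "__main__":
    db = make_demo_db()
    print(backfill(db))  # first run touches rows
    print(backfill(db))  # second run is a no-op
```

In Postgres the quota insert would use `INSERT ... ON CONFLICT (space_id) DO NOTHING` instead of `INSERT OR IGNORE`. Restricting the update to rows where `execution_location IS NULL` also keeps the script from overwriting tasks routed after the migration.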


STEP 13: Integration Tests + Documentation (2 days)

Files:

  • backend/tests/test_task_router.py (new)
  • backend/tests/test_credits_service.py (new)
  • backend/tests/test_task_store.py (new)
  • docs/api.md (update)
  • docs/architecture.md (update)
  • docs/features/2026-02-15-IMPLEMENTATION_PLAN.md (this file - mark complete)

Scope: M

Test Coverage:

# test_task_router.py
- Test routing decision for auto/cost_optimized/performance_optimized
- Test local handler detection
- Test quota blocking
- Test executor_id assignment

# test_credits_service.py
- Test credit estimation (base + multiplier)
- Test LLM token-based calculation
- Test quota enforcement (monthly hard, weekly soft)
- Test credit refund on actual vs. estimated

# test_task_store.py
- Test PostgresTaskStore CRUD
- Test list_pending, filtering
- Test data persistence

Docs:

  • Add task routing section to architecture.md
  • Add credits & quotas section to architecture.md
  • Add /api/tasks/estimate endpoint to api.md
  • Add quota management section to user guide

Effort Breakdown

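| Step  | Task                     | Effort | Duration           |
|-------|--------------------------|--------|--------------------|
| 1     | DB Migrations            | M      | 2 days             |
| 2     | Pydantic Models          | S      | 1 day              |
| 3     | TaskStore ABC            | S      | 2 days             |
| 4     | TaskRouter Service       | M      | 3 days             |
| 5     | TaskCreditsService       | M      | 3 days             |
| 6     | Refactor TaskService     | M      | 3 days             |
| 7     | Update TaskProcessor     | M      | 3 days             |
| 8     | Integrate Router/Credits | S      | 1 day              |
| 9     | Frontend Task Cards      | S      | 2 days             |
| 10    | Frontend Quota UI        | S      | 2 days             |
| 11    | API Estimate Endpoint    | S      | 1 day              |
| 12    | Migration Script         | S      | 1 day              |
| 13    | Tests + Docs             | M      | 2 days             |
| TOTAL |                          |        | ~30 days (6 weeks) |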

Testing Strategy

Unit Tests

  • Router decision logic (all preferences, all scenarios)
  • Credit calculations (base, multiplier, LLM tokens)
  • Quota enforcement (hard, soft, refund)
  • TaskStore CRUD operations

Integration Tests

  • Task creation → routing → execution → credit charge → completion
  • Quota exceeded scenario (task blocked)
  • Quota warning scenario (task proceeds with warning)
  • Router choosing local vs. remote based on capability

E2E Tests

  • User creates task → sees estimate → confirms → watches execution badge change → sees credits deducted
  • Quota warning banner appears when weekly limit approached
  • Monthly quota resets on schedule

Performance Tests

  • Router decision latency (<100ms)
  • Credit calculation latency (<50ms)
  • Quota queries indexed properly

Risks & Mitigation

| Risk                                      | Mitigation                                               |
|-------------------------------------------|----------------------------------------------------------|
| Credit calculation wrong                  | Start with simple flat-rate, validate with tests, iterate |
| Quota reset timing bugs                   | Cron job tested, manual reset available                  |
| TaskStore refactor breaks existing tests  | Gradual refactoring, backwards-compat layer              |
| Frontend quota UI confusing               | User testing, clear copy + tooltips                      |
| Local routing decision fails              | Fallback to remote if local execution fails              |

Deliverables (at completion)

  • ✅ Tasks track execution location (local/remote/hybrid)
  • ✅ Tasks estimate and charge credits
  • ✅ Spaces have monthly hard + weekly soft quotas
  • ✅ User can choose execution preference (cost vs. performance)
  • ✅ Frontend shows location + credits + quota status
  • ✅ Storage abstraction ready for Redis/S3 backends
  • ✅ Test coverage above 80%
  • ✅ Updated architecture docs

Future Extensions (Deferred)

  • Redis queue backend (Phase 4 distributed workers)
  • S3 archive backend (Phase 4 long-term storage)
  • Usage reports & analytics (Phase 3e.5 dashboard)
  • Predictions API (estimate time + cost before execution)
  • Crypto integration (Phase 3m marketplace, credits → MorphCoin)
  • Alerts & notifications (quota approaching, daily summary)
  • Time-based pricing (peak hour surcharges)
  • Team/enterprise quotas (shared vs. per-seat)