IndexerIntegration — Unified Vector Indexing
**Status:** Design (V1.0)
**Replaces:** Scattered embed→store in 5 call sites
**Depends on:** EventBus, VectorStore, BaseInterface, InterfaceManager
The Problem
Vector indexing in Morphee is copy-pasted across 5 locations, each wiring the same pattern manually:
| Site | File | memory_type | What it indexes |
|---|---|---|---|
| `_index_skill` | skills/service.py | skill_index | Skill name + description |
| `summarize_conversation` | memory/summarizer.py | fact/preference/event | Extracted conversation facts |
| `save_conversation_summary` | memory/summarizer.py | conversation_summary | Full conversation summaries |
| `MemoryIntegration._store` | integrations/memory.py | fact/preference/event/note | AI-triggered memory storage |
| `MemoryIntegration._correct` | integrations/memory.py | (existing type) | Corrected memories (GDPR Art. 16) |
Every site calls `get_embedding_manager()` → `get_provider()` → `provider.embed(text)` → `VectorStore().insert()`. There is no shared abstraction.
This creates three problems:
1. **Duplication.** Five copy-paste sites means five places to update when we change embedding providers, add encryption, or modify the storage schema.
2. **Single modality.** Every indexer embeds text with the same model. When we add image indexing (CLIP, V1.3) or audio indexing (Whisper, V1.5), we need a different embedding pipeline — but the current pattern has no extension point.
3. **Full tool catalog bloat.** `actions_to_anthropic_tools()` collects ALL actions from ALL registered integrations and sends them to Claude on every turn. With 15+ integrations and growing, that's 50+ tools = thousands of tokens per LLM call. The system has no way to pre-filter which tools are relevant to a given query.
The Insight
Indexing is event subscription. Services already emit events (skill.created, conversation.message_added, git.committed). Indexers should react to those events — not be scattered inside the services that emit them.
And indexers don't just recall memory — they pre-filter the entire Integration catalog for the LLM.
Architecture
```
                       Events
                          │
     ┌────────────────────┼──────────────────────┐
     │                    │                      │
     ▼                    ▼                      ▼
┌──────────┐       ┌──────────────┐       ┌──────────────┐
│   Text   │       │ Integration  │       │     Git      │
│  Indexer │       │   Indexer    │       │   Indexer    │
└────┬─────┘       └──────┬───────┘       └──────┬───────┘
     │                    │                      │
     ▼                    ▼                      ▼
┌──────────────────────────────────────────────────────────┐
│                  VectorStore (pgvector)                  │
└──────────────────────────┬───────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────┐
│                       VectorRouter                       │
│                                                          │
│  User message → embed → fan-out across ALL indexers:     │
│    - TextIndexer:        fact/preference/event/summary   │
│    - IntegrationIndexer: which tools match this query?   │
│    - GitIndexer:         .morph/ file changes            │
│                                                          │
│  Best hit wins → RouteDecision                           │
└──────────────────────────────────────────────────────────┘
```
Each indexer is a BaseInterface subclass — an Integration like everything else in Morphee. It subscribes to events in setup(), embeds content when events fire, and responds to search queries from the VectorRouter.
Two Killer Use Cases
A. Memory Indexing (unify what exists today)
Today's 5 scattered embed→store sites become a single TextIndexer that subscribes to events:
```python
class TextIndexer(IndexerIntegration):
    name = "indexer.text"
    index_events = [
        "skill.created", "skill.updated", "skill.deleted",
        "conversation.summarized",
        "memory.stored", "memory.corrected",
        "canvas.component_placed", "canvas.component_dismissed",
    ]
    managed_types = ["skill_index", "fact", "preference", "event",
                     "conversation_summary", "note", "canvas_component"]
```
When `skill.created` fires, the TextIndexer calls `extract_content(event)` → `embed()` → `store()`. The skill service no longer needs to know anything about embeddings.
Result: One indexer replaces five copy-paste sites. Adding a new indexed type means adding one event handler, not threading embed logic through another service.
B. Integration/Tool Indexing (NEW — the big win)
This is the novel part. Today:
```python
# chat/tools.py — actions_to_anthropic_tools()
for interface in all_interfaces:
    for action in interface.get_actions():
        tools.append(build_tool_schema(interface, action))
# → ALL tools sent to Claude. Every turn. ~50+ tools = thousands of tokens.
```
With IntegrationIndexer:
```python
class IntegrationIndexer(IndexerIntegration):
    name = "indexer.integrations"
    index_events = [
        "interface.registered", "interface.unregistered",
        "interface.action_added", "interface.action_removed",
    ]
    managed_types = ["integration_index"]

    async def extract_content(self, event_type, data):
        # Embed: "gmail — send: Send an email to one or more recipients"
        return f"{data['interface_name']} — {data['action_name']}: {data['action_description']}"
```
At startup (and whenever integrations change), the IntegrationIndexer embeds every registered action. When a user sends a message:
- VectorRouter embeds the user message
- Queries IntegrationIndexer: "Which actions match this query?" → top-N results
- Only those N actions are injected into the LLM's tool list
- LLM sees 3-5 relevant tools instead of 50+
Token impact: With 50 tools averaging ~100 tokens each (name + description + parameter schema), that's ~5,000 tokens per turn. Pre-filtering to 5 tools = ~500 tokens. ~4,500 tokens saved per LLM call.
The existing skill_index is a special case of this. Today, skills are indexed separately from other integrations. The IntegrationIndexer generalizes this: skills, built-in integrations, and WASM extensions are all indexed the same way.
Before:
```
skill_index (skills/service.py)        ← special case
+ ALL other tools sent blindly         ← no filtering
```
After:
```
integration_index (IntegrationIndexer) ← everything indexed
VectorRouter picks top-N               ← per-query filtering
```
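The rank-then-truncate step can be sketched end to end. This is a toy, self-contained version: `embed()` here is a bag-of-characters stand-in for the real EmbeddingManager, and the indexed actions are invented examples, but the shape of the per-query filter is the same.

```python
import math

def embed(text: str) -> list[float]:
    # Stand-in "embedding": normalized letter counts, enough to demo ranking.
    vec = [0.0] * 26
    for ch in text.lower():
        if ch.isalpha() and ord(ch) < 128:
            vec[ord(ch) - ord("a")] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]

def cosine(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

# What IntegrationIndexer would have stored: one vector per action.
INDEX = {
    "gmail__send": embed("gmail — send: Send an email to one or more recipients"),
    "calendar__create_event": embed("calendar — create_event: Schedule a meeting"),
    "tasks__add": embed("tasks — add: Add a task to the todo list"),
}

def prefilter(query: str, top_n: int = 2) -> list[str]:
    # Embed the user message, rank every indexed action, keep the top N.
    q = embed(query)
    ranked = sorted(INDEX, key=lambda k: cosine(q, INDEX[k]), reverse=True)
    return ranked[:top_n]
```

Only the keys returned by `prefilter()` would be expanded into full tool schemas for the LLM call.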
The IndexerIntegration Contract
```python
from typing import Optional
from uuid import UUID


class IndexerIntegration(BaseInterface):
    """Base class for all vector indexers.

    Subclasses declare what events they listen to and what memory_types
    they manage. The base class handles the embed→store pipeline.

    Override extract_content() at minimum. Override embed() for non-text
    modalities (CLIP, Whisper). Override store() for non-pgvector backends.
    """

    # --- Subclass declarations ---

    index_events: list[str] = []
    """Events this indexer subscribes to. Wired automatically in setup()."""

    managed_types: list[str] = []
    """memory_type values this indexer owns. Used by VectorRouter for fan-out."""

    # --- Core pipeline (override what's different) ---

    async def extract_content(self, event_type: str, data: dict) -> Optional[str]:
        """Transform an event into indexable text. Return None to skip indexing.

        This is the main method subclasses implement.
        """
        raise NotImplementedError

    async def embed(self, content: str) -> list[float]:
        """Default: text embedding via EmbeddingManager.

        Override for CLIP (images), Whisper (audio), or custom models."""
        provider = await get_embedding_manager().get_provider()
        return await provider.embed(content)

    async def store(self, embedding: list[float], content: str,
                    memory_type: str, metadata: dict,
                    group_id: UUID, space_id: Optional[UUID] = None,
                    scope: str = "group") -> Optional[UUID]:
        """Default: pgvector via VectorStore. Override for LanceDB or custom."""
        store = VectorStore()
        return await store.insert(
            embedding=embedding, content=content,
            memory_type=memory_type, scope=scope,
            group_id=group_id, space_id=space_id,
            metadata=metadata,
        )

    async def delete(self, content: str, group_id: UUID,
                     memory_type: str) -> bool:
        """Default: delete by content match. Override for custom cleanup."""
        store = VectorStore()
        return await store.delete_by_content(
            content=content, group_id=group_id, memory_type=memory_type
        )

    async def search(self, query_embedding: list[float],
                     group_id: UUID, space_id: Optional[UUID] = None,
                     limit: int = 5) -> list[dict]:
        """Called by VectorRouter during fan-out. Searches this indexer's managed_types."""
        store = VectorStore()
        results = []
        for mt in self.managed_types:
            hits = await store.search(
                query_embedding=query_embedding, memory_type=mt,
                group_id=group_id, space_id=space_id, limit=limit,
            )
            results.extend(hits)
        results.sort(key=lambda r: r["similarity"], reverse=True)
        return results[:limit]

    # --- Lifecycle (automatic wiring) ---

    async def setup(self):
        """Subscribe to declared events. Subclasses can override to add setup logic."""
        for event_pattern in self.index_events:
            await self.subscribe(event_pattern, self._on_event)

    async def _on_event(self, event_type: str, data: dict):
        """Internal handler. Runs extract → embed → store pipeline."""
        content = await self.extract_content(event_type, data)
        if content is None:
            return

        embedding = await self.embed(content)
        metadata = data.get("metadata", {})
        memory_type = data.get("memory_type", self.managed_types[0])
        group_id = UUID(data["group_id"])
        space_id = UUID(data["space_id"]) if data.get("space_id") else None

        await self.store(embedding, content, memory_type, metadata,
                         group_id, space_id)
```
**Design principle:** Subclasses only override what's different. A text indexer overrides `extract_content()`. An image indexer overrides `extract_content()` + `embed()`. A LanceDB-backed indexer overrides `store()` + `search()`.
Built-in Indexers
TextIndexer (V1.0)
Replaces all 5 existing embed→store sites. Handles text-based memories: facts, preferences, events, conversation summaries, notes, canvas components.
```python
class TextIndexer(IndexerIntegration):
    name = "indexer.text"
    index_events = [
        "memory.stored", "memory.corrected", "memory.deleted",
        "conversation.summarized",
        "skill.created", "skill.updated", "skill.deleted",
        "canvas.component_placed", "canvas.component_dismissed",
    ]
    managed_types = ["fact", "preference", "event", "note",
                     "conversation_summary", "skill_index",
                     "canvas_component"]

    async def extract_content(self, event_type, data):
        if event_type.startswith("skill."):
            return f"{data['skill_name']}. {data.get('skill_description', '')}"
        if event_type == "memory.deleted":
            await self.delete(data["content"], UUID(data["group_id"]),
                              data["memory_type"])
            return None  # skip indexing
        return data.get("content") or data.get("summary")
```
IntegrationIndexer (V1.0)
Indexes all registered integrations and their actions for tool pre-filtering. This is the novel contribution.
```python
class IntegrationIndexer(IndexerIntegration):
    name = "indexer.integrations"
    index_events = [
        "interface.registered", "interface.unregistered",
    ]
    managed_types = ["integration_index"]

    async def extract_content(self, event_type, data):
        if event_type == "interface.unregistered":
            # Delete all entries for this interface
            for action_name in data.get("action_names", []):
                key = f"{data['interface_name']}__{action_name}"
                await self.delete(key, UUID(data["group_id"]),
                                  "integration_index")
            return None

        # interface.registered — index each action separately
        actions = data.get("actions", [])
        for action in actions:
            content = f"{data['interface_name']}__{action['name']}"
            text = f"{data['interface_name']} — {action['name']}: {action['description']}"
            embedding = await self.embed(text)
            await self.store(
                embedding=embedding, content=content,
                memory_type="integration_index",
                metadata={
                    "interface_name": data["interface_name"],
                    "action_name": action["name"],
                    "action_description": action["description"],
                    "has_required_params": action.get("has_required_params", False),
                    "ai_access": action.get("ai_access", "execute"),
                },
                group_id=UUID(data.get("group_id", "00000000-0000-0000-0000-000000000000")),
            )
        return None  # already stored per-action above

    async def reindex_all(self, interface_manager):
        """Called at startup. Indexes all currently registered integrations."""
        for interface in interface_manager.interfaces.values():
            defn = interface.get_definition()
            for action in defn.actions:
                content = f"{interface.name}__{action.name}"
                text = f"{interface.name} — {action.name}: {action.description}"
                embedding = await self.embed(text)
                await self.store(
                    embedding=embedding, content=content,
                    memory_type="integration_index",
                    metadata={
                        "interface_name": interface.name,
                        "action_name": action.name,
                        "action_description": action.description,
                    },
                    group_id=UUID("00000000-0000-0000-0000-000000000000"),  # global
                )
```
Startup flow:
1. `InterfaceManager` registers all integrations (existing)
2. `IntegrationIndexer.reindex_all(interface_manager)` — embeds every action (new)
3. `TextIndexer` wires event subscriptions (replaces `reindex_all_skills()`)
Query flow:
- User sends message
- VectorRouter embeds the message
- Queries IntegrationIndexer: top-5 actions by similarity
- `actions_to_anthropic_tools()` only includes those 5 actions (instead of all 50+)
- LLM responds with a tool call from the pre-filtered set
- If no good match (similarity < threshold), fall back to full catalog
GitIndexer (V1.0)
Indexes .morph/ file changes for "What changed?" queries. Subscribes to OpenMorph write events.
```python
class GitIndexer(IndexerIntegration):
    name = "indexer.git"
    index_events = [
        "core.*.morph_written",  # any integration writing to .morph/
        "git.committed",         # direct git commits
    ]
    managed_types = ["git_change"]

    async def extract_content(self, event_type, data):
        # "tasks/task-abc123.yaml updated: Buy groceries for dinner"
        return f"{data['path']} {data.get('action', 'updated')}: {data.get('summary', '')}"
```
ImageIndexer (V1.3)
Multi-modal indexing using CLIP embeddings. Different embedding model, same pipeline.
```python
class ImageIndexer(IndexerIntegration):
    name = "indexer.image"
    index_events = ["file.uploaded", "canvas.image_placed"]
    managed_types = ["image_index"]

    async def embed(self, content: str) -> list[float]:
        # Override: use CLIP model instead of text embeddings
        return await self.clip_provider.embed_image(content)

    async def extract_content(self, event_type, data):
        return data.get("file_path") or data.get("image_url")
```
VendorIndexer (V1.2)
WASM extensions ship their own indexers. A JIRA integration might index issue titles; a Notion integration might index page content.
```python
# Loaded from WASM extension manifest
class VendorIndexer(IndexerIntegration):
    """Indexer defined by a WASM extension.

    The extension's manifest.yaml declares:

        indexer:
          events: ["net.atlassian.jira.issue_created", ...]
          managed_types: ["jira_issue"]
          extract_wasm_fn: "extract_index_content"
    """

    async def extract_content(self, event_type, data):
        # Delegate to WASM function
        return await self.wasm_runtime.call(
            self.extract_wasm_fn, event_type, data
        )
```
VectorRouter Evolution
Today's VectorRouter checks memory types in a hardcoded order. With IndexerIntegrations, it becomes a fan-out across all registered indexers:
Before (hardcoded):
```
VectorRouter.route()
  → _check_memory(["fact", "preference", "canvas_component"])
  → _check_skills(["skill_index"])
  → return best match
```
After (fan-out):
```
VectorRouter.route()
  → for indexer in registered_indexers:
        results.extend(await indexer.search(query_embedding, ...))
  → return best match across all indexers
```
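A minimal sketch of that fan-out loop, assuming each indexer implements the `search()` contract above. `FakeIndexer` and its canned hits are illustrative stand-ins for real indexers backed by pgvector.

```python
import asyncio

class FakeIndexer:
    """Stand-in indexer that returns pre-baked search hits."""
    def __init__(self, name, hits):
        self.name = name
        self._hits = hits

    async def search(self, query_embedding, group_id, space_id=None, limit=5):
        return self._hits[:limit]

async def route(query_embedding, indexers, group_id):
    """Fan out one query embedding across every registered indexer."""
    results = []
    for indexer in indexers:
        hits = await indexer.search(query_embedding, group_id)
        for h in hits:
            h["indexer"] = indexer.name  # remember who produced the hit
        results.extend(hits)
    # Best hit wins across all indexers; None when nothing matched.
    return max(results, key=lambda r: r["similarity"], default=None)
```

In the real router the per-indexer calls could run concurrently with `asyncio.gather`; the sequential loop here keeps the sketch short.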
Threshold evolution:
| Route | Current | With IndexerIntegrations |
|---|---|---|
| DIRECT_MEMORY | ≥ 0.92 from fact/preference/canvas_component | ≥ 0.92 from TextIndexer |
| SKILL_EXECUTE | ≥ 0.88 from skill_index, no required params | ≥ 0.88 from IntegrationIndexer, no required params |
| TOOL_FILTER | N/A | ≥ 0.70 from IntegrationIndexer → pre-filter tool list |
| SKILL_HINT | ≥ 0.83 from skill_index, has required params | ≥ 0.83 from IntegrationIndexer, has required params |
| LLM_REQUIRED | fallback | fallback (with filtered tool list if TOOL_FILTER hit) |
New route type: TOOL_FILTER. Even when the query needs the LLM (similarity < 0.83), the IntegrationIndexer still pre-filters which tools the LLM sees. The TOOL_FILTER threshold (≥ 0.70) is deliberately lower — it doesn't need high confidence to narrow the tool list, just enough to exclude obviously irrelevant tools.
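The threshold table reads as a single decision function. A sketch, assuming the router has already taken the best TextIndexer similarity and the best IntegrationIndexer hit; the function name and return strings are illustrative, the thresholds come from the table above.

```python
def decide_route(best_text_sim: float, best_action_sim: float,
                 action_has_required_params: bool) -> str:
    """Map the best similarities to a route, highest-confidence first."""
    if best_text_sim >= 0.92:
        return "DIRECT_MEMORY"
    if best_action_sim >= 0.88 and not action_has_required_params:
        return "SKILL_EXECUTE"
    if best_action_sim >= 0.83 and action_has_required_params:
        return "SKILL_HINT"
    if best_action_sim >= 0.70:
        # Not confident enough to route directly, but enough to
        # narrow the tool list the LLM sees.
        return "TOOL_FILTER"
    return "LLM_REQUIRED"
```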
Knowledge Pipeline Connection
IndexerIntegrations are the fast recall layer in the Knowledge Pipeline:
```
USE → EXTRACT → COMPILE → SHARE → INSTALL → USE
        ↑
        │ IndexerIntegrations live here
        │
        Extract = events fire → indexers embed → vectors stored
```
Connected to the Runtime Hierarchy:
```
VectorRouter (free, ~10ms)
  → IndexerIntegration results
  → only if miss: LLMRuntime ($$$)
```
The VectorRouter sits above the BaseMorphRuntime hierarchy. It's the zero-cost check before any runtime is invoked. IndexerIntegrations feed the VectorRouter with the vectors it needs to make routing decisions.
Progressive compilation applies to indexing too:
- Level 0: Raw text → TextIndexer (embed full text)
- Level 1: Structured skill → TextIndexer (embed name + description, more precise)
- Level 2: Canvas component → TextIndexer (embed component label + content)
- Level 3: Compiled WASM → IntegrationIndexer (embed action definitions)
Higher compilation levels produce more precise, shorter index entries — which means higher similarity scores and better routing decisions.
Events to Add
Several events that indexers need are not yet emitted by existing services:
| Event | Emitter | When | Needed by |
|---|---|---|---|
| `interface.registered` | InterfaceManager | After `register_interface()` | IntegrationIndexer |
| `interface.unregistered` | InterfaceManager | After `unregister_interface()` | IntegrationIndexer |
| `memory.stored` | MemoryIntegration | After `_store()` | TextIndexer |
| `memory.corrected` | MemoryIntegration | After `_correct()` | TextIndexer |
| `memory.deleted` | MemoryIntegration | After `_delete()` | TextIndexer |
| `conversation.summarized` | ConversationSummarizer | After `summarize_conversation()` | TextIndexer |
| `canvas.component_placed` | FrontendIntegration | After `show_card`/`show_form` | TextIndexer |
| `canvas.component_dismissed` | FrontendIntegration | After user dismisses | TextIndexer |
| `git.committed` | GitStore | After `morph_write()` commit | GitIndexer |
| `core.*.morph_written` | BaseInterface | After `morph_write()` | GitIndexer |
| `file.uploaded` | FilesystemIntegration | After file upload | ImageIndexer (V1.3) |
Some of these events may already exist in partial form (e.g., core.tasks.morph_written is emitted by the tasks integration). The key change is ensuring all services emit events that indexers can subscribe to — instead of services calling embed→store directly.
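The inversion this table enables, services emit and indexers react, can be shown with a toy bus. `TinyBus` is a minimal stand-in for Morphee's EventBus, and the handler and payload below are invented for illustration.

```python
import asyncio

class TinyBus:
    """Minimal async pub/sub bus, standing in for the real EventBus."""
    def __init__(self):
        self._subs = {}

    def subscribe(self, event_type, handler):
        self._subs.setdefault(event_type, []).append(handler)

    async def emit(self, event_type, data):
        for handler in self._subs.get(event_type, []):
            await handler(event_type, data)

received = []

async def on_memory_stored(event_type, data):
    # Where TextIndexer._on_event would run extract → embed → store.
    received.append((event_type, data["content"]))

async def main():
    bus = TinyBus()
    bus.subscribe("memory.stored", on_memory_stored)
    # The service no longer calls embed→store; it just emits the event.
    await bus.emit("memory.stored", {"content": "User prefers dark mode",
                                     "memory_type": "preference"})

asyncio.run(main())
```

The service side shrinks to a single `emit()`, and every indexer that cares about the event picks it up independently.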
Rollout
V1.0 — Unify and Generalize
- Implement `IndexerIntegration` base class — `BaseInterface` subclass with the contract above
- Implement `TextIndexer` — replace all 5 existing embed→store sites
- Implement `IntegrationIndexer` — index all registered actions, pre-filter tool list
- Implement `GitIndexer` — index `.morph/` changes
- Evolve `VectorRouter` — fan-out across indexers, add `TOOL_FILTER` route
- Add missing events — `interface.registered`, `memory.stored`, `conversation.summarized`, etc.
- Modify `actions_to_anthropic_tools()` — accept optional pre-filtered action list from VectorRouter
**Migration:** The 5 existing embed→store sites are replaced, not wrapped. The `skill_index` memory_type is preserved for backwards compatibility — TextIndexer manages it. `reindex_all_skills()` is replaced by TextIndexer + IntegrationIndexer startup reindex.
V1.2 — WASM Vendor Indexers
- WASM extensions declare indexers in their manifest
- `VendorIndexer` wraps WASM `extract_content` functions
- Extensions can ship custom embedding models (via WASM)
V1.3 — Multi-modal
- `ImageIndexer` with CLIP embeddings (candle on mobile, ONNX on desktop)
- `AudioIndexer` with Whisper → text → TextIndexer (or direct audio embeddings)
- VectorRouter handles mixed-modality results (normalize similarity scores across embedding spaces)
Compatibility
The IndexerIntegration contract is designed to be compatible with existing infrastructure:
- **BaseInterface:** IndexerIntegration extends it — inherits `setup()`, `subscribe()`, `morph_write()`, `get_definition()`, etc.
- **VectorStore:** Default `store()`/`search()`/`delete()` implementations use the existing `VectorStore` — no database changes needed
- **EmbeddingManager:** Default `embed()` uses the existing singleton — same model, same dimensions
- **EventBus:** Subscriptions wired via existing `BaseInterface.subscribe()` pattern
- **InterfaceManager:** Indexers are registered like any other integration
- **Encryption:** `VectorStore.insert()` already encrypts content — indexers inherit this automatically
The main code changes are:
- Services emit events instead of calling embed→store directly
- `VectorRouter` queries indexers instead of hardcoded memory types
- `actions_to_anthropic_tools()` accepts a pre-filtered action list
No database migrations. No new tables. Vectors still go in pgvector with the existing schema.