
IndexerIntegration — Unified Vector Indexing

Status: Design (V1.0)
Replaces: Scattered embed→store in 5 call sites
Depends on: EventBus, VectorStore, BaseInterface, InterfaceManager


The Problem

Vector indexing in Morphee is copy-pasted across 5 locations, each wiring the same pattern manually:

| Site | File | memory_type | What it indexes |
|---|---|---|---|
| _index_skill | skills/service.py | skill_index | Skill name + description |
| summarize_conversation | memory/summarizer.py | fact/preference/event | Extracted conversation facts |
| save_conversation_summary | memory/summarizer.py | conversation_summary | Full conversation summaries |
| MemoryIntegration._store | integrations/memory.py | fact/preference/event/note | AI-triggered memory storage |
| MemoryIntegration._correct | integrations/memory.py | (existing type) | Corrected memories (GDPR Art. 16) |

Every site calls get_embedding_manager() → get_provider() → provider.embed(text) → VectorStore().insert(). There is no shared abstraction.

This creates three problems:

  1. Duplication. Five copy-paste sites means five places to update when we change embedding providers, add encryption, or modify the storage schema.

  2. Single modality. Every indexer embeds text with the same model. When we add image indexing (CLIP, V1.3) or audio indexing (Whisper, V1.5), we need a different embedding pipeline — but the current pattern has no extension point.

  3. Full tool catalog bloat. actions_to_anthropic_tools() collects ALL actions from ALL registered integrations and sends them to Claude on every turn. With 15+ integrations and growing, that's 50+ tools = thousands of tokens per LLM call. The system has no way to pre-filter which tools are relevant to a given query.


The Insight

Indexing is event subscription. Services already emit events (skill.created, conversation.message_added, git.committed). Indexers should react to those events — not be scattered inside the services that emit them.

And indexers don't just recall memory — they pre-filter the entire Integration catalog for the LLM.


Architecture

                          Events
                             │
     ┌───────────────────────┼───────────────────────┐
     │                       │                       │
     ▼                       ▼                       ▼
┌──────────┐        ┌──────────────┐      ┌──────────────┐
│   Text   │        │ Integration  │      │     Git      │
│ Indexer  │        │   Indexer    │      │   Indexer    │
└────┬─────┘        └──────┬───────┘      └──────┬───────┘
     │                     │                     │
     ▼                     ▼                     ▼
┌──────────────────────────────────────────────────────────┐
│                   VectorStore (pgvector)                 │
└──────────────────────────┬───────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────┐
│                       VectorRouter                       │
│                                                          │
│  User message → embed → fan-out across ALL indexers:     │
│   - TextIndexer: fact/preference/event/summary           │
│   - IntegrationIndexer: which tools match this query?    │
│   - GitIndexer: .morph/ file changes                     │
│                                                          │
│  Best hit wins → RouteDecision                           │
└──────────────────────────────────────────────────────────┘

Each indexer is a BaseInterface subclass — an Integration like everything else in Morphee. It subscribes to events in setup(), embeds content when events fire, and responds to search queries from the VectorRouter.


Two Killer Use Cases

A. Memory Indexing (unify what exists today)

Today's 5 scattered embed→store sites become a single TextIndexer that subscribes to events:

class TextIndexer(IndexerIntegration):
    name = "indexer.text"
    index_events = [
        "skill.created", "skill.updated", "skill.deleted",
        "conversation.summarized",
        "memory.stored", "memory.corrected",
        "canvas.component_placed", "canvas.component_dismissed",
    ]
    managed_types = ["skill_index", "fact", "preference", "event",
                     "conversation_summary", "note", "canvas_component"]

When skill.created fires, the TextIndexer calls extract_content(event) → embed() → store(). The skill service no longer needs to know anything about embeddings.

Result: One indexer replaces five copy-paste sites. Adding a new indexed type means adding one event handler, not threading embed logic through another service.

B. Integration/Tool Indexing (NEW — the big win)

This is the novel part. Today:

# chat/tools.py — actions_to_anthropic_tools()
for interface in all_interfaces:
    for action in interface.get_actions():
        tools.append(build_tool_schema(interface, action))
# → ALL tools sent to Claude. Every turn. ~50+ tools = thousands of tokens.

With IntegrationIndexer:

class IntegrationIndexer(IndexerIntegration):
    name = "indexer.integrations"
    index_events = [
        "interface.registered", "interface.unregistered",
        "interface.action_added", "interface.action_removed",
    ]
    managed_types = ["integration_index"]

    async def extract_content(self, event_type, data):
        # Embed: "gmail — send: Send an email to one or more recipients"
        return f"{data['interface_name']} — {data['action_name']}: {data['action_description']}"

At startup (and whenever integrations change), the IntegrationIndexer embeds every registered action. When a user sends a message:

  1. VectorRouter embeds the user message
  2. Queries IntegrationIndexer: "Which actions match this query?" → top-N results
  3. Only those N actions are injected into the LLM's tool list
  4. LLM sees 3-5 relevant tools instead of 50+

Token impact: With 50 tools averaging ~100 tokens each (name + description + parameter schema), that's ~5,000 tokens per turn. Pre-filtering to 5 tools = ~500 tokens. ~4,500 tokens saved per LLM call.
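That estimate can be checked with back-of-envelope arithmetic (the tool counts and the ~100-token average per schema are the assumptions stated above, not measured values):

```python
# Token math for tool pre-filtering, using the estimates from the text
TOTAL_TOOLS = 50
TOKENS_PER_TOOL = 100   # name + description + parameter schema
FILTERED_TOOLS = 5      # top-N kept by the IntegrationIndexer

full_catalog = TOTAL_TOOLS * TOKENS_PER_TOOL    # 5000 tokens per turn
filtered = FILTERED_TOOLS * TOKENS_PER_TOOL     # 500 tokens per turn
saved_per_call = full_catalog - filtered        # 4500 tokens saved
```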

The existing skill_index is a special case of this. Today, skills are indexed separately from other integrations. The IntegrationIndexer generalizes this: skills, built-in integrations, and WASM extensions are all indexed the same way.

Before:
skill_index (skills/service.py) ← special case
+ ALL other tools sent blindly ← no filtering

After:
integration_index (IntegrationIndexer) ← everything indexed
VectorRouter picks top-N ← per-query filtering

The IndexerIntegration Contract

from typing import Optional
from uuid import UUID


class IndexerIntegration(BaseInterface):
    """Base class for all vector indexers.

    Subclasses declare what events they listen to and what memory_types
    they manage. The base class handles the embed→store pipeline.
    Override extract_content() at minimum. Override embed() for non-text
    modalities (CLIP, Whisper). Override store() for non-pgvector backends.
    """

    # --- Subclass declarations ---

    index_events: list[str] = []
    """Events this indexer subscribes to. Wired automatically in setup()."""

    managed_types: list[str] = []
    """memory_type values this indexer owns. Used by VectorRouter for fan-out."""

    # --- Core pipeline (override what's different) ---

    async def extract_content(self, event_type: str, data: dict) -> Optional[str]:
        """Transform an event into indexable text. Return None to skip indexing.

        This is the main method subclasses implement.
        """
        raise NotImplementedError

    async def embed(self, content: str) -> list[float]:
        """Default: text embedding via EmbeddingManager.
        Override for CLIP (images), Whisper (audio), or custom models."""
        provider = await get_embedding_manager().get_provider()
        return await provider.embed(content)

    async def store(self, embedding: list[float], content: str,
                    memory_type: str, metadata: dict,
                    group_id: UUID, space_id: Optional[UUID] = None,
                    scope: str = "group") -> Optional[UUID]:
        """Default: pgvector via VectorStore. Override for LanceDB or custom."""
        store = VectorStore()
        return await store.insert(
            embedding=embedding, content=content,
            memory_type=memory_type, scope=scope,
            group_id=group_id, space_id=space_id,
            metadata=metadata,
        )

    async def delete(self, content: str, group_id: UUID,
                     memory_type: str) -> bool:
        """Default: delete by content match. Override for custom cleanup."""
        store = VectorStore()
        return await store.delete_by_content(
            content=content, group_id=group_id, memory_type=memory_type
        )

    async def search(self, query_embedding: list[float],
                     group_id: UUID, space_id: Optional[UUID] = None,
                     limit: int = 5) -> list[dict]:
        """Called by VectorRouter during fan-out. Searches this indexer's managed_types."""
        store = VectorStore()
        results = []
        for mt in self.managed_types:
            hits = await store.search(
                query_embedding=query_embedding, memory_type=mt,
                group_id=group_id, space_id=space_id, limit=limit,
            )
            results.extend(hits)
        results.sort(key=lambda r: r["similarity"], reverse=True)
        return results[:limit]

    # --- Lifecycle (automatic wiring) ---

    async def setup(self):
        """Subscribe to declared events. Subclasses can override to add setup logic."""
        for event_pattern in self.index_events:
            await self.subscribe(event_pattern, self._on_event)

    async def _on_event(self, event_type: str, data: dict):
        """Internal handler. Runs extract → embed → store pipeline."""
        content = await self.extract_content(event_type, data)
        if content is None:
            return
        embedding = await self.embed(content)
        metadata = data.get("metadata", {})
        memory_type = data.get("memory_type", self.managed_types[0])
        group_id = UUID(data["group_id"])
        space_id = UUID(data["space_id"]) if data.get("space_id") else None
        await self.store(embedding, content, memory_type, metadata,
                         group_id, space_id)

Design principle: Subclasses only override what's different. A text indexer overrides extract_content(). An image indexer overrides extract_content() + embed(). A LanceDB-backed indexer overrides store() + search().
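As a concrete illustration of that principle, here is a runnable toy version of the pipeline with the EventBus/VectorStore dependencies replaced by in-memory stand-ins. The vowel-frequency "embedding" and all class names below are illustrative, not the real Morphee API:

```python
import asyncio
from typing import Optional

class ToyIndexer:
    """In-memory stand-in for IndexerIntegration's embed→store pipeline."""
    managed_types = ["note"]

    def __init__(self):
        self.rows = []  # stand-in for VectorStore

    async def extract_content(self, event_type: str, data: dict) -> Optional[str]:
        raise NotImplementedError

    async def embed(self, content: str) -> list[float]:
        # Fake embedding: vowel frequencies (a real model goes here)
        return [content.count(c) / max(len(content), 1) for c in "aeiou"]

    async def store(self, embedding, content, memory_type):
        self.rows.append({"embedding": embedding, "content": content,
                          "memory_type": memory_type})

    async def _on_event(self, event_type: str, data: dict):
        content = await self.extract_content(event_type, data)
        if content is None:
            return  # indexer chose to skip this event
        embedding = await self.embed(content)
        await self.store(embedding, content,
                         data.get("memory_type", self.managed_types[0]))

class NoteIndexer(ToyIndexer):
    # Only extract_content is overridden, as the contract intends
    async def extract_content(self, event_type, data):
        return data.get("content")

async def main():
    indexer = NoteIndexer()
    await indexer._on_event("memory.stored", {"content": "buy milk"})
    await indexer._on_event("memory.stored", {})  # no content → skipped
    return indexer.rows

rows = asyncio.run(main())
# one row stored; the contentless event was skipped
```

The subclass touches only extract_content(); the base pipeline, skip-on-None behavior, and default memory_type all come for free.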


Built-in Indexers

TextIndexer (V1.0)

Replaces all 5 existing embed→store sites. Handles text-based memories: facts, preferences, events, conversation summaries, notes, canvas components.

class TextIndexer(IndexerIntegration):
    name = "indexer.text"
    index_events = [
        "memory.stored", "memory.corrected", "memory.deleted",
        "conversation.summarized",
        "skill.created", "skill.updated", "skill.deleted",
        "canvas.component_placed", "canvas.component_dismissed",
    ]
    managed_types = ["fact", "preference", "event", "note",
                     "conversation_summary", "skill_index",
                     "canvas_component"]

    async def extract_content(self, event_type, data):
        if event_type.startswith("skill."):
            return f"{data['skill_name']}. {data.get('skill_description', '')}"
        if event_type == "memory.deleted":
            await self.delete(data["content"], UUID(data["group_id"]),
                              data["memory_type"])
            return None  # skip indexing
        return data.get("content") or data.get("summary")

IntegrationIndexer (V1.0)

Indexes all registered integrations and their actions for tool pre-filtering. This is the novel contribution.

class IntegrationIndexer(IndexerIntegration):
    name = "indexer.integrations"
    index_events = [
        "interface.registered", "interface.unregistered",
    ]
    managed_types = ["integration_index"]

    async def extract_content(self, event_type, data):
        if event_type == "interface.unregistered":
            # Delete all entries for this interface
            for action_name in data.get("action_names", []):
                key = f"{data['interface_name']}__{action_name}"
                await self.delete(key, UUID(data["group_id"]),
                                  "integration_index")
            return None
        # interface.registered — index each action separately
        actions = data.get("actions", [])
        for action in actions:
            content = f"{data['interface_name']}__{action['name']}"
            text = f"{data['interface_name']} — {action['name']}: {action['description']}"
            embedding = await self.embed(text)
            await self.store(
                embedding=embedding, content=content,
                memory_type="integration_index",
                metadata={
                    "interface_name": data["interface_name"],
                    "action_name": action["name"],
                    "action_description": action["description"],
                    "has_required_params": action.get("has_required_params", False),
                    "ai_access": action.get("ai_access", "execute"),
                },
                group_id=UUID(data.get("group_id", "00000000-0000-0000-0000-000000000000")),
            )
        return None  # already stored per-action above

    async def reindex_all(self, interface_manager):
        """Called at startup. Indexes all currently registered integrations."""
        for interface in interface_manager.interfaces.values():
            defn = interface.get_definition()
            for action in defn.actions:
                content = f"{interface.name}__{action.name}"
                text = f"{interface.name} — {action.name}: {action.description}"
                embedding = await self.embed(text)
                await self.store(
                    embedding=embedding, content=content,
                    memory_type="integration_index",
                    metadata={
                        "interface_name": interface.name,
                        "action_name": action.name,
                        "action_description": action.description,
                    },
                    group_id=UUID("00000000-0000-0000-0000-000000000000"),  # global
                )

Startup flow:

  1. InterfaceManager registers all integrations (existing)
  2. IntegrationIndexer.reindex_all(interface_manager) — embeds every action (new)
  3. TextIndexer wires event subscriptions (replaces reindex_all_skills())

Query flow:

  1. User sends message
  2. VectorRouter embeds the message
  3. Queries IntegrationIndexer: top-5 actions by similarity
  4. actions_to_anthropic_tools() only includes those 5 actions (instead of all 50+)
  5. LLM responds with tool call from the pre-filtered set
  6. If no good match (similarity < threshold), fall back to full catalog
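The filter-with-fallback in steps 3–6 can be sketched as a small pure function (the function name and the scored-input shape are assumptions for illustration; the 0.70 threshold and top-5 cut match the flow above):

```python
def select_tools(scored, full_catalog, threshold=0.70, top_n=5):
    """scored: list of (action_name, similarity) pairs from the
    IntegrationIndexer. Returns the action names to expose to the LLM."""
    ranked = sorted(scored, key=lambda pair: pair[1], reverse=True)
    hits = [name for name, sim in ranked if sim >= threshold][:top_n]
    # No confident match → fall back to the full catalog (step 6)
    return hits if hits else full_catalog

tools = select_tools(
    [("gmail__send", 0.91), ("calendar__create", 0.74), ("git__commit", 0.31)],
    full_catalog=["gmail__send", "calendar__create", "git__commit", "jira__create"],
)
# → ["gmail__send", "calendar__create"]
```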

GitIndexer (V1.0)

Indexes .morph/ file changes for "What changed?" queries. Subscribes to OpenMorph write events.

class GitIndexer(IndexerIntegration):
    name = "indexer.git"
    index_events = [
        "core.*.morph_written",  # any integration writing to .morph/
        "git.committed",         # direct git commits
    ]
    managed_types = ["git_change"]

    async def extract_content(self, event_type, data):
        # "tasks/task-abc123.yaml updated: Buy groceries for dinner"
        return f"{data['path']} {data.get('action', 'updated')}: {data.get('summary', '')}"

ImageIndexer (V1.3)

Multi-modal indexing using CLIP embeddings. Different embedding model, same pipeline.

class ImageIndexer(IndexerIntegration):
    name = "indexer.image"
    index_events = ["file.uploaded", "canvas.image_placed"]
    managed_types = ["image_index"]

    async def embed(self, content: str) -> list[float]:
        # Override: use CLIP model instead of text embeddings
        return await self.clip_provider.embed_image(content)

    async def extract_content(self, event_type, data):
        return data.get("file_path") or data.get("image_url")

VendorIndexer (V1.2)

WASM extensions ship their own indexers. A JIRA integration might index issue titles; a Notion integration might index page content.

# Loaded from WASM extension manifest
class VendorIndexer(IndexerIntegration):
    """Indexer defined by a WASM extension.

    The extension's manifest.yaml declares:
        indexer:
          events: ["net.atlassian.jira.issue_created", ...]
          managed_types: ["jira_issue"]
          extract_wasm_fn: "extract_index_content"
    """

    async def extract_content(self, event_type, data):
        # Delegate to WASM function
        return await self.wasm_runtime.call(
            self.extract_wasm_fn, event_type, data
        )

VectorRouter Evolution

Today's VectorRouter checks memory types in a hardcoded order. With IndexerIntegrations, it becomes a fan-out across all registered indexers:

Before (hardcoded):
VectorRouter.route()
  → _check_memory(["fact", "preference", "canvas_component"])
  → _check_skills(["skill_index"])
  → return best match

After (fan-out):
VectorRouter.route()
  → for indexer in registered_indexers:
        results.extend(await indexer.search(query_embedding, ...))
  → return best match across all indexers
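A minimal runnable sketch of that fan-out loop, with stub indexers standing in for real ones (the StubIndexer class and the result-dict shape are assumptions; the real VectorRouter would also pass group/space scoping):

```python
import asyncio

class StubIndexer:
    """Stand-in for an IndexerIntegration; returns canned search hits."""
    def __init__(self, hits):
        self._hits = hits

    async def search(self, query_embedding, limit=5):
        return self._hits[:limit]

async def route(query_embedding, indexers, limit=5):
    # Fan out across every registered indexer, then keep the best hit
    results = []
    for indexer in indexers:
        results.extend(await indexer.search(query_embedding, limit=limit))
    results.sort(key=lambda r: r["similarity"], reverse=True)
    return results[0] if results else None

text = StubIndexer([{"similarity": 0.85, "source": "indexer.text"}])
integ = StubIndexer([{"similarity": 0.93, "source": "indexer.integrations"}])
best = asyncio.run(route([0.0], [text, integ]))
# best hit: the 0.93 result from the integration indexer
```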

Threshold evolution:

| Route | Current | With IndexerIntegrations |
|---|---|---|
| DIRECT_MEMORY | ≥ 0.92 from fact/preference/canvas_component | ≥ 0.92 from TextIndexer |
| SKILL_EXECUTE | ≥ 0.88 from skill_index, no required params | ≥ 0.88 from IntegrationIndexer, no required params |
| TOOL_FILTER | N/A | ≥ 0.70 from IntegrationIndexer → pre-filter tool list |
| SKILL_HINT | ≥ 0.83 from skill_index, has required params | ≥ 0.83 from IntegrationIndexer, has required params |
| LLM_REQUIRED | fallback | fallback (with filtered tool list if TOOL_FILTER hit) |

New route type: TOOL_FILTER. Even when the query needs the LLM (similarity < 0.83), the IntegrationIndexer still pre-filters which tools the LLM sees. The TOOL_FILTER threshold (≥ 0.70) is deliberately lower — it doesn't need high confidence to narrow the tool list, just enough to exclude obviously irrelevant tools.
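Under the assumption that each fan-out hit carries its similarity, its originating indexer, and (for integration hits) a has_required_params flag, the threshold table reduces to a small decision function (a sketch, not the actual VectorRouter code):

```python
def decide_route(best: dict) -> str:
    """Map the best fan-out hit to a route, per the thresholds above.

    `best` is assumed to carry 'similarity', 'source' (indexer name),
    and optionally 'has_required_params' for integration hits.
    """
    sim, src = best["similarity"], best["source"]
    if src == "indexer.text" and sim >= 0.92:
        return "DIRECT_MEMORY"
    if src == "indexer.integrations":
        if sim >= 0.88 and not best.get("has_required_params"):
            return "SKILL_EXECUTE"
        if sim >= 0.83 and best.get("has_required_params"):
            return "SKILL_HINT"
        if sim >= 0.70:
            return "TOOL_FILTER"
    return "LLM_REQUIRED"
```

Note the ordering: an integration hit with required params never short-circuits to SKILL_EXECUTE, and anything between 0.70 and the higher thresholds degrades gracefully to TOOL_FILTER rather than to the full catalog.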


Knowledge Pipeline Connection

IndexerIntegrations are the fast recall layer in the Knowledge Pipeline:

USE → EXTRACT → COMPILE → SHARE → INSTALL → USE
         ↑
         IndexerIntegrations live here

Extract = events fire → indexers embed → vectors stored

Connected to the Runtime Hierarchy:
VectorRouter (free, ~10ms)
  → IndexerIntegration results
  → only if miss: LLMRuntime ($$$)

The VectorRouter sits above the BaseMorphRuntime hierarchy. It's the zero-cost check before any runtime is invoked. IndexerIntegrations feed the VectorRouter with the vectors it needs to make routing decisions.

Progressive compilation applies to indexing too:

  • Level 0: Raw text → TextIndexer (embed full text)
  • Level 1: Structured skill → TextIndexer (embed name + description, more precise)
  • Level 2: Canvas component → TextIndexer (embed component label + content)
  • Level 3: Compiled WASM → IntegrationIndexer (embed action definitions)

Higher compilation levels produce more precise, shorter index entries — which means higher similarity scores and better routing decisions.


Events to Add

Several events that indexers need are not yet emitted by existing services:

| Event | Emitter | When | Needed by |
|---|---|---|---|
| interface.registered | InterfaceManager | After register_interface() | IntegrationIndexer |
| interface.unregistered | InterfaceManager | After unregister_interface() | IntegrationIndexer |
| memory.stored | MemoryIntegration | After _store() | TextIndexer |
| memory.corrected | MemoryIntegration | After _correct() | TextIndexer |
| memory.deleted | MemoryIntegration | After _delete() | TextIndexer |
| conversation.summarized | ConversationSummarizer | After summarize_conversation() | TextIndexer |
| canvas.component_placed | FrontendIntegration | After show_card/show_form | TextIndexer |
| canvas.component_dismissed | FrontendIntegration | After user dismisses | TextIndexer |
| git.committed | GitStore | After morph_write() commit | GitIndexer |
| core.*.morph_written | BaseInterface | After morph_write() | GitIndexer |
| file.uploaded | FilesystemIntegration | After file upload | ImageIndexer (V1.3) |

Some of these events may already exist in partial form (e.g., core.tasks.morph_written is emitted by the tasks integration). The key change is ensuring all services emit events that indexers can subscribe to — instead of services calling embed→store directly.
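That shift, from services calling embed→store directly to services emitting events an indexer consumes, can be shown with a toy bus (exact-match subscriptions only; the real EventBus also supports wildcard patterns like core.*.morph_written, and the ToyEventBus class here is purely illustrative):

```python
import asyncio

class ToyEventBus:
    """Exact-match pub/sub; the real EventBus also supports wildcards."""
    def __init__(self):
        self.subs = {}

    async def subscribe(self, event_type, handler):
        self.subs.setdefault(event_type, []).append(handler)

    async def emit(self, event_type, data):
        for handler in self.subs.get(event_type, []):
            await handler(event_type, data)

async def main():
    bus = ToyEventBus()
    indexed = []

    async def on_event(event_type, data):
        # Stand-in for TextIndexer._on_event (extract → embed → store)
        indexed.append((event_type, data["content"]))

    await bus.subscribe("memory.stored", on_event)
    # The service emits an event instead of calling embed→store itself
    await bus.emit("memory.stored", {"content": "prefers dark mode"})
    return indexed

indexed = asyncio.run(main())
```

The service side shrinks to a single emit call, and every indexer interested in memory.stored picks it up without the service knowing they exist.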


Rollout

V1.0 — Unify and Generalize

  1. Implement IndexerIntegration base class — a BaseInterface subclass with the contract above
  2. Implement TextIndexer — replace all 5 existing embed→store sites
  3. Implement IntegrationIndexer — index all registered actions, pre-filter tool list
  4. Implement GitIndexer — index .morph/ changes
  5. Evolve VectorRouter — fan-out across indexers, add TOOL_FILTER route
  6. Add missing events — interface.registered, memory.stored, conversation.summarized, etc.
  7. Modify actions_to_anthropic_tools() — accept optional pre-filtered action list from VectorRouter

Migration: The 5 existing embed→store sites are replaced, not wrapped. The skill_index memory_type is preserved for backwards compatibility — TextIndexer manages it. reindex_all_skills() is replaced by TextIndexer + IntegrationIndexer startup reindex.

V1.2 — WASM Vendor Indexers

  • WASM extensions declare indexers in their manifest
  • VendorIndexer wraps WASM extract_content functions
  • Extensions can ship custom embedding models (via WASM)

V1.3 — Multi-modal

  • ImageIndexer with CLIP embeddings (candle on mobile, ONNX on desktop)
  • AudioIndexer with Whisper → text → TextIndexer (or direct audio embeddings)
  • VectorRouter handles mixed-modality results (normalize similarity scores across embedding spaces)

Compatibility

The IndexerIntegration contract is designed to be compatible with existing infrastructure:

  • BaseInterface: IndexerIntegration extends it — inherits setup(), subscribe(), morph_write(), get_definition(), etc.
  • VectorStore: Default store()/search()/delete() implementations use the existing VectorStore — no database changes needed
  • EmbeddingManager: Default embed() uses the existing singleton — same model, same dimensions
  • EventBus: Subscriptions wired via existing BaseInterface.subscribe() pattern
  • InterfaceManager: Indexers are registered like any other integration
  • Encryption: VectorStore.insert() already encrypts content — indexers inherit this automatically

The main code changes are:

  1. Services emit events instead of calling embed→store directly
  2. VectorRouter queries indexers instead of hardcoded memory types
  3. actions_to_anthropic_tools() accepts a pre-filtered action list

No database migrations. No new tables. Vectors still go in pgvector with the existing schema.