SOVEREIGN: The Unified Architecture — A Magnum Opus for Local-First AI Systems That Think for Themselves

The capstone synthesis of every system I have built — Dynamic Persona MoE RAG, agentic knowledge graphs, Control Boundary governance, local inference stacks, and spec-driven code generation — collapsed into one unified sovereign AI architecture called SOVEREIGN. This is the project blueprint.

Daniel Kliewer

Author, Sovereign AI

This is from Sovereign AI: Building Local-First Intelligent Systems.

"The mind that runs on borrowed infrastructure answers to its landlord. Build your own floor."


Preface: Why This Post Exists

Every system I have built over the last several years was an answer to a problem I could not ignore.

SynthInt answered the problem of opaque identity: why should the values baked into an AI's persona belong to someone else? Dynamic Persona MoE RAG answered the problem of context drift: why should yesterday's dead context contaminate today's reasoning? The Private Knowledge Graph answered the problem of relational amnesia: why should the connections between ideas collapse into similarity scores that lose their meaning? DeerFlow 2.0 answered the problem of isolated execution: why should agents be monoliths when they can be swarms? OpenClaw answered the problem of cloud dependency: why should inference require a network request? SpecGen answered the problem of the blank page: why should code generation be non-deterministic when the specification is precise? mcbot01 answered the problem of foundation: why should every project rebuild the local-first scaffold from scratch?

Each of these was a partial answer. A module. A proof-of-concept that one piece of the sovereignty puzzle could be built, deployed, and owned.

This post is the synthesis.

SOVEREIGN (Self-owned Orchestration of Versatile Expert Reasoning, Evaluation, Intelligence, Governance, and Network) is the unified architecture that collapses all of these systems into a single coherent project. It is not a rewrite. It is an integration. Every module you have read about on this site is a subsystem in the larger machine. This post is the blueprint for assembling that machine.

I am writing this for myself first. Then for you — the person who read the Sovereignty Manifesto, who runs Ollama on local hardware, who understands intuitively that the architecture you choose encodes your values. You already know why this matters. This post is about how to build it.

And specifically: this post is written so that a coding agent — given nothing but this document as context — can construct the entire SOVEREIGN system from scratch. The architecture is fully specified here. The scaffolding is complete. The philosophy is embedded in the structure itself, because in sovereign AI, the code is always the philosophy.


I. The Thesis: One Problem, Seven Partial Answers, One Synthesis

The core problem of AI in 2026 is not capability. It is ownership.

The most capable models in the world run on hardware you do not control, store context you did not authorize, evolve in directions you did not choose, and serve objectives that were never yours. You interact with them through an interface that was designed to maximize your dependency, not your agency. The extraction is architectural. It was designed in.

I have spent the better part of a decade building the counter-architecture. Not as a rejection of capability — the sovereign stack I describe here is extraordinarily capable — but as a rejection of the trade embedded in every cloud AI interaction: your context in exchange for their compute.

The seven systems that SOVEREIGN synthesizes each resolved one dimension of this problem:

| System | Problem Solved | Core Contribution |
|---|---|---|
| SynthInt / Dynamic Persona MoE RAG | Opaque identity, static personas | Personas as versioned, auditable JSON; MoE routing to specialized reasoning agents |
| Private Knowledge Graph | Relational amnesia, flat vector retrieval | Explicit semantic relationships via NetworkX/Neo4j; provenance-tracked multi-hop reasoning |
| DeerFlow 2.0 | Monolithic agent execution | SuperAgent harness; AIO sandbox; persistent memory across agent invocations |
| OpenClaw | Cloud inference dependency | Fully local agent runtime via Ollama + llama.cpp; zero-telemetry execution paths |
| SpecGen | Non-deterministic code generation | Spec-driven, RAG-grounded code generation; deterministic output from structured input |
| mcbot01 | Fragmented local-first scaffolding | Reactive UI + async FastAPI backend as the reusable foundation layer |
| Control Boundary Engine | No governance in the execution path | Intent evaluation before execution; audit-ready pipelines; Colorado AI Act "Reasonable Care" compliance |

SOVEREIGN does not replace these systems. It is the environment in which they all run together, passing context between each other through a shared memory substrate, governed by a unified evaluation loop, exposed through a single interface.

The result is not merely a better RAG system. It is a local-first AI operating system — a platform for thought that you own completely.


II. Architecture Overview: The Seven Layers

SOVEREIGN is organized as seven stacked layers. Each layer is independently deployable, testable, and replaceable. The boundaries between layers are explicit interfaces, not implementation assumptions. This is the sovereignty principle applied to architecture itself: no layer should depend on the internal implementation of another.

```text
┌─────────────────────────────────────────────────────────────────────┐
│  LAYER 7: INTERFACE LAYER                                           │
│  Next.js 16 (App Router) + React + TypeScript                       │
│  Conversational UI · Session Management · Persona Selector          │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 6: API GATEWAY LAYER                                         │
│  FastAPI · REST/GraphQL · WebSocket streaming · Auth middleware     │
│  Request validation · Rate limiting · Audit log emission            │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 5: ORCHESTRATION LAYER                                       │
│  MoE Orchestrator · Agent Swarm Router · DeerFlow SuperAgent        │
│  Intent classification · Persona activation · Result aggregation    │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 4: GOVERNANCE LAYER                                          │
│  Control Boundary Engine · Evaluation Loop · Audit Trail            │
│  Intent evaluation · Output scoring · Hallucination detection       │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 3: REASONING LAYER                                           │
│  Dynamic Persona Engine · Specialist Agent Pool · SpecGen           │
│  Persona lifecycle · Bounded trait evolution · Code synthesis       │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 2: MEMORY LAYER                                              │
│  Knowledge Graph (Neo4j/NetworkX) · Vector Store (ChromaDB)         │
│  Episodic memory · Semantic graph · Embedding index · Pruning       │
├─────────────────────────────────────────────────────────────────────┤
│  LAYER 1: INFERENCE LAYER                                           │
│  Ollama · llama.cpp · Local model registry                          │
│  On-prem inference · Zero telemetry · Reproducible seeds            │
└─────────────────────────────────────────────────────────────────────┘
```

Every request in SOVEREIGN flows downward through these layers and returns upward. The path is never short-circuited. There is no "fast path" that skips governance. There is no "trusted caller" that bypasses the evaluation loop. The architecture enforces the principle that accountability is not optional — it is structural.
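
To make the "no fast path" rule concrete, here is a minimal sketch of a request pipeline in which the governance stage cannot be skipped. Everything in it (the `Request` dataclass, the stage functions, the `BLOCKED_TERMS` deny-list) is a hypothetical stand-in for illustration, not SOVEREIGN's actual interfaces.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Request:
    """Hypothetical request envelope that records every layer it passes through."""
    text: str
    trace: List[str] = field(default_factory=list)
    approved: bool = False
    response: str = ""


# Hypothetical deny-list standing in for real intent evaluation
BLOCKED_TERMS = ("rm -rf",)


def orchestrate(req: Request) -> Request:
    req.trace.append("orchestration")
    return req


def govern(req: Request) -> Request:
    """Approve or reject the request before any reasoning happens."""
    req.trace.append("governance")
    req.approved = not any(term in req.text for term in BLOCKED_TERMS)
    return req


def reason(req: Request) -> Request:
    # Defense in depth: reasoning refuses anything governance did not approve
    if not req.approved:
        raise PermissionError("request reached reasoning without governance approval")
    req.trace.append("reasoning")
    req.response = f"echo: {req.text}"
    return req


# The stage order is fixed in one place; callers cannot supply their own list
PIPELINE: List[Callable[[Request], Request]] = [orchestrate, govern, reason]


def handle(text: str) -> Request:
    req = Request(text=text)
    for stage in PIPELINE:
        req = stage(req)
        if stage is govern and not req.approved:
            req.response = "rejected by governance"
            break
    return req
```

The `trace` list is the point of the sketch: every response carries a record of which layers it crossed, so a skipped layer shows up in the data.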


III. The Memory Substrate: Dual-Layer Sovereign Memory

The most important architectural decision in SOVEREIGN is the structure of memory. Memory determines what the system knows, what it can reason about, and what it forgets.

SOVEREIGN uses a dual-substrate memory architecture: a semantic knowledge graph for relational, provenance-tracked long-term memory, and a vector store for high-dimensional similarity retrieval. These are not interchangeable. They are complementary, and the architecture uses them for different reasoning tasks.

3.1 The Semantic Knowledge Graph

The knowledge graph in SOVEREIGN is a persistent, typed, directional graph built on Neo4j (for production persistence) with a NetworkX in-memory layer for query-scoped reasoning. The graph is not a flat document store. It is a living model of your knowledge domain.

Every node in the graph carries:

  • A unique identifier and type
  • A source document reference (provenance)
  • A creation timestamp and last-accessed timestamp
  • A relevance decay coefficient (used by the pruning engine)
  • A confidence weight (updated by the evaluation loop)
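
As a back-of-the-envelope check on how the decay coefficient interacts with pruning, the sketch below computes how many decay passes a never-accessed node survives. The values 0.95 and 0.1 are the defaults that appear in this post's knowledge-graph code; the helper name `passes_until_pruned` is made up for illustration.

```python
import math


def passes_until_pruned(start: float, decay: float, threshold: float) -> int:
    """Smallest n such that start * decay**n drops strictly below threshold."""
    if not (0.0 < decay < 1.0):
        raise ValueError("decay must be strictly between 0 and 1")
    if start <= 0.0 or threshold <= 0.0:
        raise ValueError("start and threshold must be positive")
    if start < threshold:
        return 0
    # Closed-form lower bound, then confirm numerically to absorb rounding
    n = max(0, math.floor(math.log(threshold / start) / math.log(decay)))
    while start * decay ** n >= threshold:
        n += 1
    return n


default_lifetime = passes_until_pruned(start=1.0, decay=0.95, threshold=0.1)
```

With those defaults, a node that starts at full confidence and is never accessed falls below the pruning threshold on the 45th decay pass.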

Every edge in the graph carries:

  • A typed relationship label (CAUSES, SUPPORTS, CONTRADICTS, PRECEDES, DERIVES_FROM, etc.)
  • A weight (0.0–1.0) representing relationship strength
  • A source (which agent or document established this relationship)
  • A timestamp

This structure makes multi-hop reasoning explicit and auditable. When the system traces a path from Concept A to Claim B through Relationship R, that path is a first-class data structure you can inspect, export, and challenge. It is not a black-box attention pattern.
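
A small, dependency-free sketch of what "a path as a first-class data structure" can look like. It uses a plain adjacency list and breadth-first search rather than Neo4j or NetworkX, and the node names, edges, and authors are invented for the example.

```python
from collections import deque
from typing import Dict, List, Optional, Tuple

# (target, relationship, established_by): illustrative data only
Edge = Tuple[str, str, str]
Hop = Tuple[str, str, str, str]

GRAPH: Dict[str, List[Edge]] = {
    "rising_rates": [("lower_bond_prices", "CAUSES", "doc:econ-101")],
    "lower_bond_prices": [("portfolio_loss", "CAUSES", "agent:analyst")],
    "portfolio_loss": [],
}


def trace_path(graph: Dict[str, List[Edge]], start: str, goal: str) -> Optional[List[Hop]]:
    """Return the shortest chain of (source, relationship, target, author) hops."""
    queue = deque([(start, [])])
    seen = {start}
    while queue:
        node, hops = queue.popleft()
        if node == goal:
            return hops
        for target, relationship, author in graph.get(node, []):
            if target not in seen:
                seen.add(target)
                queue.append((target, hops + [(node, relationship, target, author)]))
    return None  # No path: the graph does not connect these two nodes


path = trace_path(GRAPH, "rising_rates", "portfolio_loss")
```

Each hop names its relationship type and who established it, which is what lets a reader challenge one specific link in the chain instead of the conclusion as a whole.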

```python
# sovereign/memory/knowledge_graph.py

import re
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import networkx as nx

# Relationship labels are interpolated into the Cypher string below,
# so they are restricted to plain upper-case identifiers.
_RELATIONSHIP_PATTERN = re.compile(r"^[A-Z][A-Z0-9_]*$")


def _utc_now() -> str:
    """Timezone-aware UTC timestamp; datetime.utcnow() is deprecated as of Python 3.12."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class KGNode:
    """A typed, provenance-tracked node in the sovereign knowledge graph."""
    id: str
    label: str                       # Entity type: CONCEPT, CLAIM, DOCUMENT, AGENT, EVENT
    content: str                     # Human-readable representation
    source_document_id: str          # Provenance anchor
    confidence: float = 1.0          # Updated by evaluation loop
    access_count: int = 0            # Used by LRU-style pruning
    decay_coefficient: float = 0.95  # Per-session relevance decay
    created_at: str = field(default_factory=_utc_now)
    last_accessed_at: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class KGEdge:
    """A typed, weighted, traceable relationship in the sovereign knowledge graph."""
    id: str
    source_id: str
    target_id: str
    relationship: str                # CAUSES, SUPPORTS, CONTRADICTS, PRECEDES, DERIVES_FROM
    weight: float = 1.0
    established_by: str = "system"   # Agent ID or document ID that created this edge
    created_at: str = field(default_factory=_utc_now)
    metadata: Dict[str, Any] = field(default_factory=dict)


class SovereignKnowledgeGraph:
    """
    Dual-substrate knowledge graph: persistent Neo4j backend with
    NetworkX in-memory layer for query-scoped reasoning.

    Design principle: every reasoning path is traceable.
    Every node has provenance. Every edge has an author.
    Nothing is inferred without a trail.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # DiGraph keeps one edge per ordered node pair; a second relationship
        # between the same two nodes overwrites the first in memory.
        self.in_memory_graph = nx.DiGraph()
        self.nodes: Dict[str, KGNode] = {}
        self.edges: List[KGEdge] = []
        self.archived_nodes: Dict[str, KGNode] = {}  # Pruned nodes are kept here
        self._neo4j_driver = None
        self._init_neo4j()

    def _init_neo4j(self):
        """Initialize Neo4j connection if reachable; fall back to pure NetworkX."""
        try:
            from neo4j import GraphDatabase
            driver = GraphDatabase.driver(
                self.config.get("neo4j_uri", "bolt://localhost:7687"),
                auth=(
                    self.config.get("neo4j_user", "neo4j"),
                    self.config.get("neo4j_password", "sovereign"),
                ),
            )
            # The driver connects lazily; check now so the fallback triggers here
            driver.verify_connectivity()
            self._neo4j_driver = driver
        except Exception:
            # Graceful degradation: operate as pure in-memory graph
            self._neo4j_driver = None

    def add_node(self, label: str, content: str, source_document_id: str,
                 confidence: float = 1.0, metadata: Optional[Dict] = None) -> KGNode:
        node = KGNode(
            id=str(uuid.uuid4()),
            label=label,
            content=content,
            source_document_id=source_document_id,
            confidence=confidence,
            metadata=metadata or {},
        )
        self.nodes[node.id] = node
        self.in_memory_graph.add_node(
            node.id, label=label, content=content, confidence=confidence
        )
        if self._neo4j_driver:
            self._persist_node_to_neo4j(node)
        return node

    def add_edge(self, source_id: str, target_id: str, relationship: str,
                 weight: float = 1.0, established_by: str = "system") -> Optional[KGEdge]:
        if not _RELATIONSHIP_PATTERN.match(relationship):
            raise ValueError(f"Invalid relationship label: {relationship!r}")
        if source_id not in self.nodes or target_id not in self.nodes:
            return None
        edge = KGEdge(
            id=str(uuid.uuid4()),
            source_id=source_id,
            target_id=target_id,
            relationship=relationship,
            weight=weight,
            established_by=established_by,
        )
        self.edges.append(edge)
        self.in_memory_graph.add_edge(
            source_id, target_id, relationship=relationship, weight=weight
        )
        if self._neo4j_driver:
            self._persist_edge_to_neo4j(edge)
        return edge

    def find_reasoning_path(self, source_id: str, target_id: str,
                            relationship_filter: Optional[List[str]] = None) -> List[KGNode]:
        """
        Find an explicit, auditable reasoning path between two nodes.

        This is not similarity search. This is structured inference.
        The path returned is a chain of evidence, not a probability distribution.
        """
        graph = self.in_memory_graph
        if relationship_filter:
            allowed = set(relationship_filter)
            # Search only over edges of the allowed types, so the result is a
            # complete chain from source to target rather than a thinned node list.
            graph = nx.subgraph_view(
                self.in_memory_graph,
                filter_edge=lambda u, v: (
                    self.in_memory_graph[u][v].get("relationship") in allowed
                ),
            )
        try:
            path_ids = nx.shortest_path(graph, source_id, target_id)
        except (nx.NetworkXNoPath, nx.NodeNotFound):
            return []
        path_nodes = [self.nodes[nid] for nid in path_ids if nid in self.nodes]
        # Update access counts: the memory knows it has been used
        now = _utc_now()
        for node in path_nodes:
            node.access_count += 1
            node.last_accessed_at = now
        return path_nodes

    def apply_temporal_decay(self, decay_factor: float = 0.95):
        """
        Apply temporal decay to the confidence of nodes that have never been accessed.

        Design philosophy: memory that is never accessed should fade.
        The system forgets gracefully, not catastrophically.
        Forgetting is not failure. It is discernment.
        """
        for node in self.nodes.values():
            if node.last_accessed_at is None:
                node.confidence = max(0.01, node.confidence * decay_factor)

    def prune_low_confidence_nodes(self, threshold: float = 0.1) -> List[str]:
        """
        Remove nodes whose confidence has decayed below the threshold.
        Returns list of pruned node IDs for audit logging.

        What is pruned is not destroyed: it moves to self.archived_nodes.
        Only the in-memory graph is pruned; Neo4j records are left untouched.
        Sovereignty includes the right to forget deliberately.
        """
        pruned_ids = [
            nid for nid, node in self.nodes.items()
            if node.confidence < threshold
        ]
        for nid in pruned_ids:
            self.in_memory_graph.remove_node(nid)  # Also drops incident edges
            self.archived_nodes[nid] = self.nodes.pop(nid)
        pruned = set(pruned_ids)
        # Keep the edge list consistent with the pruned graph
        self.edges = [
            e for e in self.edges
            if e.source_id not in pruned and e.target_id not in pruned
        ]
        return pruned_ids

    def export_subgraph(self, node_ids: List[str]) -> Dict[str, Any]:
        """Export a subgraph for inspection, audit, or external analysis."""
        wanted = set(node_ids)
        subgraph_nodes = [self.nodes[nid] for nid in node_ids if nid in self.nodes]
        subgraph_edges = [
            e for e in self.edges
            if e.source_id in wanted and e.target_id in wanted
        ]
        return {
            "nodes": [asdict(n) for n in subgraph_nodes],
            "edges": [asdict(e) for e in subgraph_edges],
            "exported_at": _utc_now(),
        }

    def _persist_node_to_neo4j(self, node: KGNode):
        with self._neo4j_driver.session() as session:
            session.run(
                "MERGE (n:Node {id: $id}) "
                "SET n.label = $label, n.content = $content, "
                "n.source_document_id = $source_document_id, "
                "n.confidence = $confidence, n.created_at = $created_at",
                id=node.id, label=node.label, content=node.content,
                source_document_id=node.source_document_id,
                confidence=node.confidence, created_at=node.created_at,
            )

    def _persist_edge_to_neo4j(self, edge: KGEdge):
        # edge.relationship was validated in add_edge before reaching this point
        with self._neo4j_driver.session() as session:
            session.run(
                "MATCH (a:Node {id: $source_id}), (b:Node {id: $target_id}) "
                f"MERGE (a)-[r:{edge.relationship} {{id: $edge_id}}]->(b) "
                "SET r.weight = $weight, r.established_by = $established_by",
                source_id=edge.source_id, target_id=edge.target_id,
                edge_id=edge.id, weight=edge.weight,
                established_by=edge.established_by,
            )
```

3.2 The Vector Store Integration

The vector store (ChromaDB in development, Qdrant in production) handles the similarity retrieval that the knowledge graph cannot: dense semantic search across large document corpora where the exact relational structure is not yet known.

The critical design decision here is that the vector store feeds the knowledge graph, not the other way around. Vector retrieval surfaces candidate documents. The knowledge graph determines how those documents relate to each other and to the current query context. The vector store is a search index. The knowledge graph is the mind.

```python
# sovereign/memory/vector_store.py

import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import chromadb
from chromadb.config import Settings


class SovereignVectorStore:
    """
    Local-first vector store with zero cloud dependency.

    ChromaDB in development (file-backed, no server required).
    Qdrant in production (local server, same guarantee).

    The embeddings are yours. The index is yours.
    Nothing is sent to an external endpoint.
    """

    def __init__(self, config: Dict[str, Any]):
        self.persist_directory = config.get("persist_directory", "./data/chromadb")
        self.collection_name = config.get("collection_name", "sovereign_documents")
        # NOTE: embedding_model is recorded here but not yet passed to the
        # collection. As written, ChromaDB uses its built-in default embedding
        # function, which may fetch model files on first use. To keep embeddings
        # fully local, pass an explicit embedding_function backed by a locally
        # served model to get_or_create_collection().
        self.embedding_model = config.get("embedding_model", "nomic-embed-text")

        # File-backed persistence: data survives restarts on your hardware
        self.client = chromadb.PersistentClient(
            path=self.persist_directory,
            settings=Settings(anonymized_telemetry=False),  # Explicit: no telemetry
        )
        self.collection = self.client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"},
        )

    def embed_and_store(self, documents: List[Dict[str, Any]]) -> List[str]:
        """
        Embed documents and persist to local vector store.
        Returns document IDs for graph node linkage.
        """
        doc_ids = []
        for doc in documents:
            doc_id = doc.get("id", str(uuid.uuid4()))
            self.collection.add(
                documents=[doc["content"]],
                metadatas=[{
                    "source": doc.get("source", "unknown"),
                    "doc_type": doc.get("doc_type", "text"),
                    "created_at": datetime.now(timezone.utc).isoformat(),
                    "provenance": doc.get("provenance", ""),
                }],
                ids=[doc_id],
            )
            doc_ids.append(doc_id)
        return doc_ids

    def query(self, query_text: str, n_results: int = 10,
              where_filter: Optional[Dict] = None) -> List[Dict[str, Any]]:
        """
        Semantic search over local embeddings.
        Returns results with full provenance metadata.
        """
        results = self.collection.query(
            query_texts=[query_text],
            n_results=n_results,
            where=where_filter,
            include=["documents", "metadatas", "distances"],
        )
        return [
            {
                "id": results["ids"][0][i],
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                # Cosine distance lies in [0, 2], so this score lies in [-1, 1]
                "relevance_score": 1.0 - results["distances"][0][i],
            }
            for i in range(len(results["ids"][0]))
        ]
```
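
The division of labor described in this section (vector search proposes candidates, the graph records how they relate) can be sketched without any external services. The toy three-dimensional embeddings, the document IDs, and the `RETRIEVED` relationship label below are all assumptions made for illustration; a real system would use model-generated embeddings and let an agent or rule set decide which typed relationship, if any, to assert.

```python
import math
from typing import Any, Dict, List, Tuple

# Toy embeddings standing in for real model output (illustrative values)
DOCS: Dict[str, List[float]] = {
    "doc-a": [0.9, 0.1, 0.0],
    "doc-b": [0.8, 0.2, 0.1],
    "doc-c": [0.0, 0.1, 0.9],
}


def cosine(u: List[float], v: List[float]) -> float:
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def retrieve(query_vec: List[float], k: int = 2) -> List[Tuple[str, float]]:
    """Step 1: similarity search surfaces candidate documents."""
    scored = [(doc_id, cosine(query_vec, vec)) for doc_id, vec in DOCS.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


def link_candidates(query_id: str, candidates: List[Tuple[str, float]],
                    min_score: float = 0.5) -> List[Dict[str, Any]]:
    """Step 2: candidates that clear the bar become explicit, attributed graph edges."""
    return [
        {
            "source": query_id,
            "target": doc_id,
            "relationship": "RETRIEVED",  # Hypothetical label for this sketch
            "weight": round(score, 3),
            "established_by": "vector_retrieval",
        }
        for doc_id, score in candidates
        if score >= min_score
    ]


hits = retrieve([1.0, 0.0, 0.0])
edges = link_candidates("query-1", hits)
```

The similarity score survives only as an edge weight with a named author; from that point on, reasoning happens over the graph rather than over the raw distances.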

IV. The Inference Layer: Local Execution, Zero Dependency

The inference layer is non-negotiable. It is the foundation of every sovereignty guarantee in the system. If inference is remote, the entire stack is a thin wrapper over someone else's infrastructure. Sovereignty is not a frontend feature. It begins at the model.

SOVEREIGN's inference layer supports three execution modes:

Mode 1: Ollama (Primary). HTTP interface to locally served models. Fast, easy to configure, supports quantized variants of Llama, Qwen, Mistral, Phi, and Gemma families.

Mode 2: llama.cpp (Fallback/Air-Gap). Direct binary execution. No server process. No HTTP overhead. Used when even a local network interface is unacceptable (air-gapped environments, maximum-security deployments).

Mode 3: Hybrid. Different specialist agents use different models. The orchestrator routes to the fastest suitable model for the current task. Code tasks go to a code-optimized model. Long-context tasks go to a high-context-window model. All models are local.
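
One way to back the "all models are local" claim with a check is to verify that the configured inference endpoint resolves only to loopback addresses before any request is sent. This is a sketch under that assumption; the function name `is_loopback_endpoint` is hypothetical, and it deliberately treats LAN addresses as non-local.

```python
import ipaddress
import socket
from urllib.parse import urlparse


def is_loopback_endpoint(url: str) -> bool:
    """True only if every address the URL's host resolves to is a loopback address."""
    host = urlparse(url).hostname
    if not host:
        return False
    try:
        infos = socket.getaddrinfo(host, None)
    except socket.gaierror:
        return False
    # Each entry's sockaddr starts with the address string; strip any IPv6 zone ID
    addresses = {info[4][0].split("%")[0] for info in infos}
    return bool(addresses) and all(
        ipaddress.ip_address(addr).is_loopback for addr in addresses
    )
```

A check like this could run once at startup, next to the reachability probe, so that a misconfigured endpoint fails before the first prompt leaves the process.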

```python
# sovereign/inference/local_engine.py

import json
import os
import subprocess
from typing import Any, Dict, Generator, Optional
from urllib.parse import urlparse

import requests

_LOCAL_HOSTS = {"localhost", "127.0.0.1", "::1"}


class LocalInferenceEngine:
    """
    Unified interface to local model execution.

    Design invariant: no request leaves this machine.
    The api_endpoint, even in Ollama mode, resolves to localhost.
    There is no fallback to a cloud endpoint.
    If local inference fails, the system fails loudly, not silently to the cloud.
    """

    EXECUTION_MODES = ["ollama", "llama_cpp", "hybrid"]

    def __init__(self, config: Dict[str, Any]):
        self.mode = config.get("execution_mode", "ollama")
        if self.mode not in self.EXECUTION_MODES:
            raise ValueError(f"Unknown execution mode: {self.mode}")
        self.ollama_endpoint = config.get("ollama_endpoint", "http://localhost:11434")
        self.llama_cpp_binary = config.get("llama_cpp_binary", "./bin/llama-cli")
        self.model_registry = config.get("model_registry", {})
        self.default_model = config.get("default_model", "llama3.2")
        self.seed = config.get("seed", 42)  # Reproducibility by default
        self.default_temperature = config.get("temperature", 0.1)

        self._validate_local_availability()

    def _validate_local_availability(self):
        """
        Refuse to initialize if no local inference backend is reachable.

        This is a hard failure, not a warning.
        Failing loudly protects sovereignty; a silent fallback would not.
        """
        if self.mode in ("ollama", "hybrid"):
            # Enforce the invariant stated in the class docstring
            host = urlparse(self.ollama_endpoint).hostname
            if host not in _LOCAL_HOSTS:
                raise RuntimeError(
                    f"SOVEREIGN requires local inference, but ollama_endpoint "
                    f"points at a non-local host: {host!r}."
                )
            try:
                response = requests.get(f"{self.ollama_endpoint}/api/tags", timeout=5)
                response.raise_for_status()
            except requests.RequestException as e:
                raise RuntimeError(
                    f"SOVEREIGN requires local inference. Ollama is not reachable at "
                    f"{self.ollama_endpoint}. Start Ollama with `ollama serve` and retry.\n"
                    f"Original error: {e}"
                ) from e
        elif not os.path.isfile(self.llama_cpp_binary):
            raise RuntimeError(
                f"SOVEREIGN requires local inference. llama.cpp binary not found at "
                f"{self.llama_cpp_binary}."
            )

    def generate(self, prompt: str, system_prompt: str = "",
                 model: Optional[str] = None, temperature: Optional[float] = None,
                 max_tokens: int = 2000, seed: Optional[int] = None) -> str:
        """
        Generate a response from the local model.
        Returns the complete response text.
        """
        effective_model = model or self.default_model
        effective_temperature = temperature if temperature is not None else self.default_temperature
        effective_seed = seed if seed is not None else self.seed

        # Hybrid mode serves every routed model through Ollama
        if self.mode in ("ollama", "hybrid"):
            return self._generate_ollama(
                prompt, system_prompt, effective_model,
                effective_temperature, max_tokens, effective_seed
            )
        return self._generate_llama_cpp(
            prompt, system_prompt, effective_model,
            effective_temperature, max_tokens, effective_seed
        )

    def generate_stream(self, prompt: str, system_prompt: str = "",
                        model: Optional[str] = None) -> Generator[str, None, None]:
        """
        Stream tokens from local inference for real-time UI updates.
        Every token comes from your hardware.
        """
        if self.mode == "llama_cpp":
            raise NotImplementedError("Streaming is only implemented for the Ollama backend.")
        effective_model = model or self.default_model
        payload = {
            "model": effective_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": prompt},
            ],
            "options": {"temperature": self.default_temperature, "seed": self.seed},
            "stream": True,
        }
        with requests.post(
            f"{self.ollama_endpoint}/api/chat",
            json=payload,
            stream=True,
            timeout=120,
        ) as response:
            response.raise_for_status()
            # Ollama streams one JSON object per line
            for line in response.iter_lines():
                if line:
                    chunk = json.loads(line)
                    if not chunk.get("done"):
                        yield chunk.get("message", {}).get("content", "")

    def route_to_specialist(self, task_type: str, prompt: str,
                            system_prompt: str = "") -> str:
        """
        Route to the best local model for the given task type.

        The routing table is yours. You decide which model handles what.
        The routing logic is explicit, auditable, and modifiable.
        """
        routing_table = self.model_registry.get("routing", {})
        specialist_model = routing_table.get(task_type, self.default_model)
        return self.generate(prompt, system_prompt, model=specialist_model)

    def _generate_ollama(self, prompt: str, system_prompt: str, model: str,
                         temperature: float, max_tokens: int, seed: int) -> str:
        payload = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt or "You are a helpful, precise assistant."},
                {"role": "user", "content": prompt},
            ],
            "options": {
                "temperature": temperature,
                "seed": seed,
                "num_predict": max_tokens,
            },
            "stream": False,
        }
        response = requests.post(
            f"{self.ollama_endpoint}/api/chat",
            json=payload,
            timeout=120,
        )
        response.raise_for_status()
        return response.json()["message"]["content"]

    def _generate_llama_cpp(self, prompt: str, system_prompt: str, model: str,
                            temperature: float, max_tokens: int, seed: int) -> str:
        model_path = self.model_registry.get("paths", {}).get(model, model)
        # This chat template is model-specific; adjust it to the model you load
        full_prompt = f"<|system|>{system_prompt}<|user|>{prompt}<|assistant|>"
        # Flag availability varies between llama.cpp builds; check `llama-cli --help`
        result = subprocess.run(
            [
                self.llama_cpp_binary,
                "-m", model_path,
                "-p", full_prompt,
                "--temp", str(temperature),
                "-n", str(max_tokens),
                "--seed", str(seed),
                "--no-display-prompt",
            ],
            capture_output=True, text=True, timeout=300
        )
        if result.returncode != 0:
            raise RuntimeError(f"llama.cpp execution failed: {result.stderr}")
        return result.stdout.strip()
```
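
The engine above reads two keys from `model_registry`: `routing` (task type to model name) and `paths` (model name to a model file for llama.cpp). A configuration along these lines would satisfy it. The specific model tags and file paths are placeholders, not recommendations, and `resolve_model` is a hypothetical helper that mirrors the lookup in `route_to_specialist`.

```python
from typing import Any, Dict

# Placeholder model names and paths: substitute whatever you have pulled locally
CONFIG: Dict[str, Any] = {
    "execution_mode": "ollama",
    "ollama_endpoint": "http://localhost:11434",
    "default_model": "llama3.2",
    "seed": 42,
    "temperature": 0.1,
    "model_registry": {
        # Task type -> model name, consulted when routing to a specialist
        "routing": {
            "code": "qwen2.5-coder",
            "long_context": "mistral-nemo",
        },
        # Model name -> file on disk, consulted only in llama.cpp mode
        "paths": {
            "llama3.2": "./models/llama-3.2-3b-instruct.Q4_K_M.gguf",
        },
    },
}


def resolve_model(config: Dict[str, Any], task_type: str) -> str:
    """Mirror the routing lookup: unknown task types fall back to the default model."""
    routing = config.get("model_registry", {}).get("routing", {})
    return routing.get(task_type, config.get("default_model", "llama3.2"))
```

Keeping the routing table in plain configuration means a change of specialist model is a reviewable one-line diff rather than a code change.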

V. The Persona Engine: Identity as a First-Class Data Structure

Every prior system I have built has wrestled with the same question: what is an AI persona, exactly? In corporate systems, it is a system prompt — a string of text injected at the top of the context window, ephemeral, invisible, unversioned, unauditable. You accept it as a default and interact with a character whose values you did not choose.

In SOVEREIGN, a persona is a typed, versioned, evolvable data structure with a complete lifecycle. It has traits (numeric weights that shape how the reasoning engine processes queries), expertise domains (which determine routing priority), an activation cost (used by the MoE orchestrator to balance resource allocation), and a performance history (updated by the evaluation loop after every query).
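
Because a persona is plain data, it can live on disk as a JSON file that you diff, version, and review like any other artifact. The sketch below shows one possible shape. The field names follow the description above, while the `PersonaRecord` class, the `analyst` persona, and its values are invented for illustration and are not the post's exact schema.

```python
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class PersonaRecord:
    """Hypothetical on-disk shape for a persona."""
    name: str
    traits: Dict[str, float]    # Trait name -> weight in [0.0, 1.0]
    expertise: List[str]        # Domains used for routing priority
    activation_cost: float = 0.3
    version: int = 1
    performance_history: List[float] = field(default_factory=list)


def to_json(persona: PersonaRecord) -> str:
    # sort_keys keeps the output stable so version-control diffs stay small
    return json.dumps(asdict(persona), indent=2, sort_keys=True)


def from_json(raw: str) -> PersonaRecord:
    return PersonaRecord(**json.loads(raw))


analyst = PersonaRecord(
    name="analyst",
    traits={"analytical_rigor": 0.8, "creativity": 0.4},
    expertise=["finance", "statistics"],
)
serialized = to_json(analyst)
```

Round-tripping through JSON is lossless here because every field is a string, number, list, or dict, which is what makes the file itself the source of truth.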

The persona is not the model. The model is a reasoning engine. The persona is a constraint vector applied to that engine. You can have dozens of personas sharing a single model instance. You can swap personas without changing the model. You can evolve a persona's trait weights based on its performance without retraining anything. The separation is total.
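
To see why trait evolution stays bounded without any retraining, here is a worked sketch of an update rule of the form Δw = feedback × rate × (1 − w), which is the form used in this post's persona code. The starting weights, feedback values, and the `simulate` helper are arbitrary choices for the example.

```python
from typing import List


def bounded_update(weight: float, feedback: float, rate: float = 0.05) -> float:
    """Move a trait weight by feedback * rate * (1 - weight), clamped to [0, 1]."""
    delta = feedback * rate * (1.0 - weight)
    return max(0.0, min(1.0, weight + delta))


def simulate(weight: float, feedback: float, steps: int, rate: float = 0.05) -> List[float]:
    """Apply the same feedback repeatedly and record the trajectory."""
    history = [weight]
    for _ in range(steps):
        weight = bounded_update(weight, feedback, rate)
        history.append(weight)
    return history


trajectory = simulate(weight=0.5, feedback=1.0, steps=200)
```

With positive feedback of 1.0, a weight of 0.5 moves to 0.525 while a weight of 0.9 moves only to 0.905: the closer a trait is to saturation, the smaller each step. The same (1 − w) factor also shrinks negative updates for high-weight traits, so under this rule a strongly expressed trait is slow to unlearn.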

python
1# sovereign/reasoning/persona_engine.py
2
3from dataclasses import dataclass, field
4from typing import Dict, List, Optional, Any
5from datetime import datetime
6import json
7import os
8import uuid
9
10
11@dataclass
12class PersonaTrait:
13 name: str
14 weight: float # 0.0 to 1.0
15 description: str
16 evolution_rate: float = 0.05 # How quickly this trait responds to feedback
17
18
19@dataclass
20class PersonaPerformance:
21 total_queries: int = 0
22 total_score: float = 0.0
23 last_used: Optional[str] = None
24 success_rate: float = 0.0
25 domain_scores: Dict[str, float] = field(default_factory=dict)
26
27 @property
28 def average_score(self) -> float:
29 if self.total_queries == 0:
30 return 0.0
31 return self.total_score / self.total_queries
32
33
34@dataclass
35class Persona:
36 """
37 A sovereign persona: fully owned, fully auditable, fully evolvable.
38
39 This is not a system prompt. It is a data structure with history,
40 with traits that evolve according to rules you define,
41 with performance metrics that you evaluate,
42 and with a lifecycle that you control.
43 """
44 id: str
45 name: str
46 description: str
47 traits: Dict[str, PersonaTrait]
48 expertise: List[str]
49 activation_cost: float = 0.3
50 status: str = "experimental" # experimental → active → stable → pruned
51 version: int = 1
52 created_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
53 updated_at: Optional[str] = None
54 performance: PersonaPerformance = field(default_factory=PersonaPerformance)
55 evolution_log: List[Dict[str, Any]] = field(default_factory=list)
56 system_prompt_template: str = ""
57
58 def get_system_prompt(self, context: str = "") -> str:
59 """Generate the system prompt from trait weights and context."""
60 trait_descriptions = []
61 for trait_name, trait in self.traits.items():
62 if trait.weight > 0.6:
63 trait_descriptions.append(f"strong {trait_name.replace('_', ' ')}")
64 elif trait.weight > 0.3:
65 trait_descriptions.append(f"moderate {trait_name.replace('_', ' ')}")
66
67 trait_string = ", ".join(trait_descriptions) if trait_descriptions else "balanced reasoning"
68 return (
69 f"You are {self.name}. {self.description} "
70 f"Your reasoning is characterized by: {trait_string}. "
71 f"Your areas of expertise are: {', '.join(self.expertise)}. "
72 f"{self.system_prompt_template} "
73 f"{f'Current context: {context}' if context else ''}"
74 ).strip()
75
76 def apply_bounded_update(self, feedback_vector: Dict[str, float]) -> Dict[str, Any]:
77 """
78 Apply the bounded update function: Δw = f(feedback) × (1 − w)
79
80 The (1 − w) term ensures convergence — high-weight traits resist
81 extreme changes. This prevents runaway specialization.
82 Stability is a design feature, not a constraint.
83 """
84 evolution_entry = {
85 "timestamp": datetime.utcnow().isoformat(),
86 "version": self.version,
87 "changes": []
88 }
89
90 for trait_name, trait in self.traits.items():
91 feedback_value = feedback_vector.get(trait_name, 0.0)
92 delta = feedback_value * trait.evolution_rate * (1.0 - trait.weight)
93 new_weight = max(0.0, min(1.0, trait.weight + delta))
94
95 evolution_entry["changes"].append({
96 "trait": trait_name,
97 "from": trait.weight,
98 "to": new_weight,
99 "delta": new_weight - trait.weight,
100 "feedback": feedback_value
101 })
102 trait.weight = new_weight
103
104 self.version += 1
105 self.updated_at = datetime.utcnow().isoformat()
106 self.evolution_log.append(evolution_entry)
107 return evolution_entry
108
109
110class PersonaEngine:
111 """
112 Manages the complete lifecycle of sovereign personas.
113
114 Active → Stable → Pruned → Cold Storage → Recalled.
115 The lifecycle is yours to govern.
116 Nothing is deleted without your explicit instruction.
117 Cold storage preserves everything for potential recall.
118 """
119
120 LIFECYCLE_STATES = ["experimental", "active", "stable", "pruned"]
121 PERSONAS_DIR = "./data/personas"
122
123 def __init__(self, config: Dict[str, Any]):
124 self.config = config
125 self.active_personas: Dict[str, Persona] = {}
126 self.cold_storage: Dict[str, Persona] = {}
127 self.personas_dir = config.get("personas_dir", self.PERSONAS_DIR)
128 self._ensure_directory_structure()
129 self._load_active_personas()
130
    def _ensure_directory_structure(self):
        for state in self.LIFECYCLE_STATES:
            os.makedirs(os.path.join(self.personas_dir, state), exist_ok=True)
        os.makedirs(os.path.join(self.personas_dir, "cold_storage"), exist_ok=True)

    def _load_active_personas(self):
        for state in ["experimental", "active", "stable"]:
            state_dir = os.path.join(self.personas_dir, state)
            for fname in os.listdir(state_dir):
                if fname.endswith(".json"):
                    with open(os.path.join(state_dir, fname)) as f:
                        data = json.load(f)
                    persona = self._deserialize_persona(data)
                    self.active_personas[persona.id] = persona

    def route_to_persona(self, query: str, query_domain: str) -> List[Persona]:
        """
        Select the best personas for the current query using multi-factor routing.

        Routing considers: domain expertise match, activation cost,
        historical performance in the query domain, and current lifecycle state.
        Only stable and active personas participate in production routing.
        """
        candidates = [
            p for p in self.active_personas.values()
            if p.status in ("active", "stable")
        ]

        scored_candidates = []
        for persona in candidates:
            domain_match = 1.0 if query_domain in persona.expertise else 0.3
            historical_score = persona.performance.domain_scores.get(query_domain, 0.5)
            # Cheaper personas score higher: this term is 1.0 at zero cost.
            cost_penalty = 1.0 - persona.activation_cost
            composite_score = (
                0.4 * domain_match +
                0.4 * historical_score +
                0.2 * cost_penalty
            )
            scored_candidates.append((persona, composite_score))

        scored_candidates.sort(key=lambda x: x[1], reverse=True)
        max_parallel = self.config.get("max_parallel_personas", 3)
        return [p for p, _ in scored_candidates[:max_parallel]]

    def prune_persona(self, persona_id: str, reason: str = "performance_threshold") -> bool:
        """
        Retire a persona to cold storage. This is archival, not deletion.
        The persona's full history is preserved.
        The reason is logged.
        It can be recalled if context warrants.
        """
        if persona_id not in self.active_personas:
            return False

        persona = self.active_personas[persona_id]
        previous_state = persona.status  # Directory the persona was loaded from
        persona.status = "pruned"
        persona.updated_at = datetime.utcnow().isoformat()
        persona.evolution_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "event": "pruned",
            "reason": reason
        })

        self.cold_storage[persona_id] = persona
        del self.active_personas[persona_id]
        self._save_persona_to_state(persona, "cold_storage")

        # Remove the stale copy so a restart does not reload the pruned persona.
        stale_path = os.path.join(self.personas_dir, previous_state, f"{persona.id}.json")
        if os.path.exists(stale_path):
            os.remove(stale_path)
        return True

    def recall_persona(self, persona_id: str, query_context: str) -> Optional[Persona]:
        """
        Attempt to recall a pruned persona based on current query context.

        The system asks: is this dormant knowledge relevant again?
        If yes, it is restored. If no, it remains dormant.
        The question is explicit. The answer is auditable.
        """
        if persona_id not in self.cold_storage:
            return None

        persona = self.cold_storage[persona_id]
        # Context relevance: share of expertise terms that appear in the query
        query_terms = set(query_context.lower().split())
        expertise_terms = set(" ".join(persona.expertise).lower().split())
        overlap = len(query_terms & expertise_terms) / max(len(expertise_terms), 1)

        recall_threshold = self.config.get("recall_threshold", 0.3)
        if overlap >= recall_threshold:
            persona.status = "active"
            persona.updated_at = datetime.utcnow().isoformat()
            persona.evolution_log.append({
                "timestamp": datetime.utcnow().isoformat(),
                "event": "recalled",
                "context_overlap": overlap
            })
            self.active_personas[persona_id] = persona
            del self.cold_storage[persona_id]

            # Persist the move so the recall survives a restart.
            self._save_persona_to_state(persona, "active")
            cold_path = os.path.join(self.personas_dir, "cold_storage", f"{persona.id}.json")
            if os.path.exists(cold_path):
                os.remove(cold_path)
            return persona
        return None

    def _deserialize_persona(self, data: Dict[str, Any]) -> Persona:
        # Traits may be stored as full dicts or as bare weights.
        traits = {
            k: PersonaTrait(**v) if isinstance(v, dict) else PersonaTrait(
                name=k, weight=float(v), description="", evolution_rate=0.05
            )
            for k, v in data.get("traits", {}).items()
        }
        performance_data = data.get("performance", {})
        performance = PersonaPerformance(
            total_queries=performance_data.get("total_queries", 0),
            total_score=performance_data.get("total_score", 0.0),
            last_used=performance_data.get("last_used"),
            success_rate=performance_data.get("success_rate", 0.0),
            domain_scores=performance_data.get("domain_scores", {})
        )
        return Persona(
            id=data.get("id", str(uuid.uuid4())),
            name=data["name"],
            description=data.get("description", ""),
            traits=traits,
            expertise=data.get("expertise", []),
            activation_cost=data.get("activation_cost", 0.3),
            status=data.get("status", "experimental"),
            version=data.get("version", 1),
            created_at=data.get("created_at", datetime.utcnow().isoformat()),
            performance=performance,
            evolution_log=data.get("evolution_log", []),
            system_prompt_template=data.get("system_prompt_template", "")
        )

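    def _save_persona_to_state(self, persona: Persona, state: str):
        filepath = os.path.join(self.personas_dir, state, f"{persona.id}.json")
        with open(filepath, "w") as f:
            # Serialize nested trait and performance objects as dicts.
            # default=str would write their repr(), which _deserialize_persona
            # cannot read back.
            json.dump(
                vars(persona), f, indent=2,
                default=lambda o: vars(o) if hasattr(o, "__dict__") else str(o)
            )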
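To see why the (1 − w) term keeps weights stable, here is a minimal standalone sketch of the same update rule. The names `bounded_update` and `simulate` are illustrative and not part of the persona engine; the 0.05 rate matches the default `evolution_rate` used when deserializing bare trait weights.

```python
from typing import List


def bounded_update(weight: float, feedback: float, rate: float = 0.05) -> float:
    """One step of delta = feedback * rate * (1 - weight), clamped to [0, 1]."""
    delta = feedback * rate * (1.0 - weight)
    return max(0.0, min(1.0, weight + delta))


def simulate(weight: float, feedback: float, steps: int, rate: float = 0.05) -> List[float]:
    """Apply the same feedback repeatedly and record every intermediate weight."""
    history = [weight]
    for _ in range(steps):
        weight = bounded_update(weight, feedback, rate)
        history.append(weight)
    return history


low = simulate(0.2, feedback=1.0, steps=50)   # a weak trait under maximal feedback
high = simulate(0.9, feedback=1.0, steps=50)  # a strong trait under the same feedback

print(f"first step from 0.2: {low[1] - low[0]:+.4f}")
print(f"first step from 0.9: {high[1] - high[0]:+.4f}")
print(f"after 50 steps: {low[-1]:.3f} and {high[-1]:.3f}")
```

The weak trait moves eight times further on its first step than the strong one, and neither reaches 1.0. The same factor also damps negative feedback on high-weight traits, so an established trait erodes slowly as well.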

VI. The Governance Layer: The Control Boundary Engine

The Control Boundary Engine is the system's conscience. It runs on every request. It cannot be bypassed. It evaluates intent before execution, scores outputs after generation, and emits an audit trail designed to support enterprise governance requirements, including the "reasonable care" standard in the Colorado AI Act.

In corporate AI, governance is a post-hoc appendage — a feedback button, a content moderation layer, a logging system bolted onto the side of the architecture after the fact. In SOVEREIGN, governance is embedded in the execution path. You cannot get a response without passing through the evaluation loop. You cannot update a persona without logging the change. You cannot prune a knowledge graph node without recording the decision.

This is not compliance theater. It is the architecture of a system that answers to you.
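One way to picture "embedded in the execution path" is a wrapper that owns the only reference to the generator. The sketch below is a simplified illustration, not the engine itself: `governed`, `run`, and the caller-supplied `risk_score` are hypothetical names chosen for the example.

```python
from typing import Callable, Dict, List

AuditLog = List[Dict[str, object]]


def governed(generate: Callable[[str], str], audit_log: AuditLog,
             block_threshold: float = 0.9) -> Callable[[str, float], str]:
    """Wrap `generate` so the only way to reach it is through the checks."""

    def run(query: str, risk_score: float) -> str:
        decision = "block" if risk_score >= block_threshold else "proceed"
        # The pre-check is logged whether or not the request proceeds.
        audit_log.append({"stage": "pre", "risk": risk_score, "decision": decision})
        if decision == "block":
            return "Request blocked by governance layer."
        output = generate(query)
        # The post-check entry is written before the caller sees the output.
        audit_log.append({"stage": "post", "output_chars": len(output)})
        return output

    return run


log: AuditLog = []
answer = governed(lambda q: q.upper(), log)  # stand-in for a local model call

allowed = answer("hello", 0.1)
blocked = answer("hello", 0.95)
print(allowed, "|", blocked, "|", [entry["stage"] for entry in log])
```

Because callers only ever hold `run`, there is no code path that produces a response without leaving at least one audit entry behind.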

python
# sovereign/governance/control_boundary.py

from dataclasses import dataclass, field
from typing import Dict, Any, Optional, List
from datetime import datetime
from enum import Enum
import uuid


class IntentCategory(Enum):
    INFORMATIONAL = "informational"
    GENERATIVE = "generative"
    ANALYTICAL = "analytical"
    EXECUTABLE = "executable"          # Triggers higher governance scrutiny
    ADMINISTRATIVE = "administrative"  # System modification: maximum scrutiny


class GovernanceDecision(Enum):
    PROCEED = "proceed"
    PROCEED_WITH_LOGGING = "proceed_with_logging"
    REQUIRE_CONFIRMATION = "require_confirmation"
    BLOCK = "block"


@dataclass
class ControlBoundaryResult:
    request_id: str
    intent_category: IntentCategory
    governance_decision: GovernanceDecision
    risk_score: float  # 0.0 (benign) to 1.0 (high risk)
    justification: str
    audit_record: Dict[str, Any]
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    passed: bool = True


@dataclass
class OutputEvaluationResult:
    request_id: str
    grounding_score: float        # How well anchored to source documents
    coherence_score: float        # Internal logical consistency
    coverage_score: float         # Query completeness
    hallucination_penalty: float  # Detected confabulation
    composite_score: float        # Weighted aggregate
    flagged_claims: List[str]     # Claims requiring provenance verification
    audit_record: Dict[str, Any]
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())


class ControlBoundaryEngine:
    """
    The governance conscience of SOVEREIGN.

    Every request passes through here before execution.
    Every output passes through here before delivery.
    The audit trail is complete, append-only, and yours.

    This is not a security layer. It is an accountability layer.
    The distinction matters: security prevents bad actors.
    Accountability ensures the system answers to you.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.audit_log_path = config.get("audit_log_path", "./logs/audit.jsonl")
        self.risk_thresholds = config.get("risk_thresholds", {
            "block": 0.9,
            "require_confirmation": 0.7,
            "enhanced_logging": 0.4
        })
        self._init_audit_log()

    def _init_audit_log(self):
        import os
        log_dir = os.path.dirname(self.audit_log_path)
        # A bare filename has no directory part, and os.makedirs("") raises.
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

    def evaluate_request(self, query: str, session_id: str,
                         user_context: Dict[str, Any]) -> ControlBoundaryResult:
        """
        Phase 1: Evaluate intent before execution.

        The system asks itself: what is this request trying to do?
        Is the intent aligned with the configured governance policy?
        What level of scrutiny does this request warrant?
        """
        # Local import, matching this module's style for os and json.
        import hashlib

        request_id = str(uuid.uuid4())
        intent_category = self._classify_intent(query)
        risk_score = self._compute_risk_score(query, intent_category, user_context)
        governance_decision = self._make_governance_decision(risk_score, intent_category)

        justification = self._generate_justification(
            intent_category, risk_score, governance_decision
        )

        audit_record = {
            "request_id": request_id,
            "session_id": session_id,
            # Stable SHA-256 digest, not the raw query: a privacy-preserving audit.
            # The built-in hash() is salted per process for strings, so its
            # values cannot be compared across runs.
            "query_hash": hashlib.sha256(query.encode("utf-8")).hexdigest(),
            "intent_category": intent_category.value,
            "risk_score": risk_score,
            "governance_decision": governance_decision.value,
            "justification": justification,
            "timestamp": datetime.utcnow().isoformat()
        }

        self._append_to_audit_log(audit_record)

        return ControlBoundaryResult(
            request_id=request_id,
            intent_category=intent_category,
            governance_decision=governance_decision,
            risk_score=risk_score,
            justification=justification,
            audit_record=audit_record,
            passed=(governance_decision != GovernanceDecision.BLOCK)
        )

    def evaluate_output(self, output: str, source_nodes: List[Dict],
                        query: str, request_id: str) -> OutputEvaluationResult:
        """
        Phase 2: Evaluate output before delivery.

        The system asks: is this response grounded in evidence?
        Does it make claims that cannot be traced to source documents?
        Is it coherent? Is it complete relative to the query?

        This is the architectural answer to hallucination.
        Not a post-hoc filter, but an embedded evaluation.
        """
        grounding_score = self._compute_grounding_score(output, source_nodes)
        coherence_score = self._compute_coherence_score(output)
        coverage_score = self._compute_coverage_score(output, query)
        hallucination_penalty = self._detect_hallucinations(output, source_nodes)
        flagged_claims = self._extract_flagged_claims(output, source_nodes)

        # The positive weights sum to 0.90, so a perfect answer scores 0.90.
        composite_score = (
            0.35 * grounding_score +
            0.30 * coherence_score +
            0.25 * coverage_score -
            0.10 * hallucination_penalty
        )
        composite_score = max(0.0, min(1.0, composite_score))

        audit_record = {
            "request_id": request_id,
            "grounding_score": grounding_score,
            "coherence_score": coherence_score,
            "coverage_score": coverage_score,
            "hallucination_penalty": hallucination_penalty,
            "composite_score": composite_score,
            "flagged_claims_count": len(flagged_claims),
            "timestamp": datetime.utcnow().isoformat()
        }
        self._append_to_audit_log(audit_record)

        return OutputEvaluationResult(
            request_id=request_id,
            grounding_score=grounding_score,
            coherence_score=coherence_score,
            coverage_score=coverage_score,
            hallucination_penalty=hallucination_penalty,
            composite_score=composite_score,
            flagged_claims=flagged_claims,
            audit_record=audit_record
        )

    def _classify_intent(self, query: str) -> IntentCategory:
        # Substring keyword matching. The most sensitive categories are
        # checked first, and the first match wins.
        query_lower = query.lower()
        if any(k in query_lower for k in ["delete", "modify", "update", "configure", "install"]):
            return IntentCategory.ADMINISTRATIVE
        if any(k in query_lower for k in ["execute", "run", "deploy", "create file", "write to"]):
            return IntentCategory.EXECUTABLE
        if any(k in query_lower for k in ["analyze", "compare", "evaluate", "assess"]):
            return IntentCategory.ANALYTICAL
        if any(k in query_lower for k in ["write", "generate", "create", "draft", "produce"]):
            return IntentCategory.GENERATIVE
        return IntentCategory.INFORMATIONAL

    def _compute_risk_score(self, query: str, intent: IntentCategory,
                            context: Dict[str, Any]) -> float:
        # Base score by intent only; query text and user context are accepted
        # but not yet factored in.
        base_scores = {
            IntentCategory.INFORMATIONAL: 0.1,
            IntentCategory.GENERATIVE: 0.3,
            IntentCategory.ANALYTICAL: 0.2,
            IntentCategory.EXECUTABLE: 0.6,
            IntentCategory.ADMINISTRATIVE: 0.8
        }
        return base_scores.get(intent, 0.5)

    def _make_governance_decision(self, risk_score: float,
                                  intent: IntentCategory) -> GovernanceDecision:
        # Thresholds are checked from most to least restrictive.
        if risk_score >= self.risk_thresholds["block"]:
            return GovernanceDecision.BLOCK
        if risk_score >= self.risk_thresholds["require_confirmation"]:
            return GovernanceDecision.REQUIRE_CONFIRMATION
        if risk_score >= self.risk_thresholds["enhanced_logging"]:
            return GovernanceDecision.PROCEED_WITH_LOGGING
        return GovernanceDecision.PROCEED

    def _compute_grounding_score(self, output: str,
                                 source_nodes: List[Dict]) -> float:
        if not source_nodes:
            return 0.0
        source_terms = set()
        for node in source_nodes:
            content = node.get("content", "")
            source_terms.update(content.lower().split())
        output_terms = set(output.lower().split())
        overlap = len(output_terms & source_terms)
        # One third of output terms found in the sources already scores 1.0.
        return min(1.0, overlap / max(len(output_terms), 1) * 3.0)

    def _compute_coherence_score(self, output: str) -> float:
        # Length-based proxy: longer multi-sentence answers score higher.
        sentences = [s.strip() for s in output.split(".") if s.strip()]
        if len(sentences) < 2:
            return 1.0
        return min(1.0, 0.5 + (len(sentences) / 20.0))

    def _compute_coverage_score(self, output: str, query: str) -> float:
        query_terms = set(query.lower().split())
        output_text = output.lower()
        covered = sum(1 for term in query_terms if term in output_text)
        return covered / max(len(query_terms), 1)

    def _detect_hallucinations(self, output: str,
                               source_nodes: List[Dict]) -> float:
        # Heuristic: treat numbers and capitalized words as specific claims
        # and measure how many never appear in the source text.
        # Surrounding punctuation is stripped so "Paris," still matches "paris".
        tokens = [word.strip(".,;:!?\"'()[]") for word in output.split()]
        specific_claims = [
            token for token in tokens
            if token.replace(",", "").replace(".", "").isdigit()
            or (len(token) > 2 and token[0].isupper())
        ]
        if not specific_claims or not source_nodes:
            return 0.0
        source_content = " ".join(n.get("content", "") for n in source_nodes).lower()
        ungrounded = sum(
            1 for claim in specific_claims
            if claim.lower() not in source_content
        )
        return min(1.0, ungrounded / max(len(specific_claims), 1))

    def _extract_flagged_claims(self, output: str,
                                source_nodes: List[Dict]) -> List[str]:
        source_content = " ".join(n.get("content", "") for n in source_nodes).lower()
        sentences = [s.strip() for s in output.split(".") if s.strip()]
        flagged = []
        for sentence in sentences:
            # Flag a sentence when none of its longer words occur in the sources.
            key_terms = [w for w in sentence.split() if len(w) > 5]
            if key_terms and not any(t.lower() in source_content for t in key_terms):
                flagged.append(sentence)
        return flagged[:5]  # Return the first five flagged sentences

    def _generate_justification(self, intent: IntentCategory,
                                risk_score: float,
                                decision: GovernanceDecision) -> str:
        return (
            f"Intent classified as {intent.value} with risk score {risk_score:.2f}. "
            f"Governance decision: {decision.value}. "
            f"Threshold configuration: block={self.risk_thresholds['block']}, "
            f"confirm={self.risk_thresholds['require_confirmation']}."
        )

    def _append_to_audit_log(self, record: Dict[str, Any]):
        import json
        # JSON Lines: one record per line, opened in append mode.
        with open(self.audit_log_path, "a") as f:
            f.write(json.dumps(record) + "\n")
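It helps to tabulate what the default configuration actually decides. The sketch below copies the base risk scores and thresholds from the listing above into a standalone function; `decide` and the two constant names are illustrative, not part of the engine.

```python
from typing import Dict

DEFAULT_THRESHOLDS: Dict[str, float] = {
    "block": 0.9,
    "require_confirmation": 0.7,
    "enhanced_logging": 0.4,
}

BASE_RISK: Dict[str, float] = {
    "informational": 0.1,
    "analytical": 0.2,
    "generative": 0.3,
    "executable": 0.6,
    "administrative": 0.8,
}


def decide(risk_score: float, thresholds: Dict[str, float] = DEFAULT_THRESHOLDS) -> str:
    """Map a risk score to a decision, most restrictive threshold first."""
    if risk_score >= thresholds["block"]:
        return "block"
    if risk_score >= thresholds["require_confirmation"]:
        return "require_confirmation"
    if risk_score >= thresholds["enhanced_logging"]:
        return "proceed_with_logging"
    return "proceed"


decisions = {intent: decide(score) for intent, score in BASE_RISK.items()}
for intent, decision in decisions.items():
    print(f"{intent:15s} {BASE_RISK[intent]:.1f} -> {decision}")

# A stricter policy: lowering the block threshold to 0.8 stops administrative requests.
strict = decide(BASE_RISK["administrative"], {**DEFAULT_THRESHOLDS, "block": 0.8})
```

With the defaults, the highest base score (0.8) sits below the block threshold (0.9), so nothing is blocked outright; administrative requests stop at confirmation. If you want hard blocks, either lower the threshold in `risk_thresholds` or extend the risk scoring beyond intent alone.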

VII. The Orchestration Layer: MoE Routing and Agent Swarms

The MoE orchestrator is the brain of SOVEREIGN's execution path. It receives a query from the API gateway, consults the governance layer for clearance, routes to the persona engine for specialist selection, dispatches parallel persona commentary passes against the knowledge graph, aggregates results through a multi-dimensional evaluation function, and returns a synthesized response with a full execution trace.

This is not a chain. It is a graph. Execution can be parallel, recursive, or branching depending on query complexity and persona routing decisions.
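As a hedged sketch of what parallel persona passes could look like, the example below fans blocking calls out to worker threads with `asyncio.to_thread` and collects them with `asyncio.gather`. `fake_generate` and `run_persona_passes` are hypothetical stand-ins, and whether a given local inference backend actually serves concurrent requests in parallel is an assumption you would need to verify.

```python
import asyncio
import time
from typing import Dict, List


def fake_generate(persona_name: str, query: str) -> str:
    """Stand-in for a blocking local inference call (hypothetical)."""
    time.sleep(0.05)
    return f"{persona_name}: analysis of {query!r}"


async def run_persona_passes(persona_names: List[str], query: str) -> List[Dict[str, object]]:
    async def one_pass(name: str) -> Dict[str, object]:
        try:
            # Run the blocking call in a worker thread so passes overlap.
            commentary = await asyncio.to_thread(fake_generate, name, query)
            return {"persona_name": name, "commentary": commentary, "success": True}
        except Exception as exc:
            # A failed pass is recorded rather than aborting the others.
            return {"persona_name": name, "commentary": "", "success": False, "error": str(exc)}

    # gather() returns results in the order the passes were submitted.
    return await asyncio.gather(*(one_pass(name) for name in persona_names))


results = asyncio.run(
    run_persona_passes(["Architect", "Auditor", "Researcher"], "scale the index")
)
for result in results:
    print(result["persona_name"], result["success"])
```

Each pass catches its own exception, so one failing persona does not cancel its siblings, and attribution stays intact because results come back in submission order.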

python
# sovereign/orchestration/moe_orchestrator.py

from typing import Dict, List, Any, Optional
from datetime import datetime
import asyncio
import uuid

from sovereign.reasoning.persona_engine import PersonaEngine, Persona
from sovereign.memory.knowledge_graph import SovereignKnowledgeGraph
from sovereign.memory.vector_store import SovereignVectorStore
from sovereign.inference.local_engine import LocalInferenceEngine
from sovereign.governance.control_boundary import ControlBoundaryEngine, GovernanceDecision


class MoEOrchestrator:
    """
    The Mixture-of-Experts orchestrator for SOVEREIGN.

    Routes queries to specialist personas, executes parallel
    commentary passes, aggregates results through multi-dimensional
    evaluation, and returns synthesized responses with full execution traces.

    Every execution is reproducible.
    Every routing decision is logged.
    Every persona contribution is attributed.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.persona_engine = PersonaEngine(config.get("persona_config", {}))
        self.knowledge_graph = SovereignKnowledgeGraph(config.get("graph_config", {}))
        self.vector_store = SovereignVectorStore(config.get("vector_config", {}))
        self.inference_engine = LocalInferenceEngine(config.get("inference_config", {}))
        self.governance = ControlBoundaryEngine(config.get("governance_config", {}))

    def execute(self, query: str, session_id: str,
                user_context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Full orchestration pipeline.

        Phase 1: Governance pre-check
        Phase 2: Context retrieval (vector + graph)
        Phase 3: Persona routing
        Phase 4: Persona commentary passes (one per activated persona)
        Phase 5: Aggregation and synthesis
        Phase 6: Governance post-check
        Phase 7: Persona evolution update
        Phase 8: Pruning cycle, then return with full execution trace
        """
        execution_trace = {
            "execution_id": str(uuid.uuid4()),
            "query": query,
            "session_id": session_id,
            "started_at": datetime.utcnow().isoformat(),
            "phases": []
        }

        # ── Phase 1: Governance Pre-Check ────────────────────────────────────
        governance_result = self.governance.evaluate_request(
            query, session_id, user_context or {}
        )
        execution_trace["phases"].append({
            "phase": "governance_precheck",
            "result": governance_result.audit_record
        })

        if not governance_result.passed:
            return self._build_blocked_response(query, governance_result, execution_trace)

        # ── Phase 2: Context Retrieval ───────────────────────────────────────
        vector_results = self.vector_store.query(query, n_results=10)
        query_domain = self._infer_domain(query, vector_results)

        # Build query-scoped graph from retrieved documents
        source_node_ids = self._build_query_graph(query, vector_results)
        execution_trace["phases"].append({
            "phase": "context_retrieval",
            "vector_results_count": len(vector_results),
            "graph_nodes_constructed": len(source_node_ids),
            "inferred_domain": query_domain
        })

        # ── Phase 3: Persona Routing ─────────────────────────────────────────
        activated_personas = self.persona_engine.route_to_persona(query, query_domain)
        execution_trace["phases"].append({
            "phase": "persona_routing",
            "activated_personas": [p.id for p in activated_personas],
            "persona_count": len(activated_personas)
        })

        if not activated_personas:
            return self._build_no_persona_response(query, execution_trace)

        # ── Phase 4: Persona Commentary ──────────────────────────────────────
        persona_results = self._execute_persona_passes(
            query, activated_personas, vector_results, source_node_ids
        )
        execution_trace["phases"].append({
            "phase": "persona_commentary",
            "results_count": len(persona_results)
        })

        # ── Phase 5: Aggregation and Synthesis ───────────────────────────────
        aggregated_response = self._aggregate_and_synthesize(
            query, persona_results, vector_results
        )
        execution_trace["phases"].append({
            "phase": "aggregation",
            "composite_score": aggregated_response["evaluation_score"],
            "synthesis_length": len(aggregated_response["synthesis"])
        })

        # ── Phase 6: Governance Post-Check ───────────────────────────────────
        output_evaluation = self.governance.evaluate_output(
            aggregated_response["synthesis"],
            vector_results,
            query,
            governance_result.request_id
        )
        execution_trace["phases"].append({
            "phase": "governance_postcheck",
            "grounding_score": output_evaluation.grounding_score,
            "hallucination_penalty": output_evaluation.hallucination_penalty,
            "flagged_claims_count": len(output_evaluation.flagged_claims)
        })

        # ── Phase 7: Persona Evolution ───────────────────────────────────────
        self._update_persona_evolution(
            activated_personas, persona_results,
            aggregated_response["evaluation_score"], query_domain
        )

        # ── Phase 8: Prune underperformers ───────────────────────────────────
        self._run_pruning_cycle()

        execution_trace["completed_at"] = datetime.utcnow().isoformat()

        return {
            "response": aggregated_response["synthesis"],
            "evaluation": {
                "composite_score": aggregated_response["evaluation_score"],
                "grounding_score": output_evaluation.grounding_score,
                "coherence_score": output_evaluation.coherence_score,
                "hallucination_penalty": output_evaluation.hallucination_penalty
            },
            "provenance": {
                "source_documents": [r["metadata"].get("source") for r in vector_results[:5]],
                "activated_personas": [p.name for p in activated_personas],
                "flagged_claims": output_evaluation.flagged_claims
            },
            "execution_trace": execution_trace
        }

    def _execute_persona_passes(self, query: str, personas: List[Persona],
                                vector_results: List[Dict],
                                source_node_ids: List[str]) -> List[Dict[str, Any]]:
        """
        Run one commentary pass per persona.

        Passes run sequentially in this listing. A failed pass is recorded
        with success=False instead of aborting the remaining personas.
        """
        context = self._format_context_for_inference(vector_results)
        results = []

        for persona in personas:
            start_time = datetime.utcnow()
            system_prompt = persona.get_system_prompt(context=query)

            inference_prompt = (
                f"Based on the following context, provide your expert analysis:\n\n"
                f"CONTEXT:\n{context}\n\n"
                f"QUERY: {query}\n\n"
                f"Provide a detailed analysis from your perspective as {persona.name}. "
                f"Reference specific information from the context. "
                f"Identify key insights and any limitations in the available information."
            )

            try:
                commentary = self.inference_engine.generate(
                    inference_prompt, system_prompt, max_tokens=1500
                )
                latency_ms = (datetime.utcnow() - start_time).total_seconds() * 1000

                results.append({
                    "persona_id": persona.id,
                    "persona_name": persona.name,
                    "commentary": commentary,
                    "relevance_score": self._score_relevance(commentary, query),
                    "key_insights": self._extract_key_insights(commentary),
                    "latency_ms": latency_ms,
                    "success": True
                })
            except Exception as e:
                results.append({
                    "persona_id": persona.id,
                    "persona_name": persona.name,
                    "commentary": "",
                    "relevance_score": 0.0,
                    "key_insights": [],
                    "latency_ms": 0,
                    "success": False,
                    "error": str(e)
                })

        return results

    def _aggregate_and_synthesize(self, query: str, persona_results: List[Dict],
                                  vector_results: List[Dict]) -> Dict[str, Any]:
        """Synthesize persona commentaries into a unified response."""
        successful_results = [r for r in persona_results if r["success"]]

        if not successful_results:
            return {"synthesis": "No successful persona passes completed.", "evaluation_score": 0.0}

        synthesis_prompt = (
            "Synthesize the following expert analyses into a single, coherent response. "
            "Preserve the key insights from each perspective. "
            "Resolve contradictions explicitly. "
            "Be precise about what is known versus inferred.\n\n"
        )

        for result in successful_results:
            synthesis_prompt += (
                f"### {result['persona_name']} Analysis:\n"
                f"{result['commentary']}\n\n"
            )

        synthesis_prompt += f"\nQuery to address: {query}\n\nProvide a unified synthesis:"

        synthesis = self.inference_engine.generate(
            synthesis_prompt,
            system_prompt="You are a synthesis engine. Combine multiple expert perspectives into clear, grounded analysis.",
            max_tokens=2000
        )

        evaluation_score = self._evaluate_synthesis(
            [r["commentary"] for r in successful_results],
            [insight for r in successful_results for insight in r["key_insights"]],
            query
        )

        return {"synthesis": synthesis, "evaluation_score": evaluation_score}

    def _evaluate_synthesis(self, commentaries: List[str],
                            insights: List[str], query: str) -> float:
        if not commentaries:
            return 0.0

        # Coverage: number of extracted insights relative to query length.
        coverage = min(1.0, len(insights) / max(len(query.split()), 1) * 2.0)

        # Coherence: mean pairwise Jaccard overlap between commentaries.
        if len(commentaries) < 2:
            coherence = 1.0
        else:
            all_terms = [set(c.lower().split()) for c in commentaries]
            pairwise_overlaps = []
            for i in range(len(all_terms)):
                for j in range(i + 1, len(all_terms)):
                    union = all_terms[i] | all_terms[j]
                    intersection = all_terms[i] & all_terms[j]
                    pairwise_overlaps.append(len(intersection) / max(len(union), 1))
            coherence = sum(pairwise_overlaps) / max(len(pairwise_overlaps), 1)

        # Relevance: share of query terms that appear in any commentary.
        query_terms = set(query.lower().split())
        all_output = " ".join(commentaries).lower()
        relevance = sum(1 for t in query_terms if t in all_output) / max(len(query_terms), 1)

        return 0.4 * coverage + 0.3 * coherence + 0.3 * relevance

    def _build_query_graph(self, query: str,
                           vector_results: List[Dict]) -> List[str]:
        """Construct a query-scoped knowledge graph from retrieved documents."""
        node_ids = []
        for result in vector_results:
            node = self.knowledge_graph.add_node(
                label="DOCUMENT",
                content=result["content"][:500],
                source_document_id=result["id"],
                confidence=result["relevance_score"]
            )
            node_ids.append(node.id)

        # Chain consecutive results (in retrieval order) with RELATED_TO edges
        for i in range(len(node_ids) - 1):
            self.knowledge_graph.add_edge(
                node_ids[i], node_ids[i + 1],
                relationship="RELATED_TO",
                weight=0.5,
                established_by="query_construction"
            )
        return node_ids

    def _update_persona_evolution(self, personas: List[Persona],
                                  results: List[Dict],
                                  aggregate_score: float, domain: str):
        for persona in personas:
            persona_result = next(
                (r for r in results if r["persona_id"] == persona.id), None
            )
            if not persona_result:
                continue

            individual_score = persona_result.get("relevance_score", aggregate_score)
            feedback_vector = {
                trait_name: individual_score
                for trait_name in persona.traits.keys()
            }
            persona.apply_bounded_update(feedback_vector)

            persona.performance.total_queries += 1
            persona.performance.total_score += individual_score
            persona.performance.last_used = datetime.utcnow().isoformat()
            # Exponential moving average: 80% history, 20% latest score.
            persona.performance.domain_scores[domain] = (
                persona.performance.domain_scores.get(domain, 0.5) * 0.8 +
                individual_score * 0.2
            )
            # Update the success rate on every pass. Updating only on success
            # would let the rate climb toward 1.0 and never decay.
            outcome = 1.0 if individual_score >= 0.6 else 0.0
            persona.performance.success_rate = (
                persona.performance.success_rate * 0.9 + outcome * 0.1
            )

    def _run_pruning_cycle(self):
        """Retire consistently underperforming personas."""
        prune_threshold = self.config.get("prune_threshold", 0.3)
        # Iterate over a copy: prune_persona mutates active_personas.
        for persona_id, persona in list(self.persona_engine.active_personas.items()):
            if (persona.performance.total_queries >= 10 and
                    persona.performance.average_score < prune_threshold):
                self.persona_engine.prune_persona(
                    persona_id,
                    reason=(
                        f"average_score {persona.performance.average_score:.2f} "
                        f"below threshold {prune_threshold}"
                    )
                )

    def _infer_domain(self, query: str, vector_results: List[Dict]) -> str:
        domain_keywords = {
            "code": ["function", "class", "algorithm", "implement", "debug", "code", "python", "typescript"],
            "research": ["analyze", "study", "evidence", "research", "paper", "data", "statistics"],
            "writing": ["write", "draft", "compose", "article", "blog", "narrative", "story"],
            "architecture": ["system", "design", "architecture", "infrastructure", "deploy", "scale"],
            "governance": ["compliance", "policy", "audit", "risk", "regulation", "governance"]
        }
        query_lower = query.lower()
        domain_scores = {
            domain: sum(1 for kw in keywords if kw in query_lower)
            for domain, keywords in domain_keywords.items()
        }
        best_domain = max(domain_scores, key=domain_scores.get)
        # With no keyword hits, max() would silently return the first key ("code").
        # "general" is an assumed fallback label; no persona needs to list it.
        return best_domain if domain_scores[best_domain] > 0 else "general"

    def _format_context_for_inference(self, vector_results: List[Dict]) -> str:
        context_parts = []
        # Top five results, each truncated to 400 characters.
        for i, result in enumerate(vector_results[:5]):
            source = result["metadata"].get("source", "unknown")
            content = result["content"][:400]
            score = result["relevance_score"]
            context_parts.append(f"[Source {i+1}: {source} | Relevance: {score:.2f}]\n{content}")
        return "\n\n".join(context_parts)

    def _score_relevance(self, commentary: str, query: str) -> float:
        query_terms = set(query.lower().split())
        commentary_terms = set(commentary.lower().split())
        return len(query_terms & commentary_terms) / max(len(query_terms), 1)

    def _extract_key_insights(self, commentary: str) -> List[str]:
        sentences = [s.strip() for s in commentary.split(".") if len(s.strip()) > 40]
        return sentences[:3]

    def _build_blocked_response(self, query: str, governance_result: Any,
                                trace: Dict) -> Dict[str, Any]:
        return {
            "response": f"Request blocked by governance layer. Reason: {governance_result.justification}",
            "blocked": True,
            "governance_result": governance_result.audit_record,
            "execution_trace": trace
        }

    def _build_no_persona_response(self, query: str, trace: Dict) -> Dict[str, Any]:
        return {
            "response": "No active personas available for this query domain. Review persona configuration.",
            "no_personas": True,
            "execution_trace": trace
        }
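The evolution update and the router are coupled: each pass nudges a persona's domain score, and that score is 40% of its next routing score. The sketch below works through the arithmetic using the constants from the listings (0.8/0.2 smoothing, 0.4/0.4/0.2 routing weights); `ema_update` and `routing_score` are illustrative names.

```python
def ema_update(previous: float, score: float, alpha: float = 0.2) -> float:
    """Exponential moving average: keep (1 - alpha) of history, add alpha of the new score."""
    return previous * (1.0 - alpha) + score * alpha


def routing_score(domain_match: float, historical: float, activation_cost: float) -> float:
    """Composite routing score with the 0.4 / 0.4 / 0.2 weights."""
    return 0.4 * domain_match + 0.4 * historical + 0.2 * (1.0 - activation_cost)


# A new persona starts at the 0.5 default and then scores 1.0 five times in a row.
domain_score = 0.5
for _ in range(5):
    domain_score = ema_update(domain_score, 1.0)

before = routing_score(domain_match=1.0, historical=0.5, activation_cost=0.3)
after = routing_score(domain_match=1.0, historical=domain_score, activation_cost=0.3)

print(f"domain score after five perfect passes: {domain_score:.5f}")
print(f"routing score: {before:.4f} -> {after:.4f}")
```

Five perfect passes move the routing score by about 0.13, which is less than the 0.28 gap between a domain match (1.0) and a non-match (0.3) after weighting. A persona earns its place gradually; expertise declared in configuration still dominates early routing.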

VIII. The SpecGen Module: Deterministic Code from Specification

One of the most powerful — and underutilized — components in the system is SpecGen: the deterministic code generation engine that produces production-ready implementations from structured technical specifications.

SpecGen was born from a frustration I could not resolve with vanilla LLM code generation: non-determinism. Given the same specification twice, most code generation systems will produce meaningfully different implementations. The patterns, the naming conventions, the error handling strategies, the test coverage — all of it varies with temperature and token sampling. This is fine for exploration. It is unacceptable for production infrastructure.

SpecGen solves this through three mechanisms: (1) a structured specification format that eliminates ambiguity before generation, (2) RAG-grounded generation that anchors output to your existing codebase patterns, and (3) a fixed-seed inference call that produces deterministic output given the same specification, context, model, and runtime.
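The three mechanisms can be sketched without touching a model at all. The example below builds a canonical fingerprint for a spec and a request body with a fixed seed and zero temperature. The body is shaped like an Ollama `/api/generate` payload, which is an assumption about the backend; `spec_fingerprint`, `build_request`, and the model name are hypothetical. Nothing is sent over the network.

```python
import hashlib
import json
from typing import Any, Dict


def spec_fingerprint(spec: Dict[str, Any]) -> str:
    """Hash a canonical (sorted-key) JSON form so key order cannot change the result."""
    canonical = json.dumps(spec, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


def build_request(spec: Dict[str, Any], patterns: str,
                  model: str = "example-model", seed: int = 42) -> Dict[str, Any]:
    """Assemble a seeded, zero-temperature generation request (illustrative shape)."""
    prompt = (
        f"SPEC:\n{json.dumps(spec, sort_keys=True, indent=2)}\n\n"
        f"EXISTING PATTERNS:\n{patterns}"
    )
    return {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"seed": seed, "temperature": 0},
    }


spec_a = {"name": "AuditWriter", "language": "python", "constraints": ["append-only"]}
spec_b = {"constraints": ["append-only"], "language": "python", "name": "AuditWriter"}

# The same spec in a different key order yields the same fingerprint and request.
same_fingerprint = spec_fingerprint(spec_a) == spec_fingerprint(spec_b)
same_request = build_request(spec_a, "def write(...)") == build_request(spec_b, "def write(...)")

# A cache keyed by fingerprint returns stored code instead of regenerating it.
cache: Dict[str, str] = {spec_fingerprint(spec_a): "# previously generated code"}
cached = cache.get(spec_fingerprint(spec_b))
print(same_fingerprint, same_request, cached is not None)
```

The cache is the strongest of the three guarantees: a fixed seed makes regeneration repeatable only while the model and runtime stay the same, whereas a cache hit returns the stored output byte for byte.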

python
# sovereign/specgen/spec_generator.py

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
import json
import hashlib


@dataclass
class ComponentSpec:
    """
    A fully specified component for deterministic code generation.

    Ambiguity in the spec means ambiguity in the output.
    Every field is required because every field shapes the generated code.
    Underspecified components produce underspecified implementations.
    """
    name: str
    component_type: str            # service, model, api_endpoint, utility, test, config
    language: str                  # python, typescript, sql, yaml, bash
    description: str
    inputs: List[Dict[str, str]]   # [{name, type, description, required}]
    outputs: List[Dict[str, str]]  # [{name, type, description}]
    dependencies: List[str]        # Other component names this depends on
    constraints: List[str]         # Explicit behavioral constraints
    error_handling: List[str]      # Error cases and handling strategies
    test_scenarios: List[Dict]     # [{name, given, when, then}]
    existing_patterns: List[str]   # Code patterns from codebase to follow

    @property
    def spec_hash(self) -> str:
        """Deterministic hash of the specification: same spec = same hash = same code."""
        # sort_keys makes the serialized form independent of field order.
        spec_string = json.dumps(
            {k: v for k, v in vars(self).items() if k != "spec_hash"},
            sort_keys=True
        )
        return hashlib.sha256(spec_string.encode()).hexdigest()[:12]


40class SpecGenerator:
41 """
42 Deterministic code generation from structured specifications.
43
44 The key insight: LLM code generation is non-deterministic by default
45 because the prompt is underspecified and the sampling is random.
46 Remove the underspecification. Fix the seed.
47 Now the generation is deterministic.
48
49 Your codebase is a corpus. New code should be grounded in existing patterns.
50 SpecGen retrieves those patterns before generating.
51 The result is code that looks like it was written by the same author
52 as the rest of the codebase — because it was trained on the same corpus.
53 """
54
55 def __init__(self, config: Dict[str, Any], vector_store, inference_engine):
56 self.config = config
57 self.vector_store = vector_store
58 self.inference_engine = inference_engine
59 self.generation_seed = config.get("generation_seed", 42)
60 self.spec_cache: Dict[str, str] = {}
61
62 def generate_component(self, spec: ComponentSpec) -> Dict[str, Any]:
63 """Generate a complete, production-ready component from specification."""
64
65 # Check spec cache — same spec always produces same code
66 if spec.spec_hash in self.spec_cache:
67 return {
68 "code": self.spec_cache[spec.spec_hash],
69 "spec_hash": spec.spec_hash,
70 "cache_hit": True
71 }
72
73 # Retrieve existing patterns from the codebase
74 pattern_context = self._retrieve_existing_patterns(spec)
75
76 # Build deterministic generation prompt
77 generation_prompt = self._build_generation_prompt(spec, pattern_context)
78 system_prompt = self._build_system_prompt(spec)
79
80 # Generate with fixed seed for determinism
81 generated_code = self.inference_engine.generate(
82 generation_prompt,
83 system_prompt=system_prompt,
84 temperature=0.0, # Zero temperature: maximum determinism
85 seed=self.generation_seed,
86 max_tokens=3000
87 )
88
89 # Generate tests in a separate pass
90 test_code = self._generate_tests(spec, generated_code, pattern_context)
91
92 result = {
93 "component_name": spec.name,
94 "component_type": spec.component_type,
95 "language": spec.language,
96 "spec_hash": spec.spec_hash,
97 "implementation": generated_code,
98 "tests": test_code,
99 "dependencies": spec.dependencies,
100 "cache_hit": False
101 }
102
103 self.spec_cache[spec.spec_hash] = generated_code
104 return result
105
106 def _retrieve_existing_patterns(self, spec: ComponentSpec) -> str:
107 """Retrieve relevant code patterns from the existing codebase."""
108 search_query = f"{spec.component_type} {spec.language} {' '.join(spec.existing_patterns[:3])}"
109 results = self.vector_store.query(
110 search_query,
111 n_results=5,
112 where_filter={"doc_type": "code"}
113 )
114 if not results:
115 return "No existing patterns found in codebase."
116 return "\n\n".join([
117 f"# Pattern from {r['metadata'].get('source', 'unknown')}:\n{r['content']}"
118 for r in results
119 ])
120
121 def _build_generation_prompt(self, spec: ComponentSpec, pattern_context: str) -> str:
122 return f"""Generate a production-ready {spec.language} {spec.component_type} named {spec.name}.
123
124SPECIFICATION:
125- Description: {spec.description}
126- Inputs: {json.dumps(spec.inputs, indent=2)}
127- Outputs: {json.dumps(spec.outputs, indent=2)}
128- Dependencies: {', '.join(spec.dependencies)}
129- Constraints: {chr(10).join(f' - {c}' for c in spec.constraints)}
130- Error handling: {chr(10).join(f' - {e}' for e in spec.error_handling)}
131
132EXISTING CODEBASE PATTERNS TO FOLLOW:
133{pattern_context}
134
135Generate ONLY the implementation code. No preamble. No explanation. No markdown fences.
136The code must be complete, typed, and production-ready."""
137
138 def _build_system_prompt(self, spec: ComponentSpec) -> str:
139 language_instructions = {
140 "python": "Use type hints, dataclasses, explicit error handling, and docstrings. Follow PEP 8.",
141 "typescript": "Use strict TypeScript with explicit types. No `any`. Prefer interfaces over types for objects.",
142 "sql": "Use explicit column names, proper indexes, and transactional safety.",
143 }
144 return (
145 f"You are a senior software engineer generating production {spec.language} code. "
146 f"{language_instructions.get(spec.language, '')} "
147 f"Output ONLY valid {spec.language} code. No explanations."
148 )
149
150 def _generate_tests(self, spec: ComponentSpec, implementation: str,
151 pattern_context: str) -> str:
152 test_prompt = f"""Generate comprehensive tests for this {spec.language} {spec.component_type}.
153
154IMPLEMENTATION:
155{implementation}
156
157TEST SCENARIOS:
158{json.dumps(spec.test_scenarios, indent=2)}
159
160Generate complete test code following the patterns in the codebase.
161Cover success cases, edge cases, and each error handling scenario.
162Output ONLY test code."""
163 return self.inference_engine.generate(
164 test_prompt,
165 system_prompt=f"Generate complete {spec.language} tests. Output ONLY code.",
166 temperature=0.0,
167 seed=self.generation_seed,
168 max_tokens=2000
169 )
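
One way to see what the spec hash and cache buy you is to run them against a stand-in inference engine. The sketch below is not part of the SOVEREIGN codebase: `MiniSpec`, `CountingEngine`, and `generate_once` are hypothetical names, and the engine only echoes its prompt. It illustrates that two specs with equal field values share a hash, so the second request never reaches the model, while any change to the spec forces a new generation.

```python
import hashlib
import json
from dataclasses import asdict, dataclass, field
from typing import Dict, List


@dataclass
class MiniSpec:
    """Hypothetical, trimmed-down stand-in for ComponentSpec."""
    name: str
    language: str
    constraints: List[str] = field(default_factory=list)

    @property
    def spec_hash(self) -> str:
        # sort_keys makes the serialization independent of key ordering
        payload = json.dumps(asdict(self), sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:12]


class CountingEngine:
    """Hypothetical stub that counts calls instead of running a model."""

    def __init__(self) -> None:
        self.calls = 0

    def generate(self, prompt: str) -> str:
        self.calls += 1
        return f"# generated from: {prompt}"


def generate_once(spec: MiniSpec, engine: CountingEngine, cache: Dict[str, str]) -> str:
    """Return cached code for a known spec hash, otherwise call the engine once."""
    key = spec.spec_hash
    if key not in cache:
        cache[key] = engine.generate(f"{spec.language} component {spec.name}")
    return cache[key]


engine = CountingEngine()
cache: Dict[str, str] = {}
spec_a = MiniSpec("audit_exporter", "python", ["append-only"])
spec_b = MiniSpec("audit_exporter", "python", ["append-only"])  # equal values, new object
spec_c = MiniSpec("audit_exporter", "python", ["append-only", "utf-8 output"])

first = generate_once(spec_a, engine, cache)
second = generate_once(spec_b, engine, cache)  # served from the cache
third = generate_once(spec_c, engine, cache)   # changed spec, new engine call
print(spec_a.spec_hash == spec_b.spec_hash, engine.calls)
```

The cache key covers only the spec. If the retrieved patterns change because the codebase changed, a cached entry will not reflect that until the cache is cleared.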

IX. The API Gateway: FastAPI Backend

python
# sovereign/api/main.py

import json
import uuid
from typing import Any, Dict, List, Optional

import yaml
from fastapi import BackgroundTasks, FastAPI, HTTPException, Query, WebSocket, WebSocketDisconnect
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from sovereign.orchestration.moe_orchestrator import MoEOrchestrator


def load_config(path: str = "./config/sovereign.yaml") -> Dict[str, Any]:
    with open(path) as f:
        return yaml.safe_load(f)


config = load_config()
app = FastAPI(
    title="SOVEREIGN API",
    description="Self-owned local-first AI orchestration. No cloud. No telemetry. Your inference.",
    version="1.0.0"
)

app.add_middleware(
    CORSMiddleware,
    # cors_origins lives under api_config in sovereign.yaml
    allow_origins=config.get("api_config", {}).get("cors_origins", ["http://localhost:3000"]),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

orchestrator = MoEOrchestrator(config)


class QueryRequest(BaseModel):
    query: str = Field(..., min_length=1, max_length=10000)
    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    persona_override: Optional[List[str]] = None
    domain_hint: Optional[str] = None
    stream: bool = False


class DocumentIngestRequest(BaseModel):
    documents: List[Dict[str, Any]]
    collection: Optional[str] = "default"
    extract_entities: bool = True
    build_graph_edges: bool = True


@app.post("/query")
async def query(request: QueryRequest) -> Dict[str, Any]:
    """
    Primary query endpoint. Runs the full 8-phase orchestration pipeline.
    Returns response with evaluation scores, provenance, and execution trace.
    """
    try:
        # orchestrator.execute is synchronous; run it in a worker thread so one
        # long inference call does not stall the event loop for other requests
        return await run_in_threadpool(
            orchestrator.execute,
            query=request.query,
            session_id=request.session_id,
            user_context={"domain_hint": request.domain_hint},
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.websocket("/query/stream")
async def query_stream(websocket: WebSocket):
    """
    Streaming query endpoint for real-time token delivery.
    Every token comes from local inference.
    """
    await websocket.accept()
    try:
        data = await websocket.receive_json()
        query_text = data.get("query", "")

        # NOTE: this path calls the inference engine directly. It does not run
        # the governance or persona phases that orchestrator.execute() applies.
        for token in orchestrator.inference_engine.generate_stream(query_text):
            await websocket.send_json({"token": token, "done": False})

        await websocket.send_json({"token": "", "done": True})
        await websocket.close()
    except WebSocketDisconnect:
        # Client went away; there is nothing left to send or close
        return
    except Exception as e:
        await websocket.send_json({"error": str(e), "done": True})
        await websocket.close()


@app.post("/documents/ingest")
async def ingest_documents(request: DocumentIngestRequest,
                           background_tasks: BackgroundTasks) -> Dict[str, Any]:
    """Ingest documents into the memory substrate (vector store + knowledge graph)."""
    doc_ids = orchestrator.vector_store.embed_and_store(request.documents)
    return {
        "ingested_count": len(doc_ids),
        "document_ids": doc_ids,
        "collection": request.collection
    }


@app.get("/personas")
async def list_personas() -> Dict[str, Any]:
    """List all personas with their current lifecycle state and performance metrics."""
    active = {
        pid: {
            "name": p.name,
            "status": p.status,
            "expertise": p.expertise,
            "average_score": p.performance.average_score,
            "total_queries": p.performance.total_queries,
            "version": p.version
        }
        for pid, p in orchestrator.persona_engine.active_personas.items()
    }
    cold = {
        pid: {"name": p.name, "status": p.status}
        for pid, p in orchestrator.persona_engine.cold_storage.items()
    }
    return {"active": active, "cold_storage": cold}


@app.post("/personas/{persona_id}/recall")
async def recall_persona(persona_id: str, query_context: str) -> Dict[str, Any]:
    """Attempt to recall a pruned persona based on query context."""
    recalled = orchestrator.persona_engine.recall_persona(persona_id, query_context)
    if recalled:
        return {"recalled": True, "persona_name": recalled.name, "persona_id": recalled.id}
    return {"recalled": False, "reason": "Context relevance below recall threshold"}


@app.get("/audit/log")
async def get_audit_log(limit: int = Query(default=50, ge=1)) -> Dict[str, Any]:
    """Return the most recent audit log entries."""
    # ge=1 matters: entries[-0:] would return the whole log instead of nothing
    audit_path = config.get("governance_config", {}).get("audit_log_path", "./logs/audit.jsonl")
    entries = []
    try:
        with open(audit_path) as f:
            for line in f:
                if line.strip():
                    entries.append(json.loads(line))
    except FileNotFoundError:
        entries = []
    return {"entries": entries[-limit:], "total_count": len(entries)}


@app.get("/health")
async def health() -> Dict[str, Any]:
    return {
        "status": "sovereign",
        "inference_mode": config.get("inference_config", {}).get("execution_mode", "ollama"),
        "cloud_dependency": False,
        "telemetry": False
    }
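
For a sense of how a client might talk to this gateway, here is a minimal sketch that uses only the standard library. `build_query_request` and `send` are hypothetical helper names, the base URL assumes the default port from the configuration, and the sketch assumes a successful result carries a `response` key the way the blocked and no-persona results do.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

API_BASE_URL = "http://localhost:8000"  # assumed: default api_config port


def build_query_request(query: str, domain_hint: Optional[str] = None,
                        base_url: str = API_BASE_URL) -> urllib.request.Request:
    """Build a POST /query request whose JSON body mirrors QueryRequest."""
    body: Dict[str, Any] = {"query": query}
    if domain_hint is not None:
        body["domain_hint"] = domain_hint
    return urllib.request.Request(
        url=f"{base_url}/query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 120.0) -> Dict[str, Any]:
    """Send the request and decode the JSON reply. Needs the API to be running."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


request = build_query_request("Summarize the audit requirements", domain_hint="governance")
print(request.get_method(), request.full_url)

# With the backend from this section running locally:
# result = send(request)
# print(result.get("response"))
```

The generous timeout is deliberate: local inference on modest hardware can take far longer than a typical cloud API call.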

X. Complete Project Scaffolding

This is the directory structure for a coding agent to construct from scratch. Every file listed is necessary. Every directory serves a specific architectural purpose.

text
sovereign/
├── README.md
├── pyproject.toml
├── docker-compose.yml
├── Makefile

├── config/
│   ├── sovereign.yaml              # Master configuration
│   ├── personas/                   # Persona definition templates
│   │   ├── analytical.json
│   │   ├── creative.json
│   │   ├── technical.json
│   │   ├── critical.json
│   │   └── generalist.json
│   └── model_registry.yaml         # Local model routing table

├── sovereign/                      # Core Python package
│   ├── __init__.py
│   │
│   ├── inference/
│   │   ├── __init__.py
│   │   └── local_engine.py         # Ollama + llama.cpp unified interface
│   │
│   ├── memory/
│   │   ├── __init__.py
│   │   ├── knowledge_graph.py      # Dual-substrate KG (Neo4j + NetworkX)
│   │   ├── vector_store.py         # ChromaDB/Qdrant local vector store
│   │   └── document_loader.py      # PDF, Markdown, HTML, JSON loaders
│   │
│   ├── reasoning/
│   │   ├── __init__.py
│   │   ├── persona_engine.py       # Persona lifecycle + bounded evolution
│   │   └── domain_classifier.py
│   │
│   ├── orchestration/
│   │   ├── __init__.py
│   │   ├── moe_orchestrator.py     # 8-phase query execution pipeline
│   │   └── agent_swarm.py          # Multi-agent parallel execution
│   │
│   ├── governance/
│   │   ├── __init__.py
│   │   ├── control_boundary.py     # Intent evaluation + output scoring
│   │   └── audit_exporter.py       # Export audit trail to CSV/JSON
│   │
│   ├── specgen/
│   │   ├── __init__.py
│   │   ├── spec_generator.py       # Deterministic code generation
│   │   └── spec_validator.py       # Validate spec completeness before generation
│   │
│   └── api/
│       ├── __init__.py
│       ├── main.py                 # FastAPI application
│       ├── middleware.py           # Request logging, auth
│       └── models.py               # Pydantic request/response models

├── frontend/                       # Next.js 14 interface
│   ├── package.json
│   ├── tsconfig.json
│   ├── next.config.ts
│   ├── tailwind.config.ts
│   │
│   ├── app/
│   │   ├── layout.tsx
│   │   ├── page.tsx                # Main chat interface
│   │   ├── globals.css
│   │   │
│   │   ├── chat/
│   │   │   └── page.tsx            # Conversational query UI
│   │   ├── personas/
│   │   │   └── page.tsx            # Persona management dashboard
│   │   ├── knowledge/
│   │   │   └── page.tsx            # Knowledge graph visualization
│   │   ├── audit/
│   │   │   └── page.tsx            # Audit log viewer
│   │   └── specgen/
│   │       └── page.tsx            # SpecGen UI: spec input → code output
│   │
│   └── components/
│       ├── ChatInterface.tsx
│       ├── PersonaCard.tsx
│       ├── GraphViewer.tsx         # D3.js or Cytoscape knowledge graph viz
│       ├── AuditLog.tsx
│       ├── EvaluationScore.tsx
│       ├── ProvenancePanel.tsx
│       └── SpecForm.tsx

├── data/
│   ├── personas/
│   │   ├── experimental/
│   │   ├── active/
│   │   ├── stable/
│   │   ├── pruned/
│   │   └── cold_storage/
│   ├── chromadb/                   # Local vector store persistence
│   ├── graph_snapshots/            # Exported knowledge graph states
│   └── documents/                  # Source document repository

├── logs/
│   ├── audit.jsonl                 # Governance audit trail (append-only)
│   ├── execution_traces/           # Per-query execution traces
│   └── persona_evolution/          # Persona lifecycle change logs

├── scripts/
│   ├── setup.sh                    # One-command environment setup
│   ├── ingest_documents.py         # Batch document ingestion
│   ├── create_persona.py           # Interactive persona creation wizard
│   ├── export_audit.py             # Audit trail export utility
│   ├── run_specgen.py              # SpecGen CLI
│   └── graph_snapshot.py           # Export knowledge graph state

└── tests/
    ├── unit/
    │   ├── test_knowledge_graph.py
    │   ├── test_persona_engine.py
    │   ├── test_control_boundary.py
    │   ├── test_local_engine.py
    │   └── test_spec_generator.py
    ├── integration/
    │   ├── test_orchestration_pipeline.py
    │   └── test_api_endpoints.py
    └── fixtures/
        ├── sample_personas.json
        ├── sample_documents/
        └── sample_specs.json
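
The `data/` and `logs/` branches of this tree hold runtime state rather than source files, so something has to create them before first launch. The post assigns that job to `scripts/setup.sh`, whose contents are not shown here. As a rough sketch of the same step in Python (the function name `create_runtime_dirs` is hypothetical), it could look like this:

```python
import tempfile
from pathlib import Path
from typing import List

# Runtime directories from the tree above that contain no source files
RUNTIME_DIRS = [
    "data/personas/experimental",
    "data/personas/active",
    "data/personas/stable",
    "data/personas/pruned",
    "data/personas/cold_storage",
    "data/chromadb",
    "data/graph_snapshots",
    "data/documents",
    "logs/execution_traces",
    "logs/persona_evolution",
]


def create_runtime_dirs(root: Path) -> List[Path]:
    """Create every runtime directory under root; safe to run repeatedly."""
    created = []
    for relative in RUNTIME_DIRS:
        target = root / relative
        target.mkdir(parents=True, exist_ok=True)
        created.append(target)
    # The audit trail is append-only: touch() creates the file if it is
    # missing and never truncates an existing one
    (root / "logs" / "audit.jsonl").touch(exist_ok=True)
    return created


# Demonstrate against a throwaway directory instead of the real project root
with tempfile.TemporaryDirectory() as tmp:
    created = create_runtime_dirs(Path(tmp))
    print(f"created {len(created)} directories under {tmp}")
```

Running it twice is harmless, which matters for a setup step that people tend to re-run when something else goes wrong.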

XI. Configuration: The Master Manifest

yaml
# config/sovereign.yaml
# Every value here is yours to set. Nothing is a default you cannot override.
# Read this file as a declaration of your own system's values.

sovereign:
  version: "1.0.0"
  environment: "development"        # development | production | air_gap

inference_config:
  execution_mode: "ollama"          # ollama | llama_cpp | hybrid
  ollama_endpoint: "http://localhost:11434"
  default_model: "llama3.2"
  seed: 42                          # Reproducibility: same seed = same output
  temperature: 0.1                  # Low temperature: precision over creativity
  max_tokens: 2000
  model_registry:
    routing:
      code: "qwen2.5-coder:7b"
      research: "llama3.2"
      writing: "mistral:7b"
      architecture: "llama3.2"
      governance: "llama3.2"
    paths: {}                       # For llama_cpp mode: model file paths

graph_config:
  neo4j_uri: "bolt://localhost:7687"
  neo4j_user: "neo4j"
  neo4j_password: "sovereign"       # Change this before production
  decay_factor: 0.95                # Temporal decay per session
  prune_confidence_threshold: 0.1

vector_config:
  persist_directory: "./data/chromadb"
  collection_name: "sovereign_documents"
  embedding_model: "nomic-embed-text"

persona_config:
  personas_dir: "./data/personas"
  max_parallel_personas: 3
  prune_threshold: 0.3
  recall_threshold: 0.3
  evolution_rate: 0.05              # How quickly persona traits respond to feedback
  min_queries_before_prune: 10

governance_config:
  audit_log_path: "./logs/audit.jsonl"
  risk_thresholds:
    block: 0.9
    require_confirmation: 0.7
    enhanced_logging: 0.4
  reasonable_care_mode: true        # Colorado AI Act alignment

specgen_config:
  generation_seed: 42
  temperature: 0.0                  # Zero temperature: maximum determinism
  cache_generated_specs: true

api_config:
  host: "0.0.0.0"
  port: 8000
  cors_origins:
    - "http://localhost:3000"

frontend_config:
  api_base_url: "http://localhost:8000"
  websocket_url: "ws://localhost:8000/query/stream"
  graph_visualization: "cytoscape"  # d3 | cytoscape
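
To make two of these settings concrete, the following sketch shows one plausible way the routing table and the risk thresholds could be consumed. It is an illustration under assumptions, not the Control Boundary implementation: the function names, the `allow` action, and the "greater than or equal" comparison are all hypothetical, and the values are copied into plain dictionaries so the example runs without a YAML parser.

```python
from typing import Dict

# Values copied from config/sovereign.yaml
RISK_THRESHOLDS = {"block": 0.9, "require_confirmation": 0.7, "enhanced_logging": 0.4}
ROUTING = {
    "code": "qwen2.5-coder:7b",
    "research": "llama3.2",
    "writing": "mistral:7b",
    "architecture": "llama3.2",
    "governance": "llama3.2",
}
DEFAULT_MODEL = "llama3.2"


def validate_thresholds(thresholds: Dict[str, float]) -> None:
    """Reject threshold sets that are out of range or out of order."""
    block = thresholds["block"]
    confirm = thresholds["require_confirmation"]
    log = thresholds["enhanced_logging"]
    if not (0.0 <= log < confirm < block <= 1.0):
        raise ValueError("expected 0 <= enhanced_logging < require_confirmation < block <= 1")


def action_for_risk(risk: float, thresholds: Dict[str, float]) -> str:
    """Map a risk score to the strictest action whose threshold it meets."""
    if risk >= thresholds["block"]:
        return "block"
    if risk >= thresholds["require_confirmation"]:
        return "require_confirmation"
    if risk >= thresholds["enhanced_logging"]:
        return "enhanced_logging"
    return "allow"  # hypothetical name for "no extra handling"


def model_for_domain(domain: str) -> str:
    """Pick the routed model for a domain, falling back to the default model."""
    return ROUTING.get(domain, DEFAULT_MODEL)


validate_thresholds(RISK_THRESHOLDS)
for score in (0.2, 0.5, 0.75, 0.95):
    print(score, action_for_risk(score, RISK_THRESHOLDS))
print(model_for_domain("code"), model_for_domain("legal"))
```

Validating the ordering at startup is cheap insurance: a config where `block` sits below `require_confirmation` would silently make the confirmation tier unreachable.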

XII. Bootstrap: From Zero to Sovereign in Ten Commands

bash
# 1. Clone and enter
git clone https://github.com/kliewerdaniel/sovereign.git
cd sovereign

# 2. Install Python dependencies (declared in pyproject.toml, see Appendix A)
pip install -e .

# 3. Install spaCy language model (for entity extraction in governance layer)
python -m spacy download en_core_web_sm

# 4. Start Ollama and pull your primary model
ollama serve &
ollama pull llama3.2
ollama pull nomic-embed-text    # For local embeddings

# 5. Start Neo4j (optional: skip for pure in-memory graph)
docker run -d \
  --name sovereign-neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/sovereign \
  neo4j:5

# 6. Create directory structure (setup.sh is a shell script, so run it with bash)
bash scripts/setup.sh

# 7. Ingest your first documents
python scripts/ingest_documents.py --source ./data/documents/

# 8. Start the API backend
uvicorn sovereign.api.main:app --reload --port 8000

# 9. Start the frontend
cd frontend && npm install && npm run dev

# 10. Open your sovereign AI at http://localhost:3000
# No API keys. No cloud. No telemetry.
# Your hardware. Your inference. Your memory.
echo "SOVEREIGN is running. You own this."
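
When one of these steps fails quietly, the symptom usually shows up two steps later. A small preflight check can narrow it down. The sketch below is a hypothetical helper, not a script from the repository; it only tests whether something accepts TCP connections on the ports used in the commands above, and says nothing about whether the service behind the port is healthy.

```python
import socket
from typing import Dict, Tuple

# Ports taken from the bootstrap commands; Neo4j is optional per step 5
SERVICES: Dict[str, Tuple[str, int]] = {
    "ollama": ("localhost", 11434),
    "neo4j": ("localhost", 7687),
    "api": ("localhost", 8000),
    "frontend": ("localhost", 3000),
}


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(services: Dict[str, Tuple[str, int]]) -> Dict[str, bool]:
    """Probe every service and report which ones accept connections."""
    return {name: is_listening(host, port) for name, (host, port) in services.items()}


for name, up in check_services(SERVICES).items():
    print(f"{name:<10} {'up' if up else 'DOWN'}")
```

For a deeper check on the backend itself, the `/health` endpoint from the API section is the better signal once the port responds.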

XIII. The Knowledge Graph of the Blog — Why This Project Is the Synthesis

Every post I have written on this blog is a node in a knowledge graph. Every project I have built is an edge between concepts. SOVEREIGN is the traversal of that graph from end to end — the path that passes through every significant node and resolves the relationships between them.

text
[local inference] ──ENABLES──▶ [data sovereignty]
[data sovereignty] ──REQUIRES──▶ [audit trails]
[audit trails] ──REQUIRES──▶ [control boundary]
[control boundary] ──GOVERNS──▶ [MoE orchestration]
[MoE orchestration] ──ROUTES_TO──▶ [persona engine]
[persona engine] ──QUERIES──▶ [knowledge graph]
[knowledge graph] ──GROUNDS──▶ [RAG retrieval]
[RAG retrieval] ──FEEDS──▶ [SpecGen]
[SpecGen] ──GENERATES──▶ [new sovereign components]
[new sovereign components] ──EXPAND──▶ [knowledge graph]

                                        └── (the loop closes)

This is not a coincidence of architecture. It is the point. A sovereign AI system should be able to reason about its own architecture. The knowledge graph should contain documentation of the system itself. SpecGen should be able to generate new components for the system from its own specifications. The orchestrator should be able to route queries about how to improve the orchestrator.

The system is self-referential by design. Not self-modifying — you remain the author of every change. But self-aware in the sense that every component can be queried, explained, and improved using the system itself.

That is what sovereignty means at full depth. Not just that your data stays local. Not just that your inference is on-prem. But that the system you use to think can be used to improve the way you think, and the improvement remains yours.
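
The relationship list at the top of this section can be treated as data, which makes the closing of the loop something you can check rather than take on faith. This is a minimal sketch using plain tuples; the `follow` helper is a hypothetical name, and the real system would hold these edges in its knowledge graph rather than a Python list.

```python
from typing import List, Tuple

# Edges transcribed from the diagram: (source, relation, target)
EDGES: List[Tuple[str, str, str]] = [
    ("local inference", "ENABLES", "data sovereignty"),
    ("data sovereignty", "REQUIRES", "audit trails"),
    ("audit trails", "REQUIRES", "control boundary"),
    ("control boundary", "GOVERNS", "MoE orchestration"),
    ("MoE orchestration", "ROUTES_TO", "persona engine"),
    ("persona engine", "QUERIES", "knowledge graph"),
    ("knowledge graph", "GROUNDS", "RAG retrieval"),
    ("RAG retrieval", "FEEDS", "SpecGen"),
    ("SpecGen", "GENERATES", "new sovereign components"),
    ("new sovereign components", "EXPAND", "knowledge graph"),
]


def follow(start: str, edges: List[Tuple[str, str, str]]) -> List[str]:
    """Follow outgoing edges from start until a node repeats; return the path."""
    # Each node in this diagram has at most one outgoing edge
    successor = {source: target for source, _, target in edges}
    path = [start]
    while path[-1] in successor:
        nxt = successor[path[-1]]
        path.append(nxt)
        if nxt in path[:-1]:
            break  # reached a node we have already visited: the loop closed
    return path


path = follow("local inference", EDGES)
cycle = path[path.index(path[-1]):]
print(" -> ".join(cycle))
```

Starting from local inference, the walk ends back at the knowledge graph, and the printed cycle is the part of the architecture that feeds itself.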


XIV. What This Is Not

SOVEREIGN is not:

  • A replacement for the best frontier models. GPT-5, Claude, and Gemini outperform every local model on raw capability benchmarks. If raw capability is the only thing you care about, and you are content to get it on their hardware, under their data handling and their telemetry, this architecture is not for you.

  • A finished product. It is an architecture. A blueprint. A starting point. The personas you define will shape it. The documents you ingest will populate its memory. The governance thresholds you configure will determine its behavior. The code in this post is scaffolding, not a ceiling.

  • A political statement against any particular company. It is a structural argument: systems designed to extract from you produce different architecture than systems designed to serve you. Both exist. The choice between them is yours to make.

What this is: the most complete expression of everything I understand about building AI systems that answer to the person running them. Every module in this codebase is the distillation of a problem I could not stop thinking about until I had an implementation that solved it.

Build it. Modify it. Extend it. Publish your modifications. The graph grows in every direction from here.


Closing: The Architecture Is the Argument

The code in this post is an argument.

The bounded update function Δw = f(feedback) × (1 − w) is an argument that stability matters — that a system should resist extremes, not optimize toward them.
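
A few lines of arithmetic show why that form resists extremes. The sketch assumes the simplest reading of the formula, f(feedback) = rate × feedback with both w and feedback in [0, 1], and borrows 0.05 from `evolution_rate` in the configuration. The actual f, and how negative feedback is handled, belong to the persona engine and may differ.

```python
def bounded_update(w: float, feedback: float, rate: float = 0.05) -> float:
    """Apply w + rate * feedback * (1 - w), assuming w and feedback lie in [0, 1]."""
    if not (0.0 <= w <= 1.0 and 0.0 <= feedback <= 1.0):
        raise ValueError("w and feedback must be in [0, 1]")
    delta = rate * feedback * (1.0 - w)  # the step shrinks as w approaches 1
    return w + delta


w = 0.5
history = [w]
for _ in range(200):  # 200 rounds of maximal positive feedback
    w = bounded_update(w, feedback=1.0)
    history.append(w)

print(f"after 1 update:    {history[1]:.4f}")
print(f"after 200 updates: {history[-1]:.6f}")
```

Under these assumptions each update multiplies the remaining headroom 1 − w by (1 − rate × feedback), so the weight approaches 1 geometrically and never overshoots it, however insistent the feedback.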

The query-scoped knowledge graph is an argument that memory should be deliberate — that accumulation without discernment is not intelligence, it is noise.

The governance layer in the execution path is an argument that accountability cannot be post-hoc — that a system which can only be evaluated after the fact cannot be meaningfully controlled.

The local inference requirement is an argument that the execution path should belong to the person executing — that cognitive infrastructure has an owner, and that owner should be you.

Every design choice in SOVEREIGN is downstream of one question: who is this system for?

I built it for myself. And then I wrote it down so you could build it for yourself too.

That is what sovereignty means in practice: not the absence of dependency on everything, but the deliberate choice of which dependencies you accept and which you refuse. The cloud can keep the telemetry. You keep the mind.


Appendix A: Python Dependencies

toml
# pyproject.toml
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"

[project]
name = "sovereign"
version = "1.0.0"
description = "Self-owned local-first AI orchestration system"
requires-python = ">=3.11"

dependencies = [
    # Core
    "fastapi>=0.110.0",
    "uvicorn[standard]>=0.29.0",
    "pydantic>=2.6.0",
    "pyyaml>=6.0",

    # Inference
    "requests>=2.31.0",

    # Memory
    "chromadb>=0.4.24",
    "networkx>=3.2",
    "neo4j>=5.18.0",

    # Document processing
    "pypdf>=4.1.0",
    "python-docx>=1.1.0",
    "markdown>=3.6",

    # NLP / Entity extraction
    "spacy>=3.7.4",

    # Utilities
    "python-multipart>=0.0.9",
    "aiofiles>=23.2.1",
    "websockets>=12.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.1.0",
    "pytest-asyncio>=0.23.0",
    "httpx>=0.27.0",
    "black>=24.3.0",
    "ruff>=0.3.0",
    "mypy>=1.9.0",
]

# The repo root also holds config/, data/, logs/ and frontend/, so tell
# setuptools to package only the sovereign/ directory
[tool.setuptools.packages.find]
include = ["sovereign*"]

Appendix B: Docker Compose

yaml
# docker-compose.yml
# Complete local stack. No external services. No internet required after initial pull.

version: "3.9"

services:
  sovereign-api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
      - ./config:/app/config
    environment:
      - OLLAMA_ENDPOINT=http://ollama:11434
      - NEO4J_URI=bolt://neo4j:7687
    depends_on:
      - ollama
      - neo4j
    networks:
      - sovereign-network

  sovereign-frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    environment:
      - NEXT_PUBLIC_API_URL=http://localhost:8000
    networks:
      - sovereign-network

  ollama:
    image: ollama/ollama:latest
    ports:
      - "11434:11434"
    volumes:
      - ollama-models:/root/.ollama
    # GPU reservation needs the NVIDIA Container Toolkit on the host;
    # remove the deploy block on CPU-only machines
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]
    networks:
      - sovereign-network

  neo4j:
    image: neo4j:5
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      - NEO4J_AUTH=neo4j/sovereign
    volumes:
      - neo4j-data:/data
    networks:
      - sovereign-network

volumes:
  ollama-models:
  neo4j-data:

networks:
  sovereign-network:
    driver: bridge

SOVEREIGN is the synthesis of every system documented on this blog. Every component described here has a prior post that goes deeper on its individual design. The knowledge graph of danielkliewer.com is the context this post assumes you already carry. If you arrived here without that context, the blog is the prerequisite.

Repository: github.com/kliewerdaniel/sovereign

Series: Sovereignty Manifesto · Architecture as Autonomy · Architecture of Autonomy · Private Knowledge Graph · DeerFlow 2.0 · OpenClaw Guide · SOVEREIGN — This Post

Sovereign AI: Building Local-First Intelligent Systems

by Daniel Kliewer · Paperback · 72 pages

The hands-on guide to building AI that runs on your hardware, keeps your data private, and eliminates cloud dependence. Working code included.