A bottom-up technical review of the full integration stack: from BaseAgent abstract class through ReasoningEnsemble to RAGReasoningEnsemble and the OpenAI-compatible API surface.
BaseAgent is the root of all 9 reasoning strategies. Every agent inherits from it, and it provides
an OllamaClient connection, colored console logging via log_thought(), and a simple contract: implement run(),
and optionally override stream().
```python
from abc import ABC, abstractmethod
from typing import Generator

from agent_reasoning.client import OllamaClient
from termcolor import colored


class BaseAgent(ABC):
    def __init__(self, model="gemma3:270m", base_url=None):
        self.client = OllamaClient(model=model, base_url=base_url)
        self.name = "BaseAgent"   # subclasses override; used as the log prefix
        self.color = "white"      # termcolor color used by log_thought()

    def log_thought(self, message):
        print(colored(f"[{self.name}]: {message}", self.color))

    @abstractmethod
    def run(self, query) -> str:
        pass

    def stream(self, query) -> Generator[str, None, None]:
        """Default: yields the final result as one chunk."""
        result = self.run(query)
        if result:
            yield result
```
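To make the run()/stream() contract concrete, here is a self-contained sketch. It mirrors BaseAgent's shape but swaps OllamaClient for a hypothetical FakeClient stub (and drops termcolor) so it runs without an Ollama server; MiniBaseAgent and EchoAgent are illustrative names, not part of agent_reasoning.

```python
from abc import ABC, abstractmethod
from typing import Generator


class FakeClient:
    """Hypothetical stand-in for OllamaClient: returns a canned completion."""

    def __init__(self, model="gemma3:270m", base_url=None):
        self.model = model

    def complete(self, prompt: str) -> str:
        return f"answer to: {prompt}"


class MiniBaseAgent(ABC):
    """Same contract as BaseAgent: subclasses must implement run()."""

    def __init__(self, model="gemma3:270m", base_url=None):
        self.client = FakeClient(model=model, base_url=base_url)
        self.name = "BaseAgent"

    def log_thought(self, message):
        print(f"[{self.name}]: {message}")

    @abstractmethod
    def run(self, query) -> str:
        ...

    def stream(self, query) -> Generator[str, None, None]:
        # Default streaming: the whole result as a single chunk
        result = self.run(query)
        if result:
            yield result


class EchoAgent(MiniBaseAgent):
    """Hypothetical agent: only run() is implemented, stream() is inherited."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.name = "EchoAgent"

    def run(self, query) -> str:
        self.log_thought(f"received: {query}")
        return self.client.complete(query)


agent = EchoAgent()
print(list(agent.stream("2+2?")))  # ['answer to: 2+2?']
```

Because MiniBaseAgent is abstract, instantiating it directly raises TypeError; a strategy only becomes usable once it supplies run(), and it gets single-chunk streaming for free.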
```python
class OllamaClient:
    def __init__(self, model="gemma3:270m", base_url=None):
        if base_url is None:
            from agent_reasoning.config import get_ollama_host
            base_url = get_ollama_host()
        self.model = model
        self.base_url = base_url

    def generate(self, prompt, system=None, stream=True,
                 temperature=0.7, top_k=40, top_p=0.9,
                 num_predict=2048, stop=None):
        url = f"{self.base_url}/api/generate"
        # POSTs to Ollama, streams NDJSON chunks
        # Yields response text incrementally
        ...
```
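The body of generate() is elided above. As a hedged sketch of the wire format, assuming the standard Ollama /api/generate API (sampling parameters go under "options", and the streamed response is newline-delimited JSON whose objects carry a "response" text fragment and a "done" flag), payload building and chunk parsing could look like this. build_payload and iter_response_text are hypothetical helper names, and the HTTP POST is left out so the parsing can be exercised offline.

```python
import json
from typing import Iterable, Iterator, Optional


def build_payload(model: str, prompt: str, system: Optional[str] = None,
                  stream: bool = True, temperature: float = 0.7,
                  top_k: int = 40, top_p: float = 0.9,
                  num_predict: int = 2048, stop=None) -> dict:
    """Hypothetical helper: request body for POST {base_url}/api/generate."""
    options = {"temperature": temperature, "top_k": top_k,
               "top_p": top_p, "num_predict": num_predict}
    if stop:
        options["stop"] = list(stop)
    payload = {"model": model, "prompt": prompt, "stream": stream,
               "options": options}
    if system is not None:
        payload["system"] = system
    return payload


def iter_response_text(lines: Iterable[str]) -> Iterator[str]:
    """Yield each NDJSON chunk's 'response' text until a chunk has done=true."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines between chunks
        chunk = json.loads(line)
        if chunk.get("response"):
            yield chunk["response"]
        if chunk.get("done"):
            break


raw = [
    '{"model":"gemma3:270m","response":"Hel","done":false}',
    '{"model":"gemma3:270m","response":"lo","done":false}',
    '{"model":"gemma3:270m","response":"","done":true}',
]
print("".join(iter_response_text(raw)))  # Hello
```

With an HTTP client such as requests, the lines from resp.iter_lines(decode_unicode=True) on a stream=True response could be fed straight into iter_response_text.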
Each agent implements a distinct reasoning strategy. All inherit from BaseAgent and communicate with Ollama through OllamaClient. The AGENT_MAP provides both canonical and alias keys for flexible lookup.
```python
AGENT_MAP = {
    "standard": StandardAgent,
    "cot": CoTAgent,
    "chain_of_thought": CoTAgent,         # alias
    "tot": ToTAgent,
    "tree_of_thoughts": ToTAgent,         # alias
    "react": ReActAgent,
    "self_reflection": SelfReflectionAgent,
    "reflection": SelfReflectionAgent,    # alias
    "consistency": ConsistencyAgent,
    "self_consistency": ConsistencyAgent, # alias
    "decomposed": DecomposedAgent,
    "least_to_most": LeastToMostAgent,
    "ltm": LeastToMostAgent,              # alias
    "recursive": RecursiveAgent,
    "rlm": RecursiveAgent,                # alias
}
# 9 unique agents, 15 total keys (6 aliases)
```
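The alias layout can be checked, and lookups made forgiving, in a few lines. This sketch uses empty placeholder classes in place of the real agents, and resolve_agent is a hypothetical helper (not part of agent_reasoning) that also accepts the hyphenated model IDs used by the API layer, such as "self-reflection".

```python
# Placeholder classes standing in for the real agent implementations
class StandardAgent: ...
class CoTAgent: ...
class ToTAgent: ...
class ReActAgent: ...
class SelfReflectionAgent: ...
class ConsistencyAgent: ...
class DecomposedAgent: ...
class LeastToMostAgent: ...
class RecursiveAgent: ...


AGENT_MAP = {
    "standard": StandardAgent, "cot": CoTAgent, "chain_of_thought": CoTAgent,
    "tot": ToTAgent, "tree_of_thoughts": ToTAgent, "react": ReActAgent,
    "self_reflection": SelfReflectionAgent, "reflection": SelfReflectionAgent,
    "consistency": ConsistencyAgent, "self_consistency": ConsistencyAgent,
    "decomposed": DecomposedAgent, "least_to_most": LeastToMostAgent,
    "ltm": LeastToMostAgent, "recursive": RecursiveAgent, "rlm": RecursiveAgent,
}


def resolve_agent(name: str):
    """Hypothetical lookup: normalize case and hyphens, then consult AGENT_MAP."""
    key = name.strip().lower().replace("-", "_")
    try:
        return AGENT_MAP[key]
    except KeyError:
        raise ValueError(f"unknown strategy: {name!r}") from None


unique = set(AGENT_MAP.values())
print(len(AGENT_MAP), len(unique), len(AGENT_MAP) - len(unique))  # 15 9 6
print(resolve_agent("Self-Reflection").__name__)  # SelfReflectionAgent
```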
The ensemble orchestrates multiple reasoning strategies in parallel using a ThreadPoolExecutor (max 10 workers). Responses are clustered by semantic similarity using sentence-transformers embeddings, and the largest cluster wins via majority vote. Ties prefer Chain-of-Thought.
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Dict


class ReasoningEnsemble:
    def __init__(self,
                 model_name: str = "gemma3:270m",
                 similarity_threshold: float = 0.85,
                 embedding_model: str = "all-MiniLM-L6-v2"):
        self._executor = ThreadPoolExecutor(max_workers=10)
        ...

    async def run(self, query, strategies, config=None) -> Dict:
        # Single strategy → direct return (no voting)
        # Multiple strategies → parallel execution + majority vote
        # Return shape:
        # {
        #     "winner": {"strategy", "response", "vote_count"},
        #     "all_responses": [{"strategy", "response", "duration_ms"}, ...],
        #     "total_duration_ms": float,
        #     "voting_details": {"clusters", "threshold", "total_responses"},
        # }
        ...

    def _majority_vote(self, responses):
        # 1. Encode responses with SentenceTransformer
        # 2. Compute cosine similarity matrix
        # 3. Greedy clustering (threshold: similarity_threshold, default 0.85)
        # 4. Largest cluster wins; ties prefer CoT
        ...

    def run_sync(self, query, strategies, config=None):
        # Blocking wrapper; asyncio.run() raises RuntimeError if an event loop is already running
        return asyncio.run(self.run(query, strategies, config))
```
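_majority_vote() is only outlined above. Below is a dependency-free sketch of the same greedy scheme, assuming embeddings have already been computed (in the real class, by a SentenceTransformer such as all-MiniLM-L6-v2). Each unassigned response seeds a cluster that absorbs every remaining response whose cosine similarity to the seed meets the threshold; the largest cluster wins, and size ties go to the cluster containing "cot". Comparing only against the seed, and returning the cluster's CoT response (else its seed) as the winner, are assumptions of this sketch.

```python
import math
from typing import Dict, List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def majority_vote(responses: List[Dict], embeddings: List[Sequence[float]],
                  threshold: float = 0.85) -> Dict:
    """Greedy seed-based clustering; largest cluster wins, ties prefer 'cot'."""
    unassigned = list(range(len(responses)))
    clusters: List[List[int]] = []
    while unassigned:
        seed = unassigned.pop(0)
        members = [seed]
        for j in unassigned[:]:  # iterate over a copy while removing
            if cosine(embeddings[seed], embeddings[j]) >= threshold:
                members.append(j)
                unassigned.remove(j)
        clusters.append(members)

    def has_cot(cluster: List[int]) -> bool:
        return any(responses[i]["strategy"] == "cot" for i in cluster)

    # Size first, CoT membership as the tie-breaker
    best = max(clusters, key=lambda c: (len(c), has_cot(c)))
    pick = next((i for i in best if responses[i]["strategy"] == "cot"), best[0])
    return {
        "winner": {**responses[pick], "vote_count": len(best)},
        "voting_details": {"clusters": clusters, "threshold": threshold,
                           "total_responses": len(responses)},
    }


responses = [{"strategy": "standard", "response": "Paris"},
             {"strategy": "cot", "response": "The capital is Paris"},
             {"strategy": "tot", "response": "Lyon"}]
emb = [[1.0, 0.0], [0.95, 0.1], [0.0, 1.0]]  # toy 2-D "embeddings"
result = majority_vote(responses, emb)
print(result["winner"]["strategy"], result["winner"]["vote_count"])  # cot 2
```

Greedy seeding is order-dependent: a different response order can yield different clusters, which is one reason a deterministic tie-break (CoT) is useful.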
RAGReasoningEnsemble wraps the base ReasoningEnsemble from agent-reasoning and adds three capabilities: RAG context retrieval from Oracle AI Database, database event logging via OraDBEventLogger, and a streaming execution trace protocol for real-time UI updates.
```python
from agent_reasoning import ReasoningEnsemble
from agent_reasoning.agents import AGENT_MAP


class RAGReasoningEnsemble:
    def __init__(self,
                 model_name: str = "gemma3:270m",
                 vector_store=None,        # OraDBVectorStore
                 event_logger=None,        # OraDBEventLogger
                 similarity_threshold=0.85):
        self.ensemble = ReasoningEnsemble(  # Composition, not inheritance
            model_name=model_name,
            similarity_threshold=similarity_threshold
        )
        self.vector_store = vector_store
        self.event_logger = event_logger
```
```
Query → _retrieve_context(query, collection)
      → collection_map: {
            "PDF": "pdf_collection",
            "Web": "web_collection",
            "Repository": "repo_collection",
            "General": "general_knowledge"
        }
      → vector_store.query(query, collection_name, n_results=5)
      → Returns {
            chunks: [{content, metadata, score}, ...],
            sources: ["file.pdf", "page.html", ...],
            avg_score: float
        }
```
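A minimal sketch of that retrieval step. The collection map is taken from the flow above; FakeVectorStore is a hypothetical stub, and the sketch assumes vector_store.query() returns a list of dicts with content, metadata (carrying a "source" key) and score, which may differ from OraDBVectorStore's actual return type. Falling back to general_knowledge for unknown collection names is also an assumption.

```python
from typing import Dict, List

COLLECTION_MAP = {
    "PDF": "pdf_collection",
    "Web": "web_collection",
    "Repository": "repo_collection",
    "General": "general_knowledge",
}


class FakeVectorStore:
    """Hypothetical stub standing in for OraDBVectorStore."""

    def query(self, query: str, collection_name: str, n_results: int = 5) -> List[Dict]:
        return [
            {"content": "ML learns from data.", "metadata": {"source": "file.pdf"}, "score": 0.9},
            {"content": "Models generalize.", "metadata": {"source": "page.html"}, "score": 0.7},
            {"content": "More on ML.", "metadata": {"source": "file.pdf"}, "score": 0.8},
        ][:n_results]


def retrieve_context(vector_store, query: str, collection: str = "General") -> Dict:
    name = COLLECTION_MAP.get(collection, "general_knowledge")
    chunks = vector_store.query(query, name, n_results=5)
    # Unique sources, preserving first-seen order
    sources = list(dict.fromkeys(c["metadata"].get("source", "unknown") for c in chunks))
    avg = sum(c["score"] for c in chunks) / len(chunks) if chunks else 0.0
    return {"chunks": chunks, "sources": sources, "avg_score": avg}


ctx = retrieve_context(FakeVectorStore(), "What is ML?", "PDF")
print(ctx["sources"])  # ['file.pdf', 'page.html']
```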
```
_build_augmented_prompt(query, context):
  → """Use the following context to answer the question.
     If the context doesn't contain relevant information,
     use your general knowledge.
     Context:
     [Source: file.pdf]
     chunk text here...
     [Source: page.html]
     another chunk...
     Question: {query}
     Answer:"""
```
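The template above can be produced by a short function. This sketch assumes the context shape from the retrieval step (chunks carrying "content" and metadata["source"]); returning the bare query when no chunks were retrieved, and the exact blank-line spacing, are assumptions rather than confirmed behavior.

```python
from typing import Dict, Optional


def build_augmented_prompt(query: str, context: Optional[Dict]) -> str:
    """Prefix the question with source-tagged chunks; bare query if no context."""
    if not context or not context.get("chunks"):
        return query
    blocks = [
        f"[Source: {c['metadata'].get('source', 'unknown')}]\n{c['content']}"
        for c in context["chunks"]
    ]
    return (
        "Use the following context to answer the question.\n"
        "If the context doesn't contain relevant information,\n"
        "use your general knowledge.\n\n"
        "Context:\n" + "\n\n".join(blocks)
        + f"\n\nQuestion: {query}\n\nAnswer:"
    )


ctx = {"chunks": [{"content": "chunk text here...", "metadata": {"source": "file.pdf"}},
                  {"content": "another chunk...", "metadata": {"source": "page.html"}}]}
print(build_augmented_prompt("What is ML?", ctx))
```

The fallback instruction ("use your general knowledge") lets the model still answer when retrieval returns weakly related chunks, at the cost of less grounded answers.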
```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class ExecutionEvent:
    timestamp: str            # "%H:%M:%S"
    event_type: str           # see table below
    message: str
    data: Optional[Dict] = None


@dataclass
class ReasoningResult:
    winner: Dict[str, Any]
    all_responses: List[Dict]
    execution_trace: List[ExecutionEvent]
    rag_context: Optional[Dict]
    total_duration_ms: float
    voting_details: Optional[Dict]
```
| Event Type | Description | Data Payload |
|---|---|---|
| start | Query begins processing | Strategy count |
| rag | RAG context retrieved from Oracle | {chunks, score} |
| strategy_start | Individual strategy begins execution | Strategy icon + name |
| strategy_complete | Strategy finished with timing | {duration_ms} |
| voting | Majority voting / winner selection | Winner + vote count |
| complete | Full ensemble processing complete | Total duration |
| result | Final result (streaming mode only) | ReasoningResult |
```python
STRATEGY_ICONS = {
    "standard": "📝", "cot": "🔗",
    "tot": "🌳", "react": "🛠️",
    "self_reflection": "🪞", "consistency": "🔄",
    "decomposed": "🧩", "least_to_most": "📈",
    "recursive": "🔁",
}
```
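To show how the event types in the table and STRATEGY_ICONS fit together, here is a hedged sketch of a streaming trace. It re-declares ExecutionEvent so it runs standalone, replaces real strategy execution with a caller-supplied run_strategy function (hypothetical), runs strategies sequentially for simplicity (the ensemble runs them in parallel), picks a placeholder winner instead of voting, and omits the rag and result events. Message wording is illustrative.

```python
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Dict, Iterator, List, Optional

STRATEGY_ICONS = {"standard": "📝", "cot": "🔗", "tot": "🌳"}  # subset for brevity


@dataclass
class ExecutionEvent:
    timestamp: str
    event_type: str
    message: str
    data: Optional[Dict] = None


def _event(event_type: str, message: str, data: Optional[Dict] = None) -> ExecutionEvent:
    return ExecutionEvent(datetime.now().strftime("%H:%M:%S"), event_type, message, data)


def trace(query: str, strategies: List[str],
          run_strategy: Callable[[str, str], str]) -> Iterator[ExecutionEvent]:
    """Yield trace events in table order; sequential here, parallel in the ensemble."""
    t0 = time.perf_counter()
    yield _event("start", f"Processing with {len(strategies)} strategies",
                 {"strategy_count": len(strategies)})
    responses = []
    for s in strategies:
        yield _event("strategy_start", f"{STRATEGY_ICONS.get(s, '•')} {s}")
        t = time.perf_counter()
        responses.append({"strategy": s, "response": run_strategy(s, query)})
        ms = (time.perf_counter() - t) * 1000
        yield _event("strategy_complete", f"{s} finished", {"duration_ms": ms})
    winner = responses[0]  # placeholder: the real ensemble uses majority voting
    yield _event("voting", f"Winner: {winner['strategy']}", {"winner": winner})
    total = (time.perf_counter() - t0) * 1000
    yield _event("complete", "Ensemble complete", {"total_duration_ms": total})


for ev in trace("What is ML?", ["cot", "tot"], lambda s, q: f"{s} says hi"):
    print(ev.timestamp, ev.event_type, ev.message)
```

Making trace() a generator is what lets a UI render each event as soon as it happens instead of waiting for the final ReasoningResult.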
All reasoning events are persisted to Oracle AI Database. The logger maintains 6 event tables, with REASONING_EVENTS specifically tracking ensemble executions including strategy selections, vote counts, durations, and RAG context metadata.
```sql
CREATE TABLE REASONING_EVENTS (
    event_id              VARCHAR2(100) PRIMARY KEY,
    timestamp             TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    query_text            CLOB,
    strategies_requested  CLOB,            -- JSON array
    winner_strategy       VARCHAR2(50),
    winner_response       CLOB,
    vote_count            NUMBER,
    total_strategies      NUMBER,
    all_responses         CLOB,            -- JSON array
    rag_enabled           NUMBER(1),
    collection_used       VARCHAR2(200),
    chunks_retrieved      NUMBER,
    total_duration_ms     NUMBER,
    parallel_execution    NUMBER(1),       -- always 1 for ensemble
    config_json           CLOB,
    status                VARCHAR2(50),
    error_message         CLOB
);
```
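Several REASONING_EVENTS columns hold JSON text in CLOBs and 0/1 flags in NUMBER(1). Below is a hedged sketch of mapping an ensemble result to bind variables. to_reasoning_event_row is a hypothetical helper, the input keys follow the ReasoningResult fields described earlier, the "success"/"error" status values are assumptions, and the INSERT uses python-oracledb-style named binds (cursor.execute(INSERT_SQL, row)) but is not executed here; the timestamp column is left to its DEFAULT.

```python
import json
import uuid
from typing import Dict, List, Optional

INSERT_SQL = """
INSERT INTO REASONING_EVENTS (
    event_id, query_text, strategies_requested, winner_strategy,
    winner_response, vote_count, total_strategies, all_responses,
    rag_enabled, collection_used, chunks_retrieved, total_duration_ms,
    parallel_execution, config_json, status, error_message
) VALUES (
    :event_id, :query_text, :strategies_requested, :winner_strategy,
    :winner_response, :vote_count, :total_strategies, :all_responses,
    :rag_enabled, :collection_used, :chunks_retrieved, :total_duration_ms,
    :parallel_execution, :config_json, :status, :error_message
)"""


def to_reasoning_event_row(query: str, strategies: List[str], result: Dict,
                           collection: Optional[str] = None,
                           config: Optional[Dict] = None,
                           error: Optional[str] = None) -> Dict:
    """Hypothetical mapper from an ensemble result to REASONING_EVENTS binds."""
    winner = result.get("winner") or {}
    rag = result.get("rag_context") or {}
    return {
        "event_id": str(uuid.uuid4()),
        "query_text": query,
        "strategies_requested": json.dumps(strategies),       # JSON array in CLOB
        "winner_strategy": winner.get("strategy"),
        "winner_response": winner.get("response"),
        "vote_count": winner.get("vote_count"),
        "total_strategies": len(strategies),
        "all_responses": json.dumps(result.get("all_responses", [])),
        "rag_enabled": 1 if rag else 0,                        # NUMBER(1) flag
        "collection_used": collection,
        "chunks_retrieved": len(rag.get("chunks", [])),
        "total_duration_ms": result.get("total_duration_ms"),
        "parallel_execution": 1,                               # always 1 per the schema
        "config_json": json.dumps(config or {}),
        "status": "error" if error else "success",             # assumed status values
        "error_message": error,
    }
```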
The OpenAI-compatible /v1/models endpoint presents each reasoning strategy (and its RAG variant)
as a selectable "model" in Open WebUI. This means 9 base strategies x 2 (with/without RAG) = 18 model IDs.
The REASONING_MODELS registry maps each model ID to its strategy key and RAG flag.
| Model ID | Strategy | RAG | Display Name |
|---|---|---|---|
| standard | standard | -- | Standard |
| standard-rag | standard | RAG | Standard + RAG |
| cot | cot | -- | Chain of Thought |
| cot-rag | cot | RAG | Chain of Thought + RAG |
| tot | tot | -- | Tree of Thoughts |
| tot-rag | tot | RAG | Tree of Thoughts + RAG |
| react | react | -- | ReAct |
| react-rag | react | RAG | ReAct + RAG |
| self-reflection | self_reflection | -- | Self-Reflection |
| self-reflection-rag | self_reflection | RAG | Self-Reflection + RAG |
| consistency | consistency | -- | Self-Consistency |
| consistency-rag | consistency | RAG | Self-Consistency + RAG |
| decomposed | decomposed | -- | Decomposed |
| decomposed-rag | decomposed | RAG | Decomposed + RAG |
| least-to-most | least_to_most | -- | Least-to-Most |
| least-to-most-rag | least_to_most | RAG | Least-to-Most + RAG |
| recursive | recursive | -- | Recursive |
| recursive-rag | recursive | RAG | Recursive + RAG |
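The 18 rows follow a mechanical pattern, so the registry can be generated rather than written by hand. In this sketch REASONING_MODELS is built from the strategy and display-name pairs in the table; its field names, parse_model_id, and the owned_by value are assumptions, while the list envelope (object "list", entries with id, object "model", created, owned_by) follows the OpenAI /v1/models response format.

```python
import time
from typing import Dict, List, Tuple

BASE = [("standard", "Standard"), ("cot", "Chain of Thought"),
        ("tot", "Tree of Thoughts"), ("react", "ReAct"),
        ("self_reflection", "Self-Reflection"), ("consistency", "Self-Consistency"),
        ("decomposed", "Decomposed"), ("least_to_most", "Least-to-Most"),
        ("recursive", "Recursive")]

REASONING_MODELS: Dict[str, Dict] = {}
for strategy, display in BASE:
    model_id = strategy.replace("_", "-")  # model IDs use hyphens, strategy keys underscores
    REASONING_MODELS[model_id] = {"strategy": strategy, "rag": False, "name": display}
    REASONING_MODELS[f"{model_id}-rag"] = {"strategy": strategy, "rag": True,
                                           "name": f"{display} + RAG"}


def parse_model_id(model_id: str) -> Tuple[str, bool]:
    entry = REASONING_MODELS[model_id]
    return entry["strategy"], entry["rag"]


def list_models() -> Dict:
    now = int(time.time())
    data: List[Dict] = [{"id": mid, "object": "model", "created": now,
                         "owned_by": "agent-reasoning"}  # assumed owner label
                        for mid in REASONING_MODELS]
    return {"object": "list", "data": data}


print(len(REASONING_MODELS), parse_model_id("self-reflection-rag"))
# 18 ('self_reflection', True)
```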
The A2A (Agent-to-Agent) handler processes JSON-RPC 2.0 requests on POST /a2a.
Reasoning methods are routed through the RAGReasoningEnsemble for strategy execution,
while document and agent discovery methods operate through the standard A2A handler.
```jsonc
// Request
{
  "jsonrpc": "2.0",
  "method": "reasoning.execute",
  "params": {
    "query": "What is machine learning?",
    "strategy": "cot-rag",
    "collection": "PDF"
  },
  "id": 1
}

// Additional reasoning methods:
//   reasoning.strategy - Execute a specific strategy
//   reasoning.list     - List available strategies
// Other A2A methods:
//   document.query     - Query documents via RAG
//   document.upload    - Upload documents
//   agent.discover     - Discover agent capabilities
//   agent.register     - Register new agents
```
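A minimal sketch of the routing split described above, assuming a plain dict-in, dict-out handler: reasoning.* methods go to one handler table (standing in for RAGReasoningEnsemble), everything else to the standard A2A table, and unknown methods get the JSON-RPC 2.0 "Method not found" error (code -32601). make_dispatcher and the lambda handlers are hypothetical; notifications and batch requests are not handled.

```python
from typing import Any, Callable, Dict

JSONRPC_INVALID_REQUEST = -32600
JSONRPC_METHOD_NOT_FOUND = -32601


def make_dispatcher(reasoning: Dict[str, Callable[[Dict], Any]],
                    a2a: Dict[str, Callable[[Dict], Any]]):
    def handle(request: Dict) -> Dict:
        req_id = request.get("id")
        if request.get("jsonrpc") != "2.0" or "method" not in request:
            return {"jsonrpc": "2.0", "id": req_id,
                    "error": {"code": JSONRPC_INVALID_REQUEST, "message": "Invalid Request"}}
        method = request["method"]
        # reasoning.* → ensemble handlers; everything else → standard A2A handlers
        table = reasoning if method.startswith("reasoning.") else a2a
        fn = table.get(method)
        if fn is None:
            return {"jsonrpc": "2.0", "id": req_id,
                    "error": {"code": JSONRPC_METHOD_NOT_FOUND, "message": "Method not found"}}
        return {"jsonrpc": "2.0", "id": req_id, "result": fn(request.get("params", {}))}

    return handle


handle = make_dispatcher(
    reasoning={"reasoning.execute": lambda p: {"strategy": p["strategy"], "answer": "..."},
               "reasoning.list": lambda p: ["cot", "cot-rag"]},
    a2a={"agent.discover": lambda p: {"agents": []}},
)
print(handle({"jsonrpc": "2.0", "method": "reasoning.list", "id": 2}))
# {'jsonrpc': '2.0', 'id': 2, 'result': ['cot', 'cot-rag']}
```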
Independent from the reasoning ensemble, the Agent Factory provides a 4-stage Chain-of-Thought
pipeline using LangChain-based agents. These use the LocalLLM wrapper (which calls Ollama)
and are activated when use_cot=True is passed to LocalRAGAgent.
```python
def create_agents(llm, vector_store=None):
    """Create the set of specialized CoT agents."""
    return {
        "planner": PlannerAgent(llm),                    # Query → 3-4 plan steps
        "researcher": ResearchAgent(llm, vector_store),  # Steps → Source research
        "reasoner": ReasoningAgent(llm),                 # Findings → Conclusions
        "synthesizer": SynthesisAgent(llm),              # Steps → Final answer
    }

# Each agent uses LangChain ChatPromptTemplate
# and the LocalLLM wrapper that calls OllamaModelHandler
# Pipeline: Plan → Research → Reason → Synthesize
```
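The Plan → Research → Reason → Synthesize flow can be sketched without LangChain to show how data passes between stages. Here llm is any hypothetical prompt-to-text callable and search stands in for the vector store; the prompts and the step parsing are illustrative only, whereas the real agents use ChatPromptTemplate and the LocalLLM wrapper.

```python
from typing import Callable, Dict, List

LLM = Callable[[str], str]


def plan(llm: LLM, query: str) -> List[str]:
    text = llm(f"Break this question into 3-4 numbered steps:\n{query}")
    # Keep non-empty lines and strip leading "1." / "-" style markers
    return [ln.lstrip("0123456789.-) ").strip() for ln in text.splitlines() if ln.strip()]


def research(search: Callable[[str], List[str]], steps: List[str]) -> Dict[str, List[str]]:
    # One retrieval per plan step
    return {step: search(step) for step in steps}


def reason(llm: LLM, findings: Dict[str, List[str]]) -> List[str]:
    return [llm(f"Step: {s}\nEvidence: {' | '.join(ev)}\nConclusion:")
            for s, ev in findings.items()]


def synthesize(llm: LLM, query: str, conclusions: List[str]) -> str:
    joined = "\n".join(conclusions)
    return llm(f"Question: {query}\nConclusions:\n{joined}\nFinal answer:")


def cot_pipeline(llm: LLM, search: Callable[[str], List[str]], query: str) -> str:
    steps = plan(llm, query)
    return synthesize(llm, query, reason(llm, research(search, steps)))
```

Each stage is a separate LLM call, so the pipeline trades latency (roughly two calls plus one per plan step) for more structured intermediate reasoning.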
LocalRAGAgent provides two distinct processing paths based on the use_cot flag.
Both paths use the same underlying OraDBVectorStore for retrieval, but the reasoning
pipeline differs significantly.
RAGReasoningEnsemble wraps the library through composition (self.ensemble = ReasoningEnsemble(...)), not subclassing.
This keeps the library layer clean and independently testable.
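Because each agent's run() is synchronous, the async ensemble executes the strategies on its thread pool via loop.run_in_executor().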
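A minimal sketch of running blocking agents concurrently from an async method, assuming each strategy exposes a synchronous run(query) and the ensemble owns a ThreadPoolExecutor(max_workers=10). SlowAgent and run_parallel are hypothetical; only standard asyncio and concurrent.futures APIs are used.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class SlowAgent:
    """Hypothetical blocking agent standing in for a BaseAgent subclass."""

    def __init__(self, name: str, delay: float):
        self.name, self.delay = name, delay

    def run(self, query: str) -> str:
        time.sleep(self.delay)  # simulates a blocking Ollama call
        return f"{self.name}: {query}"


async def run_parallel(agents: List[SlowAgent], query: str,
                       executor: ThreadPoolExecutor) -> List[Dict]:
    loop = asyncio.get_running_loop()

    async def timed(agent: SlowAgent) -> Dict:
        start = time.perf_counter()
        # run_in_executor moves the blocking call off the event-loop thread
        response = await loop.run_in_executor(executor, agent.run, query)
        return {"strategy": agent.name, "response": response,
                "duration_ms": (time.perf_counter() - start) * 1000}

    # gather preserves input order in its result list
    return await asyncio.gather(*(timed(a) for a in agents))


executor = ThreadPoolExecutor(max_workers=10)
agents = [SlowAgent("cot", 0.2), SlowAgent("tot", 0.2), SlowAgent("react", 0.2)]
t0 = time.perf_counter()
results = asyncio.run(run_parallel(agents, "What is ML?", executor))
elapsed = time.perf_counter() - t0
executor.shutdown(wait=True)
print([r["strategy"] for r in results], f"{elapsed:.2f}s")
```

Since the three sleeps overlap on separate worker threads, the elapsed time lands near 0.2 s rather than the 0.6 s a sequential loop would take; the same effect is what keeps total ensemble latency close to that of the slowest strategy.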