Python API¶
NeuralMemory provides a comprehensive async Python API.
Quick Start¶
import asyncio
from neural_memory import Brain, BrainConfig
from neural_memory.storage import InMemoryStorage
from neural_memory.engine.encoder import MemoryEncoder
from neural_memory.engine.retrieval import ReflexPipeline
async def main():
# Create storage and brain
storage = InMemoryStorage()
brain = Brain.create("my-brain")
await storage.save_brain(brain)
storage.set_brain(brain.id)
# Encode memories
encoder = MemoryEncoder(storage, brain.config)
await encoder.encode("Met Alice to discuss API design")
await encoder.encode("Decided to use FastAPI for backend")
# Query memories
pipeline = ReflexPipeline(storage, brain.config)
result = await pipeline.query("What was decided?")
print(f"Answer: {result.context}")
print(f"Confidence: {result.confidence}")
asyncio.run(main())
Core Classes¶
Brain¶
The main container for a memory system.
from neural_memory import Brain, BrainConfig
# Create with default config
brain = Brain.create("my-brain")
# Create with custom config
config = BrainConfig(
decay_rate=0.1,
reinforcement_delta=0.05,
activation_threshold=0.2,
max_spread_hops=4,
max_context_tokens=1500
)
brain = Brain.create("my-brain", config=config)
# Access properties
print(brain.id)
print(brain.name)
print(brain.config)
BrainConfig¶
Configuration for brain behavior.
from neural_memory import BrainConfig
config = BrainConfig(
decay_rate=0.1, # How fast memories fade
reinforcement_delta=0.05, # Strength gain on access
activation_threshold=0.2, # Minimum activation
max_spread_hops=4, # Max traversal depth
max_context_tokens=1500 # Max response tokens
)
Neuron¶
Atomic unit of information.
from neural_memory import Neuron, NeuronType
neuron = Neuron(
id="neuron-123",
type=NeuronType.ENTITY,
content="Alice",
metadata={"role": "developer"}
)
Neuron Types:
NeuronType.TIME- Temporal referencesNeuronType.SPATIAL- LocationsNeuronType.ENTITY- People, thingsNeuronType.ACTION- ActivitiesNeuronType.STATE- ConditionsNeuronType.CONCEPT- Ideas, topicsNeuronType.SENSORY- ObservationsNeuronType.INTENT- Goals
Synapse¶
Connection between neurons.
from neural_memory import Synapse, SynapseType, Direction
synapse = Synapse(
id="synapse-456",
source_id="neuron-a",
target_id="neuron-b",
type=SynapseType.CAUSED_BY,
weight=0.8,
direction=Direction.UNIDIRECTIONAL
)
Synapse Types:
Temporal: HAPPENED_AT, BEFORE, AFTER, DURING
Causal: CAUSED_BY, LEADS_TO, ENABLES, PREVENTS
Associative: CO_OCCURS, RELATED_TO, SIMILAR_TO
Semantic: IS_A, HAS_PROPERTY, INVOLVES
Fiber¶
A signal pathway through the neural graph (a memory).
from neural_memory import Fiber
# Create with factory method
fiber = Fiber.create(
neuron_ids={"n1", "n2", "n3"},
synapse_ids={"s1", "s2"},
anchor_neuron_id="n1",
pathway=["n1", "n2", "n3"], # Ordered signal pathway
)
# Fiber properties
fiber.conductivity # 0.0-1.0, signal transmission quality
fiber.pathway # Ordered neuron sequence
fiber.pathway_length # Number of neurons in pathway
fiber.last_conducted # When last activated (datetime or None)
# Immutable operations (return new Fiber)
fiber = fiber.with_conductivity(0.8)
fiber = fiber.conduct(reinforce=True) # +0.02 conductivity
fiber = fiber.conduct(conducted_at=now) # Set activation time
fiber = fiber.with_salience(0.9)
fiber = fiber.add_tags("important", "reviewed")
# Pathway queries
fiber.pathway_position("n2") # Returns 1 (index in pathway)
fiber.is_in_pathway("n2") # Returns True
Storage Backends¶
InMemoryStorage¶
For testing and development.
from neural_memory.storage import InMemoryStorage
storage = InMemoryStorage()
# Operations
await storage.add_neuron(neuron)
await storage.get_neuron("neuron-id")
await storage.find_neurons(type=NeuronType.ENTITY)
await storage.add_synapse(synapse)
await storage.get_synapses(source_id="neuron-a")
await storage.add_fiber(fiber)
await storage.get_fiber("fiber-id")
SQLiteStorage¶
For persistent single-user storage.
from neural_memory.storage import SQLiteStorage
storage = SQLiteStorage("./brain.db")
await storage.initialize()
brain = Brain.create("my-brain")
await storage.save_brain(brain)
storage.set_brain(brain.id)
# Use same API as InMemoryStorage
await storage.add_neuron(neuron)
SharedStorage¶
For remote server connection.
from neural_memory.storage import SharedStorage
async with SharedStorage("http://localhost:8000", "brain-id") as storage:
neurons = await storage.find_neurons()
await storage.add_neuron(neuron)
Engine Components¶
MemoryEncoder¶
Encodes text into neurons and synapses.
from neural_memory.engine.encoder import MemoryEncoder
encoder = MemoryEncoder(storage, brain.config)
# Encode text
result = await encoder.encode("Met Alice at coffee shop")
print(f"Fiber: {result.fiber.id}")
print(f"Neurons created: {len(result.neurons_created)}")
print(f"Synapses created: {len(result.synapses_created)}")
ReflexPipeline¶
Query memories using spreading activation.
from neural_memory.engine.retrieval import ReflexPipeline, DepthLevel
# Hybrid mode (default) - reflex trail + classic BFS discovery
pipeline = ReflexPipeline(storage, brain.config, use_reflex=True)
# Classic mode - distance-based activation only (pre-v0.6.0 behavior)
pipeline = ReflexPipeline(storage, brain.config, use_reflex=False)
# Basic query
result = await pipeline.query("What did Alice say?")
# Query with depth
result = await pipeline.query(
"Why did we choose PostgreSQL?",
depth=DepthLevel.DEEP,
max_tokens=1000
)
print(f"Context: {result.context}")
print(f"Confidence: {result.confidence}")
print(f"Neurons activated: {result.neurons_activated}")
print(f"Depth used: {result.depth_used}")
print(f"Co-activations: {len(result.co_activations)}")
Depth Levels:
DepthLevel.INSTANT- 1 hop, direct answersDepthLevel.CONTEXT- 2-3 hops, surrounding contextDepthLevel.HABIT- 4+ hops, patternsDepthLevel.DEEP- Full traversal, causal chains
ReflexActivation ¶
Trail-based activation engine (v0.6.0+).
from neural_memory.engine.reflex_activation import ReflexActivation
reflex = ReflexActivation(storage, brain.config)
# Activate along fiber pathways
results = await reflex.activate_trail(
anchor_neurons=["time_neuron_1", "entity_neuron_2"],
fibers=fibers,
reference_time=datetime.utcnow(),
)
# results: dict[str, ActivationResult]
for neuron_id, result in results.items():
print(f"{neuron_id}: level={result.activation_level}, hops={result.hop_distance}")
# Combined activation with co-activation detection
activations, co_activations = await reflex.activate_with_co_binding(
anchor_sets=[["time_1"], ["entity_1"]],
fibers=fibers,
)
CoActivation ¶
Represents neurons that co-fired from multiple anchor sets (Hebbian binding).
from neural_memory.engine.reflex_activation import CoActivation
# Created by ReflexActivation.find_co_activated()
co = CoActivation(
neuron_ids=frozenset(["n1", "n2"]),
temporal_window_ms=500,
co_fire_count=3, # Activated by 3 anchor sets
binding_strength=1.0, # 3/3 = perfect co-activation
source_anchors=["time_1", "entity_1", "concept_1"],
)
DecayManager¶
Apply memory decay (Ebbinghaus curve).
from neural_memory.engine.lifecycle import DecayManager
manager = DecayManager(
decay_rate=0.1,
prune_threshold=0.01,
min_age_days=1.0
)
# Apply decay
report = await manager.apply_decay(storage)
print(f"Neurons decayed: {report.neurons_decayed}")
print(f"Synapses decayed: {report.synapses_decayed}")
print(f"Neurons pruned: {report.neurons_pruned}")
# Dry run
report = await manager.apply_decay(storage, dry_run=True)
ReinforcementManager¶
Strengthen frequently accessed paths.
from neural_memory.engine.lifecycle import ReinforcementManager
manager = ReinforcementManager(reinforcement_delta=0.05)
await manager.reinforce(
storage,
activated_neurons=["n1", "n2"],
activated_synapses=["s1"]
)
DBTrainer¶
Train a brain from database schema knowledge (v1.6.0+).
from neural_memory.engine.db_trainer import DBTrainer, DBTrainingConfig
# Configure training
config = DBTrainingConfig(
connection_string="sqlite:///data/ecommerce.db",
domain_tag="ecommerce",
max_tables=100,
consolidate=True,
)
# Train
trainer = DBTrainer(storage, brain.config)
result = await trainer.train(config)
print(f"Tables: {result.tables_processed}")
print(f"Relationships: {result.relationships_mapped}")
print(f"Patterns: {result.patterns_detected}")
print(f"Fingerprint: {result.schema_fingerprint}")
DBTrainingConfig options:
| Parameter | Type | Default | Description |
|---|---|---|---|
connection_string |
str | required | Database URI (e.g., sqlite:///path/to/db) |
domain_tag |
str | "" |
Tag applied to all schema knowledge |
brain_name |
str | "" |
Target brain (empty = current) |
consolidate |
bool | True |
Run ENRICH consolidation after |
salience_ceiling |
float | 0.5 |
Cap initial fiber salience |
initial_stage |
str | "episodic" |
Maturation stage |
include_patterns |
bool | True |
Detect schema patterns |
include_relationships |
bool | True |
Create FK-based synapses |
max_tables |
int | 100 |
Maximum tables to process |
Schema introspection (advanced):
from neural_memory.engine.db_introspector import SchemaIntrospector
introspector = SchemaIntrospector()
snapshot = await introspector.introspect("sqlite:///data/app.db")
for table in snapshot.tables:
print(f"{table.name}: {len(table.columns)} cols, {len(table.foreign_keys)} FKs")
print(f"Fingerprint: {snapshot.schema_fingerprint}")
ContextOptimizer¶
Smart context selection with composite scoring (v2.6.0+).
from neural_memory.engine.context_optimizer import (
optimize_context,
compute_composite_score,
ContextItem,
ContextPlan,
)
# Optimize fibers for context injection
plan = await optimize_context(storage, fibers, max_tokens=4000)
print(f"Items: {len(plan.items)}")
print(f"Tokens used: {plan.total_tokens}")
print(f"Dropped: {plan.dropped_count}")
for item in plan.items:
print(f" {item.fiber_id}: score={item.score:.3f}, tokens={item.token_count}")
Composite Score Weights:
| Factor | Weight | Description |
|---|---|---|
| Activation | 0.30 | Neuron activation level |
| Priority | 0.25 | TypedMemory priority (0-10) |
| Frequency | 0.20 | Fiber access count (capped at 20) |
| Conductivity | 0.15 | Fiber signal quality (0.0-1.0) |
| Freshness | 0.10 | Creation recency score |
QueryPatternMining¶
Learn topic co-occurrence patterns from recall history (v2.6.0+).
from neural_memory.engine.query_pattern_mining import (
extract_topics,
mine_query_topic_pairs,
learn_query_patterns,
suggest_follow_up_queries,
)
# Extract topics from a query
topics = extract_topics("authentication jwt tokens")
# → ["authentication", "jwt", "tokens"]
# Mine patterns from recall events (called during consolidation)
report = await learn_query_patterns(storage, brain.config, utcnow())
print(f"Topics: {report.topics_extracted}")
print(f"Patterns: {report.patterns_learned}")
# Get follow-up suggestions based on learned patterns
suggestions = await suggest_follow_up_queries(storage, topics, brain.config)
# → ["middleware", "routing", "express"] (if learned)
Alert¶
Proactive brain health alert (v2.6.0+).
from neural_memory.core.alert import Alert, AlertType, AlertStatus
# Alerts are created automatically by the health pulse system
# Manage them via storage methods:
# List active alerts
alerts = await storage.get_active_alerts(limit=50)
for alert in alerts:
print(f"[{alert.severity}] {alert.alert_type}: {alert.message}")
# Mark as seen
await storage.mark_alerts_seen([alert.id for alert in alerts])
# Acknowledge (prevents auto-resolution)
await storage.mark_alert_acknowledged(alert_id)
# Auto-resolve by type
await storage.resolve_alerts_by_type(["high_neuron_count"])
Alert Types: high_neuron_count, high_fiber_count, high_synapse_count, low_connectivity, high_orphan_ratio, expired_memories, stale_fibers
Alert Statuses: active → seen → acknowledged → resolved
Typed Memories¶
from neural_memory.core import TypedMemory, MemoryType, Priority
# Create typed memory
memory = TypedMemory.create(
fiber_id="fiber-123",
memory_type=MemoryType.DECISION,
priority=Priority.HIGH,
source="user_input",
expires_in_days=30,
tags={"project", "auth"}
)
await storage.add_typed_memory(memory)
# Query by type
decisions = await storage.find_typed_memories(
memory_type=MemoryType.DECISION,
min_priority=Priority.NORMAL
)
Memory Types:
MemoryType.FACTMemoryType.DECISIONMemoryType.PREFERENCEMemoryType.TODOMemoryType.INSIGHTMemoryType.CONTEXTMemoryType.INSTRUCTIONMemoryType.ERRORMemoryType.WORKFLOWMemoryType.REFERENCE
Projects¶
from neural_memory.core import Project
# Create project
project = Project.create(
name="Q1 Sprint",
description="First quarter sprint",
duration_days=14,
tags={"sprint", "q1"}
)
await storage.add_project(project)
# Associate memories
memory = TypedMemory.create(
fiber_id="fiber-123",
memory_type=MemoryType.TODO,
project_id=project.id
)
# Query project memories
memories = await storage.get_project_memories(project.id)
Export & Import¶
from neural_memory.sharing import BrainExporter
exporter = BrainExporter()
# Export
snapshot = await exporter.export(storage, brain.id)
json_data = exporter.to_json(snapshot)
# Save to file
with open("brain.json", "w") as f:
f.write(json_data)
# Import
from neural_memory.sharing import BrainImporter
importer = BrainImporter()
await importer.import_brain(storage, snapshot, "new-brain-id")
Complete Example¶
import asyncio
from neural_memory import Brain, BrainConfig
from neural_memory.storage import SQLiteStorage
from neural_memory.engine.encoder import MemoryEncoder
from neural_memory.engine.retrieval import ReflexPipeline, DepthLevel
from neural_memory.engine.lifecycle import DecayManager
async def main():
# Setup
storage = SQLiteStorage("./memories.db")
await storage.initialize()
brain = Brain.create("work", config=BrainConfig(
max_context_tokens=2000
))
await storage.save_brain(brain)
storage.set_brain(brain.id)
encoder = MemoryEncoder(storage, brain.config)
pipeline = ReflexPipeline(storage, brain.config)
# Store memories
await encoder.encode("Met with team to discuss architecture")
await encoder.encode("DECISION: Use microservices. REASON: Better scaling")
await encoder.encode("TODO: Set up CI/CD pipeline")
# Query
result = await pipeline.query(
"What architecture decision was made?",
depth=DepthLevel.CONTEXT
)
print(f"Answer: {result.context}")
# Apply decay (for old memories)
decay_manager = DecayManager()
report = await decay_manager.apply_decay(storage, dry_run=True)
print(f"Would decay {report.neurons_decayed} neurons")
asyncio.run(main())