Good call - the server-side pattern is more robust. Here are several useful custom tools following that approach:
1. Task Management: store_task
from typing import Optional
from enum import Enum
class Priority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class Status(str, Enum):
TODO = "todo"
IN_PROGRESS = "in_progress"
BLOCKED = "blocked"
DONE = "done"
async def store_task(
self,
agent_state: AgentState,
actor: User,
task_id: str,
title: str,
priority: Priority,
status: Status = Status.TODO,
due_date: Optional[str] = None,
assigned_to: Optional[str] = None,
notes: Optional[str] = None
) -> str:
"""
Store or update a task in the agent's task list.
Args:
task_id: Unique identifier for the task
title: Brief description of the task
priority: Task priority (low, medium, high, urgent)
status: Current status (todo, in_progress, blocked, done)
due_date: Optional due date in ISO format (YYYY-MM-DD)
assigned_to: Optional person assigned to the task
notes: Optional additional context
"""
block_label = "tasks"
# Get existing tasks or create new structure
try:
existing_block = agent_state.memory.get_block(block_label)
tasks = json.loads(existing_block.value)
except (KeyError, json.JSONDecodeError):
tasks = {"tasks": {}}
# Update or add task
tasks["tasks"][task_id] = {
"title": title,
"priority": priority.value,
"status": status.value,
"due_date": due_date,
"assigned_to": assigned_to,
"notes": notes,
"updated_at": datetime.now().isoformat()
}
# Write to memory
json_content = json.dumps(tasks, indent=2)
try:
agent_state.memory.get_block(block_label)
agent_state.memory.update_block_value(label=block_label, value=json_content)
except KeyError:
from letta.schemas.block import Block
new_block = Block(
label=block_label,
value=json_content,
description="Task list with priorities, statuses, and assignments"
)
agent_state.memory.set_block(new_block)
await self.agent_manager.update_memory_if_changed_async(
agent_id=agent_state.id,
new_memory=agent_state.memory,
actor=actor
)
return f"Task '{title}' ({priority.value}) stored with status: {status.value}"
Use case: Project management agents, personal assistants, team coordination
2. Contact Tracking: store_contact_info
async def store_contact_info(
self,
agent_state: AgentState,
actor: User,
name: str,
relationship: str,
email: Optional[str] = None,
phone: Optional[str] = None,
company: Optional[str] = None,
last_interaction: Optional[str] = None,
notes: Optional[str] = None,
tags: Optional[str] = None # comma-separated
) -> str:
"""
Store or update contact information and relationship context.
Args:
name: Full name of the contact
relationship: Nature of relationship (colleague, client, friend, etc.)
email: Email address
phone: Phone number
company: Company/organization affiliation
last_interaction: Date of last interaction (ISO format)
notes: Additional context about the person
tags: Comma-separated tags (e.g., "technical,decision-maker")
"""
block_label = "contacts"
try:
existing_block = agent_state.memory.get_block(block_label)
contacts = json.loads(existing_block.value)
except (KeyError, json.JSONDecodeError):
contacts = {"contacts": {}}
contact_data = {
"relationship": relationship,
"email": email,
"phone": phone,
"company": company,
"last_interaction": last_interaction,
"notes": notes,
"tags": [tag.strip() for tag in tags.split(",")] if tags else [],
"updated_at": datetime.now().isoformat()
}
contacts["contacts"][name] = contact_data
json_content = json.dumps(contacts, indent=2)
try:
agent_state.memory.get_block(block_label)
agent_state.memory.update_block_value(label=block_label, value=json_content)
except KeyError:
from letta.schemas.block import Block
new_block = Block(
label=block_label,
value=json_content,
description="Contact information and relationship context"
)
agent_state.memory.set_block(new_block)
await self.agent_manager.update_memory_if_changed_async(
agent_id=agent_state.id,
new_memory=agent_state.memory,
actor=actor
)
return f"Contact info stored for {name} ({relationship})"
Use case: CRM agents, relationship managers, networking assistants
3. Meeting Notes: store_meeting_summary
async def store_meeting_summary(
self,
agent_state: AgentState,
actor: User,
meeting_date: str, # ISO format
title: str,
attendees: str, # comma-separated names
key_decisions: str,
action_items: str, # JSON array string: [{"owner": "name", "task": "description"}]
next_steps: Optional[str] = None
) -> str:
"""
Store meeting summary with decisions and action items.
Args:
meeting_date: Date of meeting (ISO format YYYY-MM-DD)
title: Meeting title or topic
attendees: Comma-separated list of attendee names
key_decisions: Major decisions made during the meeting
action_items: JSON array of action items with owners
next_steps: Optional follow-up actions or next meeting date
"""
block_label = "meetings"
try:
existing_block = agent_state.memory.get_block(block_label)
meetings = json.loads(existing_block.value)
except (KeyError, json.JSONDecodeError):
meetings = {"meetings": []}
# Parse action items
try:
parsed_action_items = json.loads(action_items)
except json.JSONDecodeError:
parsed_action_items = []
meeting_data = {
"date": meeting_date,
"title": title,
"attendees": [name.strip() for name in attendees.split(",")],
"key_decisions": key_decisions,
"action_items": parsed_action_items,
"next_steps": next_steps,
"recorded_at": datetime.now().isoformat()
}
meetings["meetings"].append(meeting_data)
json_content = json.dumps(meetings, indent=2)
try:
agent_state.memory.get_block(block_label)
agent_state.memory.update_block_value(label=block_label, value=json_content)
except KeyError:
from letta.schemas.block import Block
new_block = Block(
label=block_label,
value=json_content,
description="Meeting summaries with decisions and action items"
)
agent_state.memory.set_block(new_block)
await self.agent_manager.update_memory_if_changed_async(
agent_id=agent_state.id,
new_memory=agent_state.memory,
actor=actor
)
return f"Meeting summary stored: {title} on {meeting_date}"
Use case: Executive assistants, team coordination, project tracking
4. Preference Management: store_user_preference
from typing import Union
async def store_user_preference(
self,
agent_state: AgentState,
actor: User,
category: str,
preference_key: str,
preference_value: str,
notes: Optional[str] = None
) -> str:
"""
Store user preferences in structured format.
Args:
category: Preference category (e.g., "communication", "workflow", "ui")
preference_key: Specific preference identifier
preference_value: The preference value
notes: Optional context about why this preference exists
"""
block_label = "preferences"
try:
existing_block = agent_state.memory.get_block(block_label)
prefs = json.loads(existing_block.value)
except (KeyError, json.JSONDecodeError):
prefs = {"preferences": {}}
if category not in prefs["preferences"]:
prefs["preferences"][category] = {}
prefs["preferences"][category][preference_key] = {
"value": preference_value,
"notes": notes,
"updated_at": datetime.now().isoformat()
}
json_content = json.dumps(prefs, indent=2)
try:
agent_state.memory.get_block(block_label)
agent_state.memory.update_block_value(label=block_label, value=json_content)
except KeyError:
from letta.schemas.block import Block
new_block = Block(
label=block_label,
value=json_content,
description="User preferences organized by category"
)
agent_state.memory.set_block(new_block)
await self.agent_manager.update_memory_if_changed_async(
agent_id=agent_state.id,
new_memory=agent_state.memory,
actor=actor
)
return f"Preference stored: {category}.{preference_key} = {preference_value}"
Use case: Personalization agents, adaptive interfaces, user profiling
5. Time Series Data: store_metric
async def store_metric(
self,
agent_state: AgentState,
actor: User,
metric_name: str,
value: float,
unit: str,
timestamp: Optional[str] = None,
tags: Optional[str] = None # comma-separated
) -> str:
"""
Store time-series metric data.
Args:
metric_name: Name of the metric being tracked
value: Numeric value of the metric
unit: Unit of measurement (e.g., "USD", "hours", "count")
timestamp: Optional ISO timestamp (defaults to now)
tags: Optional comma-separated tags for filtering
"""
block_label = f"metrics_{metric_name.replace(' ', '_').lower()}"
if timestamp is None:
timestamp = datetime.now().isoformat()
try:
existing_block = agent_state.memory.get_block(block_label)
metrics = json.loads(existing_block.value)
except (KeyError, json.JSONDecodeError):
metrics = {
"metric_name": metric_name,
"unit": unit,
"data_points": []
}
data_point = {
"timestamp": timestamp,
"value": value,
"tags": [tag.strip() for tag in tags.split(",")] if tags else []
}
metrics["data_points"].append(data_point)
json_content = json.dumps(metrics, indent=2)
try:
agent_state.memory.get_block(block_label)
agent_state.memory.update_block_value(label=block_label, value=json_content)
except KeyError:
from letta.schemas.block import Block
new_block = Block(
label=block_label,
value=json_content,
description=f"Time series data for {metric_name} ({unit})"
)
agent_state.memory.set_block(new_block)
await self.agent_manager.update_memory_if_changed_async(
agent_id=agent_state.id,
new_memory=agent_state.memory,
actor=actor
)
return f"Metric recorded: {metric_name} = {value} {unit} at {timestamp}"
Use case: Analytics agents, performance tracking, habit monitoring
Common Patterns
All these tools follow the same structure:
- Typed parameters for schema enforcement
- Get or create memory block pattern
- Parse existing JSON if block exists
- Update data structure with new information
- Serialize to JSON with formatting
- Write to block using update or create
- Persist to database via agent_manager
- Return success message with key details
Benefits
Reliability: Schema violations caught at tool call time, not during JSON parsing
Maintainability: Each tool is self-contained and testable
Extensibility: Easy to add new structured data types
UI Integration: Predictable JSON structure makes UI rendering straightforward
Agent Friendliness: Simple tool signatures are easier for LLMs to use correctly than freeform JSON construction
These patterns should cover most structured data storage needs. Want me to explore any specific domain in more detail?