Dynamic tools and error handling

I have moved the imports inside the method bodies, also checked the check_environment log file and the venv does have the packages from requirements.txt. However, the other tools like download still fail.

Using a log file is cumbersome and time consuming - is there another way of directly viewing logs from a tool method, for example printing these out to the server console? This would make debugging a whole lot easier

Yes, there’s a much better way to see tool execution output in real-time.

Method 1: Watch Server Logs in Real-Time

Instead of checking log files, tail the Docker logs while your agent runs:

# In one terminal, watch logs
docker logs -f <container_id>

# In another terminal, run your agent code
python your_agent_script.py

Any print() statements or Python logging in your tools should appear immediately in the Docker logs.

Method 2: Add Print Statements to Your Tools

def download_data(
    url: str,
    output_path: str = "~/natural_earth",
    api_key: str = None,
    max_retries: int = 5,
    local_path: str = None,
) -> str:
    """Fetch a dataset from a URL into output_path, unpacking zip archives in place."""
    import os
    import zipfile
    import requests
    from time import sleep
    from urllib.parse import urlparse

    print(f"[TOOL] download_data called with url={url}")
    print(f"[TOOL] output_path={output_path}")
    print(f"[TOOL] Expanded path: {os.path.expanduser(output_path)}")

    try:
        # Resolve "~" and make sure the destination directory exists.
        output_path = os.path.expanduser(output_path)
        os.makedirs(output_path, exist_ok=True)
        print(f"[TOOL] Created directory: {output_path}")

        # Derive a destination filename from the URL path, with a fallback.
        url_parts = urlparse(url)
        dest_name = os.path.basename(url_parts.path) or "downloaded_file.zip"
        dest = os.path.join(output_path, dest_name)
        print(f"[TOOL] Downloading to: {dest}")

        # Stream the response to disk in fixed-size chunks.
        resp = requests.get(url, stream=True, timeout=30)
        resp.raise_for_status()
        print(f"[TOOL] Download started, status code: {resp.status_code}")

        with open(dest, "wb") as out:
            for block in resp.iter_content(chunk_size=8192):
                out.write(block)

        print(f"[TOOL] Download complete: {dest}")

        # Non-zip payloads are kept as downloaded; zips are unpacked in place.
        if not zipfile.is_zipfile(dest):
            print(f"[TOOL] Not a zip file, kept as-is")
            return f"Successfully downloaded {url} to {dest}"

        print(f"[TOOL] Extracting zip file...")
        with zipfile.ZipFile(dest, "r") as archive:
            archive.extractall(output_path)
        print(f"[TOOL] Extraction complete")
        return f"Successfully downloaded and extracted {url} to {output_path}"

    except Exception as exc:
        failure = f"ERROR in download_data: {type(exc).__name__}: {str(exc)}"
        print(f"[TOOL] {failure}")
        import traceback
        print(f"[TOOL] Traceback:\n{traceback.format_exc()}")
        return failure

Method 3: Use Python Logging Module

def download_data(url: str, output_path: str = "~/natural_earth", **kwargs) -> str:
    """Downloads a dataset from a URL and extracts it if it's a zip file."""
    import logging
    import os
    import sys
    import zipfile
    import requests

    # Route log records to stdout so they land in the docker logs;
    # force=True replaces any handlers a previous call installed.
    logging.basicConfig(
        level=logging.DEBUG,
        format='[TOOL %(levelname)s] %(message)s',
        stream=sys.stdout,
        force=True  # Override any existing config
    )
    log = logging.getLogger(__name__)

    log.info(f"download_data called with url={url}")
    log.debug(f"output_path={output_path}")

    try:
        output_path = os.path.expanduser(output_path)
        log.debug(f"Expanded path: {output_path}")

        os.makedirs(output_path, exist_ok=True)
        log.info(f"Created directory: {output_path}")

        # ... rest of your code with logger.info() and logger.error()

    except Exception as err:
        log.error(f"Exception: {type(err).__name__}: {str(err)}", exc_info=True)
        return f"ERROR: {str(err)}"

Method 4: Return Verbose Error Information

Structure your tool returns to include debug info:

def download_data(url: str, output_path: str = "~/natural_earth", **kwargs) -> str:
    """Downloads a dataset from a URL and extracts it if it's a zip file."""
    import os
    import traceback
    import zipfile
    import requests

    # Accumulate a human-readable trace of every step taken.
    trace = []
    trace.append(f"Input URL: {url}")
    trace.append(f"Input path: {output_path}")

    try:
        output_path = os.path.expanduser(output_path)
        trace.append(f"Expanded path: {output_path}")

        os.makedirs(output_path, exist_ok=True)
        trace.append(f"Directory created/verified: {output_path}")

        # Download logic...

        # Success: return the whole trace so the agent sees every step.
        return "\n".join(trace) + "\n✓ SUCCESS"

    except Exception as err:
        # Failure: the trace plus the error details and full traceback.
        failure_report = [
            "\n".join(trace),
            f"\n✗ FAILED",
            f"Error type: {type(err).__name__}",
            f"Error message: {str(err)}",
            f"Traceback:\n{traceback.format_exc()}"
        ]
        return "\n".join(failure_report)

Best Debugging Workflow:

  1. Add print statements to your tools
  2. Run docker logs -f <container_id> in one terminal
  3. Execute your agent in another terminal
  4. Watch tool execution output in real-time
  5. Tool return values will also show detailed progress/errors

What’s Likely Failing Now:

Given that imports work but download still fails, check:

  1. Network access from container - Can the container reach external URLs?
  2. File permissions - Can the tool write to the output directory?
  3. Path issues - Is ~/natural_earth resolving correctly inside the container?

Try this minimal test tool first:

def test_basics() -> str:
    """Sanity-check the tool sandbox: interpreter, paths, file writes, network."""
    import os
    import sys

    print("[TEST] Python executable:", sys.executable)
    print("[TEST] Working directory:", os.getcwd())
    print("[TEST] Home directory:", os.path.expanduser("~"))

    # Probe write access by round-tripping a throwaway file.
    probe = "/tmp/letta_test.txt"
    try:
        with open(probe, "w") as handle:
            handle.write("test")
        print(f"[TEST] ✓ Can write to {probe}")
        os.remove(probe)
    except Exception as err:
        print(f"[TEST] ✗ Cannot write: {err}")

    # Probe outbound network access with a tiny HTTP round trip.
    try:
        import requests
        reply = requests.get("https://httpbin.org/get", timeout=5)
        print(f"[TEST] ✓ Network access works, status: {reply.status_code}")
    except Exception as err:
        print(f"[TEST] ✗ Network failed: {err}")

    return "Test complete - check docker logs"

Register and call this, then check your docker logs output.

Nope - print statements and logger.info do not work; they do not show up in the server logs. The only success so far is printing to a file.

That’s a significant finding - it means Letta’s local sandbox is capturing or redirecting stdout/stderr during tool execution, so standard output doesn’t make it back to the Docker logs.

Your Best Option: Return Verbose Debug Info

Since file logging works but stdout doesn’t, use Method 4 from my previous post - return detailed execution information directly from the tool:

def download_data(
    url: str,
    output_path: str = "~/natural_earth",
    api_key: str = None,
    max_retries: int = 5,
    local_path: str = None,
) -> str:
    """
    Downloads a dataset from a URL and extracts it if it's a zip file.

    Builds a verbose, step-by-step execution log and returns it so the
    calling agent gets full visibility into what happened.

    Args:
        url (str): URL of the dataset (e.g., a zip archive).
        output_path (str): Directory to download/extract into (default: ~/natural_earth).
        api_key (str, optional): Bearer token sent via the Authorization header.
        max_retries (int): Number of download attempts before giving up.
        local_path (str, optional): Existing local dataset; if it exists,
            the download is skipped entirely.

    Returns:
        str: Multi-line execution log ending with a SUCCESS or FAILURE marker.
    """
    import os
    import zipfile
    import requests
    import traceback
    from time import sleep
    from urllib.parse import urlparse

    # Build verbose output as we go
    output_lines = []
    output_lines.append("=== DOWNLOAD_DATA EXECUTION LOG ===")
    output_lines.append(f"URL: {url}")
    output_lines.append(f"Output path (input): {output_path}")

    try:
        # Honor an existing local dataset first (previously local_path was
        # accepted but silently ignored).
        if local_path and os.path.exists(local_path):
            output_lines.append(f"✓ Using existing local dataset: {local_path}")
            output_lines.append("\n=== SUCCESS ===")
            return "\n".join(output_lines)

        # Expand path
        output_path = os.path.expanduser(output_path)
        output_lines.append(f"Output path (expanded): {output_path}")

        # Create directory
        os.makedirs(output_path, exist_ok=True)
        output_lines.append(f"✓ Directory created/verified: {output_path}")

        # Parse URL
        parsed_url = urlparse(url)
        filename = os.path.basename(parsed_url.path) or "downloaded_file.zip"
        file_path = os.path.join(output_path, filename)
        output_lines.append(f"Target file: {file_path}")

        # Download with retries; send a bearer token when one was provided
        # (previously api_key and max_retries were accepted but unused).
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        attempts = max(1, max_retries)
        response = None
        for attempt in range(attempts):
            try:
                output_lines.append(f"Download attempt {attempt + 1}/{attempts}...")
                response = requests.get(url, headers=headers, stream=True, timeout=30)
                response.raise_for_status()
                break
            except requests.exceptions.RequestException as e:
                output_lines.append(f"  ✗ {type(e).__name__}: {str(e)}")
                if attempt == attempts - 1:
                    raise  # out of retries: let the outer handler log the failure
                sleep(5)
        output_lines.append(f"✓ HTTP {response.status_code} - Download started")

        total_size = 0
        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
                total_size += len(chunk)

        output_lines.append(f"✓ Downloaded {total_size} bytes to {file_path}")

        # Extract if zip
        if zipfile.is_zipfile(file_path):
            output_lines.append(f"Detected zip file, extracting...")
            with zipfile.ZipFile(file_path, "r") as zip_ref:
                members = zip_ref.namelist()
                output_lines.append(f"Zip contains {len(members)} files")
                zip_ref.extractall(output_path)
            output_lines.append(f"✓ Extraction complete")

            # List extracted files (limit to first 10)
            try:
                extracted = os.listdir(output_path)
                output_lines.append(f"Extracted files ({len(extracted)} total):")
                for item in extracted[:10]:
                    output_lines.append(f"  - {item}")
                if len(extracted) > 10:
                    output_lines.append(f"  ... and {len(extracted) - 10} more")
            except Exception as e:
                output_lines.append(f"Could not list extracted files: {e}")

            result = "SUCCESS"
        else:
            output_lines.append(f"Not a zip file, kept as single file")
            result = "SUCCESS"

        output_lines.append(f"\n=== {result} ===")
        return "\n".join(output_lines)

    except Exception as e:
        output_lines.append(f"\n=== FAILURE ===")
        output_lines.append(f"Error type: {type(e).__name__}")
        output_lines.append(f"Error message: {str(e)}")
        output_lines.append(f"\nFull traceback:")
        output_lines.append(traceback.format_exc())
        return "\n".join(output_lines)

Why This Works:

The tool’s return value DOES get captured and sent back to the agent (and visible in your client). So by building a detailed log as the function executes and returning it, you get full visibility into what happened.

Bonus: Dual Logging

You can combine file logging AND verbose returns:

def download_data(url: str, output_path: str = "~/natural_earth", **kwargs) -> str:
    """Downloads with dual logging: file + return value.

    Every log() call both appends to the returned execution log and appends
    to /tmp/letta_download.log, so progress is visible even when stdout is
    swallowed by the tool sandbox.
    """
    import os
    import zipfile
    import requests
    import traceback
    from datetime import datetime

    log_file = "/tmp/letta_download.log"
    output_lines = []

    def log(message):
        """Log to both file and return output."""
        output_lines.append(message)
        try:
            with open(log_file, "a") as f:
                f.write(f"{message}\n")
        except OSError:
            # File logging is best-effort; never let it break the tool.
            # (A bare `except:` here would also swallow KeyboardInterrupt
            # and SystemExit, so catch only I/O failures.)
            pass

    log(f"=== DOWNLOAD START {datetime.now()} ===")
    log(f"URL: {url}")

    try:
        # Your download logic with log() calls
        output_path = os.path.expanduser(output_path)
        log(f"Expanded path: {output_path}")

        # ... rest of implementation ...

        log("=== SUCCESS ===")
        return "\n".join(output_lines)

    except Exception as e:
        log(f"=== FAILURE ===")
        log(f"Error: {type(e).__name__}: {str(e)}")
        log(f"Traceback:\n{traceback.format_exc()}")
        return "\n".join(output_lines)

What You’ll See:

When the agent calls your tool, the full execution log will appear in the agent’s message stream. You’ll see exactly where it succeeds or fails.

Testing:

Update your download_data tool with verbose returns, re-register it, and run your agent. The agent’s response should now include the full execution log showing exactly what happened during the download attempt.

This is the current download tool method

def download_data(

url: str,

output_path: str = None,

api_key: str = None,

max_retries: int = 5,

local_path: str = None,

) -> str:

"""

Downloads a dataset from a URL and extracts it if it's a zip file. Checks for local dataset first.

Args:

    url (str): URL of the dataset (e.g., zip file).

    output_path (str, optional): Local path to save the extracted dataset (default: \~/natural_earth).

    api_key (str, optional): API key for authenticated downloads.

    max_retries (int): Number of retry attempts for download.

    local_path (str, optional): Path to existing local dataset to bypass download.

Returns:

    str: Path to the saved dataset or error message.

"""

import os

import zipfile

import requests

from time import sleep

from urllib.parse import urlparse

import socket

import logging

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(\__name_\_)

try:

# Set default output_path inside method

if output_path is None:

        output_path = os.path.expanduser("\~/natural_earth")

# Verify os module

if not hasattr(os, “makedirs”):

return (

“Error: ‘os’ module not available in this environment. Contact admin.”

        )

# Check for local dataset

if local_path and os.path.exists(local_path):

        logger.info(f"Using existing local dataset at {local_path}")

return local_path

    local_shp = os.path.join(output_path, "ne_110m_populated_places.shp")

if os.path.exists(local_shp):

        logger.info(f"Using existing dataset at {local_shp}")

return local_shp

# Validate URL

if not any(

        url.lower().endswith(ext) for ext in \[".zip", ".shp", ".geojson", ".gpkg"\]

    ):

return f"Error: Unsupported file format in {url}"

# Check write permissions

try:

        os.makedirs(os.path.dirname(output_path), exist_ok=True)

if not os.access(os.path.dirname(output_path), os.W_OK):

return f"Error: No write permission for {os.path.dirname(output_path)}. Check SELinux context."

except Exception as e:

        logger.error(f"File system error: {str(e)}")

return f"Error accessing {output_path}: {str(e)}"

# Test network connectivity

if not test_network(url):

return f"Error: Cannot connect to {urlparse(url).netloc}. Check network or SELinux/firewalld settings."

# Download with retries

    headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

for attempt in range(max_retries):

try:

            logger.info(f"Downloading {url} (attempt {attempt + 1}/{max_retries})")

            response = requests.get(url, headers=headers, stream=True, timeout=20)

            response.raise_for_status()

break

except requests.exceptions.HTTPError as e:

            logger.error(f"HTTP error: {response.status_code} - {str(e)}")

if attempt == max_retries - 1:

return f"Error downloading {url} after {max_retries} attempts: HTTP {response.status_code} - {str(e)}"

            sleep(5)

except requests.exceptions.SSLError as e:

            logger.error(f"SSL error: {str(e)}")

return f"Error downloading {url}: SSL issue, check certificates or SELinux settings"

except requests.exceptions.RequestException as e:

            logger.error(f"Network error: {str(e)}")

if attempt == max_retries - 1:

return f"Error downloading {url} after {max_retries} attempts: {str(e)}"

            sleep(5)

# Handle zip or direct file

if url.endswith(“.zip”):

        zip_path = output_path + ".zip"

try:

with open(zip_path, “wb”) as f:

for chunk in response.iter_content(chunk_size=8192):

                    f.write(chunk)

with zipfile.ZipFile(zip_path, “r”) as zip_ref:

                zip_ref.extractall(os.path.dirname(output_path))

            os.remove(zip_path)

except zipfile.BadZipFile:

return f"Error: Invalid zip file at {zip_path}"

except Exception as e:

            logger.error(f"Zip extraction error: {str(e)}")

return f"Error extracting {zip_path}: {str(e)}"

for root, _, files in os.walk(os.path.dirname(output_path)):

for file in files:

if file.endswith(“.shp”):

                    shp_path = os.path.join(root, file)

                    logger.info(f"Found shapefile: {shp_path}")

return shp_path

return f"Error: No shapefile found in extracted dataset at {os.path.dirname(output_path)}"

else:

        output_file = output_path + os.path.splitext(urlparse(url).path)\[1\]

with open(output_file, “wb”) as f:

for chunk in response.iter_content(chunk_size=8192):

                f.write(chunk)

        logger.info(f"Saved file: {output_file}")

return output_file

except NameError as e:

    logger.error(f"NameError: {str(e)}")

return f"Error: Module (e.g., ‘os’) not defined in server environment: {str(e)}"

except Exception as e:

    logger.error(f"Download error: {str(e)}")

return f"Error downloading {url}: {str(e)}"

This needs to be changed to return the output instead.

Here’s your download_data tool refactored to return verbose output instead of relying on logging:

def download_data(
    url: str,
    output_path: str = None,
    api_key: str = None,
    max_retries: int = 5,
    local_path: str = None,
) -> str:
    """
    Downloads a dataset from a URL and extracts it if it's a zip file. Checks for local dataset first.
    
    Args:
        url (str): URL of the dataset (e.g., zip file).
        output_path (str, optional): Local path to save the extracted dataset (default: ~/natural_earth).
        api_key (str, optional): API key for authenticated downloads.
        max_retries (int): Number of retry attempts for download.
        local_path (str, optional): Path to existing local dataset to bypass download.
    
    Returns:
        str: Detailed execution log with success/failure information.
    """
    import os
    import zipfile
    import requests
    from time import sleep
    from urllib.parse import urlparse
    import socket
    import traceback
    
    # Build output log
    log = []
    log.append("=== DOWNLOAD_DATA EXECUTION ===")
    log.append(f"URL: {url}")
    log.append(f"Output path (input): {output_path}")
    log.append(f"Local path: {local_path}")
    log.append(f"Max retries: {max_retries}")
    
    try:
        # Set default output_path inside method
        if output_path is None:
            output_path = os.path.expanduser("~/natural_earth")
            log.append(f"Using default output path: {output_path}")
        else:
            log.append(f"Using provided output path: {output_path}")
        
        # Verify os module
        if not hasattr(os, "makedirs"):
            log.append("✗ FATAL: 'os' module not available")
            return "\n".join(log)
        log.append("✓ os module available")
        
        # Check for local dataset
        if local_path and os.path.exists(local_path):
            log.append(f"✓ Found existing local dataset at {local_path}")
            log.append("\n=== SUCCESS (using local dataset) ===")
            return "\n".join(log) + f"\n\nRESULT: {local_path}"
        elif local_path:
            log.append(f"Local path specified but not found: {local_path}")
        
        local_shp = os.path.join(output_path, "ne_110m_populated_places.shp")
        if os.path.exists(local_shp):
            log.append(f"✓ Found existing shapefile at {local_shp}")
            log.append("\n=== SUCCESS (using existing) ===")
            return "\n".join(log) + f"\n\nRESULT: {local_shp}"
        else:
            log.append(f"Checked for existing shapefile, not found: {local_shp}")
        
        # Validate URL
        if not any(url.lower().endswith(ext) for ext in [".zip", ".shp", ".geojson", ".gpkg"]):
            log.append(f"✗ Unsupported file format in URL")
            log.append("\n=== FAILURE ===")
            return "\n".join(log)
        log.append(f"✓ URL format valid")
        
        # Check write permissions
        try:
            parent_dir = os.path.dirname(output_path)
            log.append(f"Checking parent directory: {parent_dir}")
            os.makedirs(parent_dir, exist_ok=True)
            log.append(f"✓ Parent directory created/verified")
            
            if not os.access(parent_dir, os.W_OK):
                log.append(f"✗ No write permission for {parent_dir}")
                log.append("Hint: Check SELinux context")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
            log.append(f"✓ Write permissions OK")
        except Exception as e:
            log.append(f"✗ File system error: {type(e).__name__}: {str(e)}")
            log.append("\n=== FAILURE ===")
            return "\n".join(log)
        
        # Test network connectivity. NOTE: netloc may carry userinfo and/or a
        # port ("user@host:8080"), which socket.create_connection cannot
        # resolve as a hostname, so use parsed.hostname / parsed.port instead.
        parsed = urlparse(url)
        host = parsed.hostname or parsed.netloc
        port = parsed.port or (443 if parsed.scheme == 'https' else 80)
        log.append(f"Testing network connectivity to {parsed.netloc}...")
        try:
            conn = socket.create_connection((host, port), timeout=5)
            conn.close()  # close the probe socket instead of leaking it
            log.append(f"✓ Network connectivity OK")
        except Exception as e:
            log.append(f"✗ Cannot connect to {parsed.netloc}: {str(e)}")
            log.append("Hint: Check network or SELinux/firewalld settings")
            log.append("\n=== FAILURE ===")
            return "\n".join(log)
        
        # Download with retries
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        log.append(f"Starting download (max {max_retries} attempts)...")
        
        response = None
        last_error = None
        
        for attempt in range(max_retries):
            try:
                log.append(f"  Attempt {attempt + 1}/{max_retries}...")
                response = requests.get(url, headers=headers, stream=True, timeout=20)
                response.raise_for_status()
                log.append(f"  ✓ HTTP {response.status_code} - Download started")
                break
            except requests.exceptions.HTTPError as e:
                last_error = f"HTTP {response.status_code}: {str(e)}"
                log.append(f"  ✗ {last_error}")
                if attempt == max_retries - 1:
                    log.append("\n=== FAILURE (max retries exceeded) ===")
                    return "\n".join(log)
                sleep(5)
            except requests.exceptions.SSLError as e:
                # SSL problems won't fix themselves on retry; fail fast.
                last_error = f"SSL error: {str(e)}"
                log.append(f"  ✗ {last_error}")
                log.append("Hint: Check certificates or SELinux settings")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
            except requests.exceptions.RequestException as e:
                last_error = f"Network error: {str(e)}"
                log.append(f"  ✗ {last_error}")
                if attempt == max_retries - 1:
                    log.append("\n=== FAILURE (max retries exceeded) ===")
                    return "\n".join(log)
                sleep(5)
        
        if response is None:
            log.append("✗ Download failed - no response received")
            log.append("\n=== FAILURE ===")
            return "\n".join(log)
        
        # Handle zip or direct file (case-insensitive, consistent with the
        # URL validation above)
        if url.lower().endswith(".zip"):
            log.append("Processing as zip file...")
            zip_path = output_path + ".zip"
            log.append(f"Saving to: {zip_path}")
            
            try:
                # Download zip
                total_bytes = 0
                with open(zip_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                        total_bytes += len(chunk)
                log.append(f"✓ Downloaded {total_bytes} bytes")
                
                # Extract zip
                log.append("Extracting zip file...")
                with zipfile.ZipFile(zip_path, "r") as zip_ref:
                    members = zip_ref.namelist()
                    log.append(f"Zip contains {len(members)} files")
                    zip_ref.extractall(os.path.dirname(output_path))
                log.append(f"✓ Extracted to {os.path.dirname(output_path)}")
                
                # Clean up zip
                os.remove(zip_path)
                log.append(f"✓ Removed temporary zip file")
                
                # Find shapefile
                log.append("Searching for shapefile...")
                for root, _, files in os.walk(os.path.dirname(output_path)):
                    for file in files:
                        if file.endswith(".shp"):
                            shp_path = os.path.join(root, file)
                            log.append(f"✓ Found shapefile: {shp_path}")
                            log.append("\n=== SUCCESS ===")
                            return "\n".join(log) + f"\n\nRESULT: {shp_path}"
                
                # No shapefile found
                log.append("✗ No shapefile found in extracted dataset")
                log.append(f"Extracted to: {os.path.dirname(output_path)}")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
                
            except zipfile.BadZipFile:
                log.append(f"✗ Invalid zip file at {zip_path}")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
            except Exception as e:
                log.append(f"✗ Zip extraction error: {type(e).__name__}: {str(e)}")
                log.append(f"Traceback:\n{traceback.format_exc()}")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
        else:
            # Direct file download
            log.append("Processing as direct file download...")
            output_file = output_path + os.path.splitext(urlparse(url).path)[1]
            log.append(f"Saving to: {output_file}")
            
            try:
                total_bytes = 0
                with open(output_file, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                        total_bytes += len(chunk)
                log.append(f"✓ Downloaded {total_bytes} bytes")
                log.append(f"✓ Saved file: {output_file}")
                log.append("\n=== SUCCESS ===")
                return "\n".join(log) + f"\n\nRESULT: {output_file}"
            except Exception as e:
                log.append(f"✗ File write error: {type(e).__name__}: {str(e)}")
                log.append(f"Traceback:\n{traceback.format_exc()}")
                log.append("\n=== FAILURE ===")
                return "\n".join(log)
    
    except NameError as e:
        log.append(f"✗ NameError: {str(e)}")
        log.append("Module not defined in server environment")
        log.append(f"Traceback:\n{traceback.format_exc()}")
        log.append("\n=== FAILURE ===")
        return "\n".join(log)
    except Exception as e:
        log.append(f"✗ Unexpected error: {type(e).__name__}: {str(e)}")
        log.append(f"Traceback:\n{traceback.format_exc()}")
        log.append("\n=== FAILURE ===")
        return "\n".join(log)


def test_network(url: str) -> bool:
    """Helper function for network testing."""
    import socket
    from urllib.parse import urlparse
    
    try:
        parsed = urlparse(url)
        socket.create_connection((parsed.netloc, 443 if parsed.scheme == 'https' else 80), timeout=5)
        return True
    except:
        return False

Key Changes:

  1. Replaced all logger calls with log.append() - builds a list of log messages as execution progresses
  2. Returns the full log at the end - agent will see complete execution trace
  3. Returns “RESULT: {path}” at the end of success cases - clearly shows the actual file path
  4. Includes detailed progress markers - ✓ for success, ✗ for failure
  5. Full tracebacks on exceptions - complete debugging information
  6. Every decision point logged - can see exactly where execution went

What the Agent Will See:

When this tool executes, the agent’s response will show the complete execution log like:

=== DOWNLOAD_DATA EXECUTION ===
URL: https://example.com/data.zip
Output path (input): None
Using default output path: /root/natural_earth
✓ os module available
Checked for existing shapefile, not found: /root/natural_earth/ne_110m_populated_places.shp
✓ URL format valid
Checking parent directory: /root
✓ Parent directory created/verified
✓ Write permissions OK
Testing network connectivity to example.com...
✓ Network connectivity OK
Starting download (max 5 attempts)...
  Attempt 1/5...
  ✓ HTTP 200 - Download started
Processing as zip file...
✓ Downloaded 1234567 bytes
Extracting zip file...
Zip contains 15 files
✓ Extracted to /root
✓ Removed temporary zip file
Searching for shapefile...
✓ Found shapefile: /root/natural_earth/cities.shp

=== SUCCESS ===

RESULT: /root/natural_earth/cities.shp

Now you’ll have complete visibility into every step of execution.

No! I don’t see any of that logging. Also, since the tool now always returns something, letta thinks that the tool succeeded when it actually failed - there is no downloaded file in /root. So this is getting worse!

Use file logging like the check_environment method, that is the ONLY method that works atm

This is critical - if you’re not seeing ANY of the verbose logging in the tool’s return value, something is fundamentally wrong with how tool results are being captured or displayed.

Emergency Diagnostic: Prove What’s Actually Happening

Let’s create the simplest possible tool to verify the entire pipeline:

def ultra_minimal_test() -> str:
    """Smallest possible tool: proves execution and return capture end to end."""
    marker = "HELLO FROM TOOL - If you see this, tool execution and return capture works"
    return marker

Register and call this:

# Register the tool straight from the bare function
test_tool = client.tools.upsert_from_function(func=ultra_minimal_test)

# Spin up a throwaway agent that only knows this one tool
test_agent = client.agents.create(
    name="minimal_test",
    model="xai/grok-3-mini",  # swap in whichever model you use
    embedding_config=your_embedding_config,
    memory_blocks=[{"label": "persona", "value": "Test agent"}],
    tools=[test_tool.name]
)

# Ask the agent to invoke it
response = client.agents.messages.create(
    agent_id=test_agent.id,
    messages=[{"role": "user", "content": "Call ultra_minimal_test"}]
)

# Dump the raw response object first
print("\n=== FULL RESPONSE ===")
print(f"Response type: {type(response)}")
print(f"Response object: {response}")

# Then walk every message and every attribute that might carry the result
print("\n=== MESSAGES ===")
interesting_attrs = ('content', 'text', 'tool_calls', 'tool_call_id', 'name', 'role')
for idx, message in enumerate(response.messages):
    print(f"\n--- Message {idx} ---")
    print(f"Type: {type(message)}")
    print(f"Dir: {dir(message)}")

    for attr in interesting_attrs:
        if hasattr(message, attr):
            print(f"{attr}: {getattr(message, attr)}")

What This Reveals:

Scenario A: You see “HELLO FROM TOOL” somewhere in the output

  • Tool execution and return capture works
  • Problem is with how complex returns are being displayed
  • Your download_data tool might be returning correctly but you’re not seeing it

Scenario B: You DON’T see “HELLO FROM TOOL” anywhere

  • Tool execution is failing or return values aren’t being captured
  • Fundamental pipeline problem
  • Need to check tool execution environment

Critical Question:

When you say “the tool now always returns something” - where are you seeing that? In the:

  • Client console output?
  • Agent’s message stream?
  • File logs?
  • Somewhere else?

If Tools ARE Executing But Returns Vanish:

Check your client code - how are you printing the response?

# BAD - might not show tool results
print(response)

# GOOD - explicitly iterate
for message in response.messages:
    print(f"Role: {message.role}")
    if hasattr(message, 'content'):
        print(f"Content: {message.content}")
    if hasattr(message, 'tool_calls'):
        print(f"Tool calls: {message.tool_calls}")
    # Tool RESULT messages arrive with role == 'tool'
    if getattr(message, 'role', None) == 'tool':
        print(f"TOOL RESULT: {message.content}")

About the “Success When It Failed” Issue:

The agent judges success/failure based on:

  1. Whether the tool executes without raising an exception
  2. What the tool’s return string says

If your tool catches all exceptions and returns a string (even one saying “FAILURE”), Letta considers it a successful tool execution. The agent then reads the return string to understand what happened.

To Make Failures Actually Fail:

def download_data(url: str, output_path: str = None, **kwargs) -> str:
    """Downloads data. RAISES exception on failure instead of returning error message."""
    import os
    import requests

    # Default target directory lives under the user's home.
    output_path = (
        os.path.expanduser("~/natural_earth") if output_path is None else output_path
    )

    # Deliberately no try/except: any failure propagates to Letta, which
    # reports it to the agent as a tool execution error.
    response = requests.get(url, stream=True, timeout=30)
    response.raise_for_status()  # 4xx/5xx -> requests.exceptions.HTTPError

    # Continue with download...
    # If anything fails, let the exception propagate

    return f"Successfully downloaded to {output_path}"

When exceptions propagate, Letta captures them and the agent sees a tool execution error (different from a tool that returns “I failed”).

Your Next Steps:

  1. Run the ultra_minimal_test tool
  2. Post the COMPLETE output from that test
  3. Show us your client code that’s calling the agent and printing responses
  4. Check /tmp/letta_tool_execution.log if it exists from your earlier file-logging attempts

This will tell us where the breakdown is happening.

Got it - if tool return values aren’t reaching you at all, we’ll use file logging exclusively. This is a serious issue that suggests either a client configuration problem or a potential Letta bug worth reporting.

File-Logging Version of download_data:

def download_data(
    url: str,
    output_path: str = None,
    api_key: str = None,
    max_retries: int = 5,
    local_path: str = None,
) -> str:
    """
    Downloads a dataset from a URL and extracts it. Logs to /tmp/letta_download.log
    RAISES exceptions on failure so agent knows it failed.

    Args:
        url: Source URL. Must end with .zip, .shp, .geojson or .gpkg
            (checked case-insensitively).
        output_path: Path prefix for the download (default: ~/natural_earth).
            NOTE: files are written into its PARENT directory; output_path
            itself is used as a filename prefix, not as the target directory.
        api_key: Optional bearer token sent in the Authorization header.
        max_retries: Number of download attempts before giving up.
        local_path: If this file already exists it is returned immediately
            and nothing is downloaded.

    Returns:
        str: Path to an existing or freshly downloaded shapefile/file.

    Raises:
        ValueError: Unsupported URL extension.
        PermissionError: Target directory is not writable.
        requests.exceptions.RequestException: All download attempts failed.
        FileNotFoundError: Zip extracted but contained no .shp file.
    """
    import os
    import zipfile
    import requests
    from time import sleep
    from urllib.parse import urlparse
    import traceback
    from datetime import datetime

    log_file = "/tmp/letta_download.log"

    def log(message):
        """Append a timestamped line to the log file; never raises."""
        try:
            with open(log_file, "a") as f:
                f.write(f"[{datetime.now().isoformat()}] {message}\n")
        except Exception:
            # Logging must never break the tool itself.
            pass

    log("=" * 60)
    log("DOWNLOAD_DATA CALLED")
    log(f"URL: {url}")
    log(f"Output path: {output_path}")
    log(f"Local path: {local_path}")
    log(f"Max retries: {max_retries}")

    try:
        # Set default output_path
        if output_path is None:
            output_path = os.path.expanduser("~/natural_earth")
            log(f"Using default output path: {output_path}")

        log("Checking if os module available...")
        log(f"os module: {os}")
        log(f"os.path.exists function: {os.path.exists}")

        # Check for local dataset
        if local_path and os.path.exists(local_path):
            log(f"SUCCESS: Found existing local dataset at {local_path}")
            return local_path

        local_shp = os.path.join(output_path, "ne_110m_populated_places.shp")
        log(f"Checking for existing file: {local_shp}")
        if os.path.exists(local_shp):
            log(f"SUCCESS: Found existing shapefile at {local_shp}")
            return local_shp
        log("File does not exist, proceeding with download")

        # Validate URL (case-insensitive)
        if not any(url.lower().endswith(ext) for ext in [".zip", ".shp", ".geojson", ".gpkg"]):
            log(f"ERROR: Unsupported file format in URL: {url}")
            raise ValueError(f"Unsupported file format: {url}")
        log("URL format valid")

        # Create parent directory (output_path acts as a prefix; downloaded
        # and extracted files are placed in this parent)
        parent_dir = os.path.dirname(output_path)
        log(f"Creating parent directory: {parent_dir}")
        os.makedirs(parent_dir, exist_ok=True)
        log("Parent directory OK")

        # Check write permissions
        if not os.access(parent_dir, os.W_OK):
            log(f"ERROR: No write permission for {parent_dir}")
            raise PermissionError(f"Cannot write to {parent_dir}")
        log("Write permissions OK")

        # Test network connectivity
        parsed = urlparse(url)
        log(f"Parsed URL - host: {parsed.netloc}, scheme: {parsed.scheme}")

        # Download with retries
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        log(f"Starting download (max {max_retries} retries)...")

        response = None
        for attempt in range(max_retries):
            try:
                log(f"Attempt {attempt + 1}/{max_retries}")
                response = requests.get(url, headers=headers, stream=True, timeout=30)
                response.raise_for_status()
                log(f"HTTP {response.status_code} - Download successful")
                break
            except requests.exceptions.RequestException as e:
                log(f"Attempt {attempt + 1} failed: {type(e).__name__}: {str(e)}")
                if attempt == max_retries - 1:
                    log("ERROR: Max retries exceeded")
                    raise  # Re-raise the last exception
                sleep(5)

        if response is None:
            log("ERROR: No response received after retries")
            raise Exception("Download failed - no response")

        # Handle zip files - BUGFIX: match the case-insensitive validation
        # above, so a URL ending in '.ZIP' is also treated as an archive
        # instead of falling through to the direct-download branch.
        if url.lower().endswith(".zip"):
            log("Processing as zip file")
            zip_path = output_path + ".zip"
            log(f"Saving zip to: {zip_path}")

            # Download
            total_bytes = 0
            with open(zip_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    total_bytes += len(chunk)
            log(f"Downloaded {total_bytes} bytes to {zip_path}")

            # Extract
            log("Extracting zip file...")
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                members = zip_ref.namelist()
                log(f"Zip contains {len(members)} files")
                extract_dir = os.path.dirname(output_path)
                zip_ref.extractall(extract_dir)
                log(f"Extracted to {extract_dir}")

            # Clean up
            os.remove(zip_path)
            log("Removed temporary zip file")

            # Find shapefile
            log("Searching for shapefile in extracted files...")
            for root, _, files in os.walk(extract_dir):
                for file in files:
                    if file.endswith(".shp"):
                        shp_path = os.path.join(root, file)
                        log(f"SUCCESS: Found shapefile at {shp_path}")
                        log("=" * 60)
                        return shp_path

            # If we get here, no shapefile found
            log(f"ERROR: No shapefile found in {extract_dir}")
            log("=" * 60)
            raise FileNotFoundError(f"No shapefile found after extraction in {extract_dir}")

        else:
            # Direct file download
            log("Processing as direct file")
            output_file = output_path + os.path.splitext(urlparse(url).path)[1]
            log(f"Saving to: {output_file}")

            total_bytes = 0
            with open(output_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
                    total_bytes += len(chunk)

            log(f"SUCCESS: Downloaded {total_bytes} bytes to {output_file}")
            log("=" * 60)
            return output_file

    except Exception as e:
        log(f"FATAL ERROR: {type(e).__name__}: {str(e)}")
        log(f"Traceback:\n{traceback.format_exc()}")
        log("=" * 60)
        # Re-raise so Letta knows the tool failed
        raise

Key Changes:

  1. All execution details written to /tmp/letta_download.log
  2. Exceptions are RE-RAISED instead of caught and returned
    • This tells Letta the tool actually failed
    • Agent will see tool execution error
    • Won’t think it succeeded when it didn’t
  3. Simple return values - just the file path on success
  4. Timestamped log entries for correlation

Companion Tool to Read Logs:

def read_download_log(lines: int = 50, log_file: str = "/tmp/letta_download.log") -> str:
    """Read the last N lines from the download log file.

    Args:
        lines: Number of trailing lines to return.
        log_file: Path of the log file to read. Defaults to the path that
            download_data writes to, so existing callers are unaffected.

    Returns:
        str: The last *lines* lines joined together, a notice when the file
        does not exist yet, or an error description if it cannot be read.
    """
    import os

    if not os.path.exists(log_file):
        return "Log file does not exist yet"

    try:
        with open(log_file, "r") as f:
            all_lines = f.readlines()
            last_lines = all_lines[-lines:]
            return "".join(last_lines)
    except Exception as e:
        return f"Error reading log: {str(e)}"

Usage:

# Register both tools. upsert_from_function builds the tool from the
# Python function's signature and docstring.
download_tool = client.tools.upsert_from_function(func=download_data)
log_reader_tool = client.tools.upsert_from_function(func=read_download_log)

# Attach both to your agent
# ... agent creation ...

# When agent tries to download and it fails, tell it to check logs:
# "Call read_download_log to see what happened"

How It Works Now:

  1. Agent calls download_data
  2. If it fails, agent sees tool execution error
  3. You (or agent) call read_download_log to see details
  4. You can also manually check: docker exec -it <container_id> cat /tmp/letta_download.log

This Is a Bug Worth Reporting:

If tool return values genuinely aren’t reaching your client, that’s a fundamental issue. Consider opening a GitHub issue with:

  • Your Letta version (docker logs <container_id> | head -20 should show version)
  • Your client code (how you’re calling the agent)
  • The fact that file I/O works but return values don’t
  • Your Docker run command and environment

Immediate Debugging:

After registering the new file-logging version, run your agent and then immediately check:

docker exec -it <container_id> cat /tmp/letta_download.log

This will show you exactly what happened during tool execution.

There is no way of logging from a tool method other than the very suboptimal writing to a file.
I need to know how to do this properly, just one or two lines to show how to do the logging.
Is this a bug, or am I missing something. Please be brief.

Critical question: Are you using the ADE (web interface) or Python client?

If using ADE:
Tool return values appear in the conversation as tool result messages. You should see them. If you don’t, that’s a bug.

If using Python client:
Tool return values are in response.messages. Example:

# Send one user message to the agent and inspect the returned messages.
response = client.agents.messages.create(
    agent_id=agent_id,
    messages=[{"role": "user", "content": "Download this file"}]
)

# Tool results come back as messages with role == 'tool'; their content
# is the tool's return value.
for message in response.messages:
    if getattr(message, 'role', None) == 'tool':
        print(f"TOOL RETURNED: {message.content}")

Quick diagnostic:

def test_return() -> str:
    """Minimal diagnostic tool: if its return string shows up anywhere in
    your client output, tool return values are being delivered."""
    return "HELLO - if you see this anywhere, returns work"

Register, call it, and check if “HELLO” appears anywhere in your output.

If “HELLO” doesn’t appear anywhere: That’s a bug. File an issue on the Letta GitHub repository.

Which interface are you using?

I am using the Python interface. I have to say it is painful to have to return all log messages in the response of the tool, as this messes up the usual Python try/except exception handling. Is there not another way of logging errors and warnings other than the tool response???

You’re approaching this wrong. Don’t return error logs - raise exceptions.

def download_data(url: str, output_path: str = None) -> str:
    """Downloads data. Returns path on success, raises exception on failure."""
    import os
    import requests

    output_path = (
        os.path.expanduser("~/natural_earth") if output_path is None else output_path
    )

    try:
        # Do the work: ensure the directory exists, fetch, and stream to disk.
        os.makedirs(output_path, exist_ok=True)
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()

        file_path = os.path.join(output_path, "data.zip")
        with open(file_path, "wb") as out:
            for chunk in response.iter_content(chunk_size=8192):
                out.write(chunk)

        # Success: return just the path.
        return file_path

    except requests.exceptions.HTTPError as err:
        # Re-raise with a detailed, agent-readable error message.
        raise requests.exceptions.HTTPError(
            f"Download failed: HTTP {response.status_code} from {url}. "
            f"Check URL and network access."
        ) from err

    except PermissionError as err:
        raise PermissionError(
            f"Cannot write to {output_path}. Check file permissions."
        ) from err

    except Exception as err:
        # Generic catch: add context, keep the original as the cause.
        raise Exception(
            f"Download failed: {type(err).__name__}: {str(err)}. "
            f"URL: {url}, Output: {output_path}"
        ) from err

How this works:

  • Success: Returns clean result (just the file path)
  • Failure: Exception with detailed error message
  • Agent sees the exception message in tool execution error
  • Normal Python error handling preserved

For debug logging during development only:

def download_data(url: str, output_path: str = None, debug: bool = False) -> str:
    """Set debug=True during development to log to file."""
    import os
    import requests

    # Optional debug trace: appended to a file, so it survives even when
    # stdout is not visible from the tool sandbox.
    if debug:
        with open("/tmp/debug.log", "a") as trace:
            trace.write(f"Called with url={url}\n")

    # ... rest of implementation with exceptions ...

The pattern:

  • Return value = success data only
  • Exceptions = errors with detailed messages
  • File logging = optional debug traces

This is standard Python. Don’t mix logging into return values.

Where is this documented ?

It’s not explicitly documented - this is a gap in the docs.

The custom tools page (Define and customize tools | Letta) shows how to create tools but doesn’t cover error handling patterns.

What I described is standard Python practice:

  • Raise exceptions for errors
  • Return values for success

How Letta handles it:

  • When your tool raises an exception, Letta captures it and shows the error message to the agent
  • When your tool returns a value, that becomes the tool result message

You can verify this yourself:

# This tool will show error to agent
def test_exception() -> str:
    raise ValueError("This error message goes to the agent")

# Diagnostic tool that always succeeds: its return string should appear
# as the tool result message.
def test_return() -> str:
    return "This success message goes to the agent"

The monitoring docs (Monitoring | Letta) mention “tool failures” but don’t explain the raise vs return pattern.

Worth documenting: How exceptions vs return values are handled in custom tools. Consider filing a docs improvement issue.

why don’t you open an issue for this

I don’t have access to open GitHub issues directly. You can open one on the Letta GitHub issues page.

Suggested issue title:
“Document exception handling behavior in custom tools”

Suggested description:

The custom tools documentation (https://docs.letta.com/guides/agents/custom-tools) 
shows how to create tools but doesn't explain how Letta handles exceptions vs return values.

Current behavior (undocumented):
- When a tool raises an exception, Letta captures it and shows the error message to the agent
- When a tool returns a value, that becomes the tool result message

This leads to confusion about proper error handling patterns in tools.

Suggestion: Add a section to the custom tools page explaining:
1. Use return values for success data
2. Raise exceptions for errors (with detailed messages)
3. Exception messages are automatically surfaced to agents
4. Don't mix logging into return values - use exceptions for errors

Related forum discussion: https://forum.letta.com/t/dynamic-tools-and-error-handling/72

Feel free to open it - this would help future developers.

I have a tool with kwargs, as it is calling an API with variable parameters. Letta is unable to call this tool - it always has an empty set of parameters even though the system and user prompts explicitly instruct it to pass certain parameters
I posted the start of the tool method below. It registers, but always has empty kwargs. Do you have an example of such a tool method with kwargs? Otherwise, this is broken

def geopandas_api(function_name: str, **kwargs) → str:

"""

Execute GeoPandas operations dynamically.



Args:

    function_name (str): One of: read_file, sjoin, buffer, to_file, proximity_analysis

    dataset_path (str, optional): Path to file (e.g., 'natural_earth/ne_110m_populated_places.shp')

    point_coords (list\[float\], optional): \[lon, lat\] for proximity_analysis

    distance_m (float, optional): Buffer distance in meters (default: 5000)

    how (str, optional): For sjoin (e.g., 'inner', 'left')

    predicate (str, optional): For sjoin (e.g., 'intersects', 'within')

    distance (float, optional): For buffer in CRS units

    output_path (str, optional): For to_file

    driver (str, optional): For to_file (e.g., 'ESRI Shapefile')



Returns:

    str: Execution log + RESULT: ...

"""

**kwargs doesn’t work with Letta tools. This is because Letta generates OpenAI tool schemas, which require explicit parameter definitions.

The solution: Use Pydantic with Optional fields

from typing import Optional, List
from pydantic import BaseModel, Field

class GeopandasArgs(BaseModel):
    """Argument schema for the geopandas_api tool.

    Declares every parameter explicitly because the OpenAI-style tool
    schemas Letta generates cannot represent **kwargs. Optional fields
    may simply be omitted by the agent.
    """
    function_name: str = Field(..., description="One of: read_file, sjoin, buffer, to_file, proximity_analysis")
    dataset_path: Optional[str] = Field(None, description="Path to file (e.g., 'natural_earth/ne_110m_populated_places.shp')")
    point_coords: Optional[List[float]] = Field(None, description="[lon, lat] for proximity_analysis")
    distance_m: Optional[float] = Field(5000, description="Buffer distance in meters")
    how: Optional[str] = Field(None, description="For sjoin (e.g., 'inner', 'left')")
    predicate: Optional[str] = Field(None, description="For sjoin (e.g., 'intersects', 'within')")
    distance: Optional[float] = Field(None, description="For buffer in CRS units")
    output_path: Optional[str] = Field(None, description="For to_file")
    driver: Optional[str] = Field(None, description="For to_file (e.g., 'ESRI Shapefile')")

def geopandas_api(
    function_name: str,
    dataset_path: Optional[str] = None,
    point_coords: Optional[List[float]] = None,
    distance_m: Optional[float] = 5000,
    how: Optional[str] = None,
    predicate: Optional[str] = None,
    distance: Optional[float] = None,
    output_path: Optional[str] = None,
    driver: Optional[str] = None
) -> str:
    """Dispatch a single GeoPandas operation selected by function_name.

    Every parameter the agent may supply is declared explicitly (rather
    than via **kwargs) so the generated tool schema is complete.

    Args:
        function_name: One of: read_file, sjoin, buffer, to_file, proximity_analysis
        dataset_path: Path to file (e.g., 'natural_earth/ne_110m_populated_places.shp')
        point_coords: [lon, lat] for proximity_analysis
        distance_m: Buffer distance in meters (default: 5000)
        how: For sjoin (e.g., 'inner', 'left')
        predicate: For sjoin (e.g., 'intersects', 'within')
        distance: For buffer in CRS units
        output_path: For to_file
        driver: For to_file (e.g., 'ESRI Shapefile')

    Returns:
        str: Execution log + RESULT
    """
    pass  # Your implementation here

# Register with explicit args_schema: the schema comes from GeopandasArgs,
# so every optional parameter (and its description) is visible to the LLM.
tool = client.tools.upsert_from_function(
    func=geopandas_api,
    args_schema=GeopandasArgs
)

Why this works:

  • All parameters are explicitly defined in the schema
  • LLM sees all available parameters and their descriptions
  • Optional fields can be omitted by the agent
  • No more empty kwargs

Alternative: If you truly need variable parameters, create separate tools for each operation (read_file, sjoin, buffer, etc.) with their specific parameters.