Skip to main content
A complete Python implementation of the Warp Grep agent loop. This guide covers parsing, tool execution, and result formatting.
For the raw API protocol and message formats, see Direct API Access.

Overview

The agent loop:
  1. Send query + repo structure to the API
  2. Parse XML tool calls from the response
  3. Execute tools locally (ripgrep, file reads, tree)
  4. Format results and send back
  5. Repeat until finish is called (max 4 turns)

Installation

pip install requests
You’ll also need ripgrep installed (directory listings additionally shell out to the `tree` command):
# macOS
brew install ripgrep

# Ubuntu/Debian
apt-get install ripgrep

# Windows
choco install ripgrep

Complete Implementation

API Client

import os
import requests

# API key is read from the environment once, at import time; the value is
# None when MORPH_API_KEY is unset.
MORPH_API_KEY = os.environ.get("MORPH_API_KEY")
# Chat-completions endpoint that call_api() posts to.
API_URL = "https://api.morphllm.com/v1/chat/completions"

def call_api(messages: list[dict], timeout: float = 60.0) -> str:
    """Call the Warp Grep API and return the response content.

    Args:
        messages: The conversation so far, as chat-style dicts with "role"
            and "content" keys.
        timeout: Seconds to wait for the server. requests applies no timeout
            by default, so without this a stalled connection would block the
            agent loop indefinitely.

    Returns:
        The "content" string of the first choice in the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    response = requests.post(
        API_URL,
        headers={
            "Authorization": f"Bearer {MORPH_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": "morph-warp-grep",
            "messages": messages,
            "temperature": 0.0,
            "max_tokens": 2048,
        },
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]

XML Parser

import re
from dataclasses import dataclass

@dataclass
class ToolCall:
    """One tool invocation parsed out of a model response."""

    # Tag the call was written with: "grep", "read", "list_directory" or "finish".
    name: str
    # Child elements of the call, keyed by tag name (built by parse_xml_elements).
    # Values are stripped strings, except that <file> children are collected
    # as a list of dicts under the "files" key.
    args: dict[str, str]

def parse_tool_calls(response: str) -> list[ToolCall]:
    """Extract every XML tool call from a model response.

    ``<think>`` blocks are dropped first so that tags mentioned while
    reasoning are not mistaken for calls. Calls are returned grouped by tool
    (grep, read, list_directory, finish), each group in document order.
    """
    visible = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)

    return [
        ToolCall(name=tag, args=parse_xml_elements(found.group(1)))
        for tag in ("grep", "read", "list_directory", "finish")
        for found in re.finditer(rf"<{tag}>(.*?)</{tag}>", visible, re.DOTALL)
    ]

def parse_xml_elements(content: str) -> dict[str, str]:
    """Turn ``<tag>value</tag>`` children into a dictionary.

    Each element becomes ``tag -> stripped text``. ``<file>`` elements are the
    exception: they may repeat (inside ``finish``), so each one is parsed
    recursively and appended to a list stored under the ``"files"`` key.
    """
    parsed = {}

    for element in re.finditer(r"<(\w+)>(.*?)</\1>", content, re.DOTALL):
        tag = element.group(1)
        text = element.group(2).strip()

        if tag != "file":
            parsed[tag] = text
            continue

        # Repeated <file> blocks accumulate instead of overwriting each other.
        parsed.setdefault("files", []).append(parse_xml_elements(text))

    return parsed

Tool Executors

import subprocess
from pathlib import Path

# Output caps, counted in lines, for the tool executors below.
# grep output longer than this is rejected with a "query not specific enough" message.
MAX_GREP_LINES = 200
# Directory listings longer than this are rejected (fallback_list_dir truncates instead).
MAX_LIST_LINES = 200
# File reads are cut off after this many output lines.
MAX_READ_LINES = 800

def execute_grep(repo_root: str, pattern: str, sub_dir: str = ".", glob: str = None) -> str:
    """Execute ripgrep and return formatted output.

    Args:
        repo_root: Repository root; also used as rg's working directory.
        pattern: Regex to search for.
        sub_dir: Directory to search, joined onto ``repo_root``.
        glob: Optional ``--glob`` filter such as ``*.py``.

    Returns:
        rg's output (line-numbered, one line of context), ``"no matches"``
        when nothing matched, a fixed "query not specific enough" message when
        the output exceeds MAX_GREP_LINES lines, or a string starting with
        ``"Error: "`` when rg could not be run or reported a failure.
    """
    path = Path(repo_root) / sub_dir

    cmd = [
        "rg",
        "--line-number",
        "--no-heading",
        "--color", "never",
        "-C", "1",  # 1 line of context
    ]

    if glob:
        cmd.extend(["--glob", glob])

    # "--" ends option parsing, so a pattern that begins with "-" (for example
    # "->" or "--verbose") is searched for instead of being read as an rg flag.
    cmd.extend(["--", pattern, str(path)])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=10,
            cwd=repo_root,
        )
    except subprocess.TimeoutExpired:
        return "Error: search timed out"
    except Exception as e:
        return f"Error: {e}"

    output = result.stdout.strip()

    if not output:
        # rg exits 0 on a match and 1 on no match; any other status is a real
        # failure (invalid regex, unreadable path). Report it instead of
        # claiming that nothing matched.
        if result.returncode not in (0, 1):
            detail = result.stderr.strip() or f"rg exited with status {result.returncode}"
            return f"Error: {detail}"
        return "no matches"

    if len(output.split("\n")) > MAX_GREP_LINES:
        return "query not specific enough, tool called tried to return too much context and failed"

    return output


def execute_read(repo_root: str, path: str, lines: str = None) -> str:
    """Read file contents with optional line range.

    Args:
        repo_root: Repository root that ``path`` is joined onto.
        path: File to read.
        lines: Optional 1-based ranges such as ``"1-50"`` or ``"1-20,45-80"``.
            ``None``, an empty string or ``"*"`` selects the whole file.

    Returns:
        The selected lines, each formatted as ``<line number>|<text>``, with
        ``...`` between non-adjacent ranges. Output is truncated after
        MAX_READ_LINES lines. Problems (missing file, unreadable file,
        malformed range) are returned as a string starting with ``"Error: "``
        rather than raised, so a bad tool call cannot crash the agent loop.
    """
    file_path = Path(repo_root) / path

    if not file_path.exists():
        return f"Error: file not found: {path}"

    try:
        # Explicit encoding so the result does not depend on the host locale.
        with open(file_path, "r", encoding="utf-8") as f:
            all_lines = f.readlines()
    except Exception as e:
        return f"Error: {e}"

    # "*" means "entire file", the same convention resolve_finish uses.
    if lines and lines != "*":
        # Parse line ranges like "1-50" or "1-20,45-80"
        selected = set()
        try:
            for range_part in lines.split(","):
                if not range_part.strip():
                    continue  # tolerate stray commas such as "1-5,"
                if "-" in range_part:
                    start, end = map(int, range_part.split("-"))
                else:
                    start = end = int(range_part)
                # Convert to 0-indexed, clamping the end to the file length
                selected.update(range(start - 1, min(end, len(all_lines))))
        except ValueError:
            return f"Error: invalid line range: {lines}"

        output_lines = []
        prev_idx = -2
        for idx in sorted(selected):
            if idx < 0 or idx >= len(all_lines):
                continue
            # Mark the gap between two non-adjacent ranges.
            if prev_idx >= 0 and idx > prev_idx + 1:
                output_lines.append("...")
            output_lines.append(f"{idx + 1}|{all_lines[idx].rstrip()}")
            prev_idx = idx
    else:
        output_lines = [f"{i + 1}|{line.rstrip()}" for i, line in enumerate(all_lines)]

    if len(output_lines) > MAX_READ_LINES:
        output_lines = output_lines[:MAX_READ_LINES]
        output_lines.append(f"... truncated ({len(all_lines)} total lines)")

    return "\n".join(output_lines)


def execute_list_directory(repo_root: str, path: str, pattern: str = None) -> str:
    """List directory structure using tree.

    Args:
        repo_root: Repository root; ``path`` is joined onto it and it is used
            as the working directory for ``tree``.
        path: Directory to list.
        pattern: Optional regex; only listing lines it matches are kept.

    Returns:
        The listing, one entry per line. If ``tree`` is not installed the
        result comes from fallback_list_dir. A listing longer than
        MAX_LIST_LINES lines yields the fixed "query not specific enough"
        message, and failures yield a string starting with ``"Error: "``.
    """
    dir_path = Path(repo_root) / path

    if not dir_path.exists():
        return f"Error: directory not found: {path}"

    cmd = [
        "tree",
        "-L", "3",
        "-i",
        "-F",
        "--noreport",
        "-I", "__pycache__|node_modules|.git|*.pyc|.DS_Store|.venv|venv|dist|build",
        str(dir_path),
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=5,
            cwd=repo_root,
        )
    except FileNotFoundError:
        # Fallback if tree not installed
        return fallback_list_dir(dir_path, pattern)
    except Exception as e:
        return f"Error: {e}"

    output = result.stdout.strip()

    # A failed run with nothing on stdout would otherwise look like an empty
    # directory; surface the reason instead.
    if not output and result.returncode != 0:
        detail = result.stderr.strip() or f"tree exited with status {result.returncode}"
        return f"Error: {detail}"

    lines = output.split("\n") if output else []

    # Apply regex filter if provided
    if pattern:
        import re as regex
        try:
            compiled = regex.compile(pattern)
        except regex.error:
            # Best effort: an invalid regex leaves the listing unfiltered.
            compiled = None
        if compiled is not None:
            lines = [line for line in lines if compiled.search(line)]

    if len(lines) > MAX_LIST_LINES:
        return "query not specific enough, tool called tried to return too much context and failed"

    return "\n".join(lines)


def fallback_list_dir(dir_path: Path, pattern: str = None, max_depth: int = 3) -> str:
    """Fallback directory listing without tree command.

    Args:
        dir_path: Directory to walk.
        pattern: Optional regex; only lines it matches are kept. An invalid
            regex is ignored and the listing is left unfiltered, the same as
            execute_list_directory does on the tree path.
        max_depth: Number of directory levels to list below ``dir_path``. The
            default of 3 mirrors the ``tree -L 3`` call this replaces.

    Returns:
        Entries sorted within each directory and indented two spaces per
        level, with a trailing ``/`` on directories. Hidden entries and common
        build/dependency directories are skipped. At most MAX_LIST_LINES
        lines are returned.
    """
    import re as regex

    try:
        compiled = regex.compile(pattern) if pattern else None
    except regex.error:
        compiled = None

    skipped = {"node_modules", "__pycache__", "venv", ".venv", "dist", "build"}
    lines = []

    def walk(p: Path, depth: int = 0):
        # depth is the number of levels already descended, so stopping at
        # max_depth lists exactly max_depth levels.
        if depth >= max_depth:
            return
        try:
            for item in sorted(p.iterdir()):
                if item.name.startswith("."):
                    continue
                if item.name in skipped:
                    continue

                indent = "  " * depth
                suffix = "/" if item.is_dir() else ""
                line = f"{indent}{item.name}{suffix}"

                if compiled is None or compiled.search(line):
                    lines.append(line)

                if item.is_dir():
                    walk(item, depth + 1)
        except OSError:
            # Unreadable or vanished directories are skipped, not fatal.
            pass

    walk(dir_path)
    return "\n".join(lines[:MAX_LIST_LINES])

Result Formatter

def format_result(tool_call: ToolCall, output: str) -> str:
    """Wrap a tool's output in an XML element that echoes the call.

    The opening tag repeats the call's arguments as attributes: ``pattern``
    (plus ``sub_dir`` and ``glob`` when given) for grep, ``path`` (plus
    ``lines`` when given) for read, and ``path`` for list_directory. Output of
    any other tool is returned unwrapped.
    """
    # tool name -> (attribute that is always written, attributes written only
    # when the call supplied them), in output order
    layout = {
        "grep": ("pattern", ("sub_dir", "glob")),
        "read": ("path", ("lines",)),
        "list_directory": ("path", ()),
    }

    tag = tool_call.name
    if tag not in layout:
        return output

    args = tool_call.args
    required, optional = layout[tag]

    required_value = args.get(required, "")
    parts = [f'{required}="{required_value}"']
    parts.extend(f'{key}="{args[key]}"' for key in optional if key in args)
    attrs = " ".join(parts)

    return f"<{tag} {attrs}>\n{output}\n</{tag}>"


def format_turn_message(turn: int, chars_used: int = 0, max_chars: int = 160000) -> str:
    """Build the trailer appended to each batch of tool results.

    The trailer reports how many turns have been used and, on a second line,
    the share of the character budget consumed. From the third turn onward it
    instead instructs the model to call ``finish``.
    """
    if turn < 3:
        plural = "" if turn == 1 else "s"
        notice = f"You have used {turn} turn{plural} and have {4 - turn} remaining"
    else:
        notice = "You have used 3 turns, you only have 1 turn remaining. You have run out of turns to explore the code base and MUST call the finish tool now"

    # A non-positive budget is reported as 0% rather than dividing by zero.
    percent = 0
    if max_chars > 0:
        percent = int((chars_used / max_chars) * 100)

    used_k = chars_used // 1000
    limit_k = max_chars // 1000

    return f"\n{notice}\n<context_budget>{percent}% ({used_k}K/{limit_k}K chars)</context_budget>"

Agent Loop

def get_repo_structure(repo_root: str) -> str:
    """Return the repository's top-level listing wrapped in ``<repo_structure>`` tags.

    This is the block that opens the first user message of a search.
    """
    listing = execute_list_directory(repo_root, ".", None)
    return "\n".join(("<repo_structure>", listing, "</repo_structure>"))


def search_codebase(query: str, repo_root: str) -> list[dict]:
    """
    Drive the Warp Grep conversation for a single query.

    Each turn sends the conversation to the API, parses the tool calls in the
    reply, runs them locally and sends the formatted results back. The loop
    ends when the model calls ``finish``, replies without any tool call, or
    four turns have been used.

    Returns the {path, content} dicts resolved from the ``finish`` call, or an
    empty list if the model never finished.
    """
    # System prompt (abbreviated - see full version in docs)
    system_prompt = """You are a code search agent. Find all relevant code for a given query.

You have 4 turns max. Each turn allows up to 8 parallel tool calls.
The 4th turn MUST be a `finish` call.

Tools:
- <grep><pattern>...</pattern><sub_dir>...</sub_dir><glob>...</glob></grep>
- <read><path>...</path><lines>...</lines></read>
- <list_directory><path>...</path><pattern>...</pattern></list_directory>
- <finish><file><path>...</path><lines>...</lines></file>...</finish>

Always start with <think>...</think> then tool calls.
"""

    def run_tool(call):
        """Dispatch one parsed call to its local executor and return the raw output."""
        params = call.args
        if call.name == "grep":
            return execute_grep(
                repo_root,
                params.get("pattern", ""),
                params.get("sub_dir", "."),
                params.get("glob"),
            )
        if call.name == "read":
            return execute_read(repo_root, params.get("path", ""), params.get("lines"))
        if call.name == "list_directory":
            return execute_list_directory(repo_root, params.get("path", "."), params.get("pattern"))
        return f"Unknown tool: {call.name}"

    opening = f"{get_repo_structure(repo_root)}\n\n<search_string>\n{query}\n</search_string>"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": opening},
    ]

    max_turns = 4
    # Running character count of everything sent and received so far.
    chars_used = len(system_prompt) + len(opening)

    for turn_number in range(1, max_turns + 1):
        reply = call_api(messages)
        messages.append({"role": "assistant", "content": reply})
        chars_used += len(reply)

        tool_calls = parse_tool_calls(reply)
        if not tool_calls:
            print(f"Turn {turn_number}: No tool calls, terminating")
            break

        # A finish call ends the search at once; other calls in the same
        # reply are not executed.
        for call in tool_calls:
            if call.name == "finish":
                return resolve_finish(repo_root, call)

        formatted = [format_result(call, run_tool(call)) for call in tool_calls]

        # The budget in the trailer reflects usage before this batch of results.
        feedback = "\n\n".join(formatted) + format_turn_message(turn_number, chars_used)
        messages.append({"role": "user", "content": feedback})
        chars_used += len(feedback)

        print(f"Turn {turn_number}: Executed {len(tool_calls)} tools")

    return []


def resolve_finish(repo_root: str, finish_call: ToolCall) -> list[dict]:
    """Load the file ranges named by a ``finish`` call.

    Returns one ``{"path", "content"}`` dict per ``<file>`` entry, in order.
    A ``lines`` value of ``"*"`` reads the whole file.
    """

    def load(spec):
        target = spec.get("path", "")
        span = spec.get("lines")
        wanted = None if span == "*" else span
        return {"path": target, "content": execute_read(repo_root, target, wanted)}

    return [load(spec) for spec in finish_call.args.get("files", [])]

Usage

if __name__ == "__main__":
    # Example run: search a repository and print each returned snippet under
    # a banner naming its file.
    banner = "=" * 60

    matches = search_codebase(
        query="Find where user authentication is implemented",
        repo_root="/path/to/your/repo",
    )

    for match in matches:
        print(f"\n{banner}")
        print(f"File: {match['path']}")
        print(banner)
        print(match["content"])

Single File Version

#!/usr/bin/env python3
"""
Warp Grep Agent - Complete Implementation
==========================================
A Python implementation of the Warp Grep code search agent.

Usage:
    export MORPH_API_KEY=your_key
    python warp_grep.py "Find authentication middleware" /path/to/repo
"""

import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path

import requests

# Config
# API key is read from the environment at import time (None when unset).
MORPH_API_KEY = os.environ.get("MORPH_API_KEY")
# Chat-completions endpoint that call_api() posts to.
API_URL = "https://api.morphllm.com/v1/chat/completions"
# Upper bound on API round-trips in search_codebase().
MAX_TURNS = 4
# Line caps applied by execute_grep / execute_list_directory / execute_read.
MAX_GREP_LINES = 200
MAX_LIST_LINES = 200
MAX_READ_LINES = 800

# System message that opens every conversation: it states the turn limit and
# shows the XML shape of each tool call.
SYSTEM_PROMPT = """You are a code search agent. Find all relevant code for a given query.

You have 4 turns max. The 4th turn MUST be a `finish` call.

Tools:
- <grep><pattern>REGEX</pattern><sub_dir>PATH</sub_dir><glob>*.py</glob></grep>
- <read><path>FILE</path><lines>1-50</lines></read>
- <list_directory><path>DIR</path><pattern>REGEX</pattern></list_directory>
- <finish><file><path>FILE</path><lines>1-50,80-100</lines></file></finish>

Always wrap reasoning in <think>...</think> then output tool calls.
"""


@dataclass
class ToolCall:
    """One tool invocation parsed out of a model response."""

    # Tag the call was written with: "grep", "read", "list_directory" or "finish".
    name: str
    # Child elements keyed by tag name (built by parse_xml_elements); <file>
    # children are collected as a list of dicts under the "files" key.
    args: dict


def call_api(messages: list, timeout: float = 60.0) -> str:
    """Send the conversation to the Warp Grep API and return the reply text.

    Args:
        messages: The conversation so far, as chat-style dicts with "role"
            and "content" keys.
        timeout: Seconds to wait for the server. requests applies no timeout
            by default, so without this a stalled connection would block the
            agent loop indefinitely.

    Returns:
        The "content" string of the first choice in the JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not respond within ``timeout``.
    """
    resp = requests.post(
        API_URL,
        headers={"Authorization": f"Bearer {MORPH_API_KEY}", "Content-Type": "application/json"},
        json={"model": "morph-warp-grep", "messages": messages, "temperature": 0.0, "max_tokens": 2048},
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"]


def parse_xml_elements(content: str) -> dict:
    """Map ``<tag>value</tag>`` children to ``{tag: stripped value}``.

    ``<file>`` elements may repeat, so each is parsed recursively and
    collected in a list under the ``"files"`` key.
    """
    element_re = re.compile(r"<(\w+)>(.*?)</\1>", re.DOTALL)
    parsed = {}
    for tag, raw in element_re.findall(content):
        text = raw.strip()
        if tag == "file":
            if "files" not in parsed:
                parsed["files"] = []
            parsed["files"].append(parse_xml_elements(text))
        else:
            parsed[tag] = text
    return parsed


def parse_tool_calls(response: str) -> list:
    """Extract the XML tool calls from a model response.

    ``<think>`` blocks are removed first. Calls come back grouped by tool
    (grep, read, list_directory, finish), each group in document order.
    """
    stripped = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)
    tool_tags = ("grep", "read", "list_directory", "finish")
    return [
        ToolCall(name=tag, args=parse_xml_elements(body))
        for tag in tool_tags
        for body in re.findall(rf"<{tag}>(.*?)</{tag}>", stripped, re.DOTALL)
    ]


def execute_grep(repo: str, pattern: str, sub_dir: str = ".", glob: str = None) -> str:
    """Run ripgrep under ``repo/sub_dir`` and return its output.

    Args:
        repo: Repository root; also used as rg's working directory.
        pattern: Regex to search for.
        sub_dir: Directory to search, joined onto ``repo``.
        glob: Optional ``--glob`` filter such as ``*.py``.

    Returns:
        rg's output (line-numbered, one line of context), ``"no matches"``
        when nothing matched, a fixed "query not specific enough" message when
        the output exceeds MAX_GREP_LINES lines, or a string starting with
        ``"Error: "`` when rg could not be run or reported a failure.
    """
    cmd = ["rg", "--line-number", "--no-heading", "--color", "never", "-C", "1"]
    if glob:
        cmd.extend(["--glob", glob])
    # "--" ends option parsing, so a pattern that begins with "-" is searched
    # for instead of being read as an rg flag.
    cmd.extend(["--", pattern, str(Path(repo) / sub_dir)])
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=10, cwd=repo)
    except Exception as e:
        return f"Error: {e}"

    output = r.stdout.strip()
    if not output:
        # rg exits 0 on a match and 1 on no match; any other status is a real
        # failure (invalid regex, unreadable path) and must not be reported
        # as "no matches".
        if r.returncode not in (0, 1):
            detail = r.stderr.strip() or f"rg exited with status {r.returncode}"
            return f"Error: {detail}"
        return "no matches"
    if len(output.split("\n")) > MAX_GREP_LINES:
        return "query not specific enough, tool called tried to return too much context and failed"
    return output


def execute_read(repo: str, path: str, lines: str = None) -> str:
    """Read a file, optionally restricted to 1-based line ranges.

    Args:
        repo: Repository root that ``path`` is joined onto.
        path: File to read.
        lines: Ranges such as ``"1-50"`` or ``"1-20,45-80"``. ``None``, an
            empty string or ``"*"`` selects the whole file.

    Returns:
        The selected lines formatted as ``<line number>|<text>``, with ``...``
        between non-adjacent ranges, limited to MAX_READ_LINES lines. Problems
        (missing file, unreadable file, malformed range) are returned as a
        string starting with ``"Error: "`` rather than raised, so a bad tool
        call cannot crash the agent loop.
    """
    fp = Path(repo) / path
    if not fp.exists():
        return f"Error: file not found: {path}"
    try:
        # Explicit encoding so the result does not depend on the host locale.
        all_lines = fp.read_text(encoding="utf-8").splitlines()
    except Exception as e:
        return f"Error: {e}"

    if lines and lines != "*":
        selected = set()
        try:
            for part in lines.split(","):
                if not part.strip():
                    continue  # tolerate stray commas such as "1-5,"
                if "-" in part:
                    s, e = map(int, part.split("-"))
                else:
                    s = e = int(part)
                # Convert to 0-indexed, clamping the end to the file length.
                selected.update(range(s - 1, min(e, len(all_lines))))
        except ValueError:
            return f"Error: invalid line range: {lines}"

        out, prev = [], -2
        for i in sorted(selected):
            if 0 <= i < len(all_lines):
                # Mark the gap between two non-adjacent ranges.
                if prev >= 0 and i > prev + 1:
                    out.append("...")
                out.append(f"{i + 1}|{all_lines[i]}")
                prev = i
        return "\n".join(out[:MAX_READ_LINES])
    return "\n".join(f"{i + 1}|{text}" for i, text in enumerate(all_lines[:MAX_READ_LINES]))


def execute_list_directory(repo: str, path: str, pattern: str = None) -> str:
    """List ``repo/path`` with the ``tree`` command.

    When ``pattern`` is given, only listing lines matching that regex are
    kept. A listing longer than MAX_LIST_LINES lines yields the fixed "query
    not specific enough" message. Any failure, including a missing ``tree``
    binary or an invalid regex, is returned as an ``"Error: ..."`` string.
    """
    dp = Path(repo) / path
    if not dp.exists():
        return f"Error: directory not found: {path}"

    tree_cmd = ["tree", "-L", "3", "-i", "-F", "--noreport", "-I", "__pycache__|node_modules|.git", str(dp)]
    try:
        proc = subprocess.run(tree_cmd, capture_output=True, text=True, timeout=5, cwd=repo)
        listing = proc.stdout.strip()
        entries = listing.split("\n") if listing else []
        if pattern:
            entries = [entry for entry in entries if re.search(pattern, entry)]
        if len(entries) > MAX_LIST_LINES:
            return "query not specific enough, tool called tried to return too much context and failed"
        return "\n".join(entries)
    except Exception as exc:
        return f"Error: {exc}"


def format_result(tc: ToolCall, output: str) -> str:
    """Wrap a tool's output in an XML element that echoes the call.

    The opening tag repeats the call's arguments as attributes: ``pattern``
    (plus ``sub_dir`` and ``glob`` when given) for grep, ``path`` (plus
    ``lines`` when given) for read, and ``path`` for list_directory. Output of
    any other tool is returned unwrapped.
    """
    if tc.name == "grep":
        attrs = f'pattern="{tc.args.get("pattern", "")}"'
        if "sub_dir" in tc.args:
            attrs += f' sub_dir="{tc.args["sub_dir"]}"'
        # Echo the glob filter as well, so the result identifies the exact
        # search that was run.
        if "glob" in tc.args:
            attrs += f' glob="{tc.args["glob"]}"'
        return f"<grep {attrs}>\n{output}\n</grep>"
    elif tc.name == "read":
        attrs = f'path="{tc.args.get("path", "")}"'
        if "lines" in tc.args:
            attrs += f' lines="{tc.args["lines"]}"'
        return f"<read {attrs}>\n{output}\n</read>"
    elif tc.name == "list_directory":
        return f'<list_directory path="{tc.args.get("path", "")}">\n{output}\n</list_directory>'
    return output


def search_codebase(query: str, repo: str) -> list:
    """Run the Warp Grep agent loop for one query.

    Each turn sends the conversation to the API, parses the tool calls in the
    reply, runs them locally and sends the formatted results back. The loop
    ends when the model calls ``finish``, replies without any tool call, or
    MAX_TURNS turns have been used.

    Args:
        query: Natural-language description of the code to find.
        repo: Path to the repository root.

    Returns:
        One ``{"path", "content"}`` dict per ``<file>`` in the ``finish``
        call, or an empty list if the model never finished.
    """
    structure = execute_list_directory(repo, ".", None)
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"<repo_structure>\n{structure}\n</repo_structure>\n\n<search_string>\n{query}\n</search_string>"},
    ]

    for turn in range(MAX_TURNS):
        response = call_api(messages)
        messages.append({"role": "assistant", "content": response})

        tool_calls = parse_tool_calls(response)
        if not tool_calls:
            break

        # A finish call ends the search at once; other calls in the same
        # reply are not executed.
        finish = next((tc for tc in tool_calls if tc.name == "finish"), None)
        if finish:
            resolved = []
            for spec in finish.args.get("files", []):
                # .get() rather than indexing: a <file> entry without <path>
                # produces an error entry instead of aborting the search with
                # a KeyError.
                file_path = spec.get("path", "")
                resolved.append({"path": file_path, "content": execute_read(repo, file_path, spec.get("lines"))})
            return resolved

        results = []
        for tc in tool_calls:
            if tc.name == "grep":
                out = execute_grep(repo, tc.args.get("pattern", ""), tc.args.get("sub_dir", "."), tc.args.get("glob"))
            elif tc.name == "read":
                out = execute_read(repo, tc.args.get("path", ""), tc.args.get("lines"))
            elif tc.name == "list_directory":
                out = execute_list_directory(repo, tc.args.get("path", "."), tc.args.get("pattern"))
            else:
                out = f"Unknown: {tc.name}"
            results.append(format_result(tc, out))

        remaining = MAX_TURNS - turn - 1
        turn_msg = f"\nYou have used {turn + 1} turns and have {remaining} remaining.\n"
        messages.append({"role": "user", "content": "\n\n".join(results) + turn_msg})
        print(f"Turn {turn + 1}: {len(tool_calls)} tools")

    return []


if __name__ == "__main__":
    # CLI entry point: expects a query and a repository path.
    if len(sys.argv) < 3:
        print("Usage: python warp_grep.py 'query' /path/to/repo")
        sys.exit(1)

    query_text, repo_path = sys.argv[1], sys.argv[2]
    rule = "=" * 60
    for hit in search_codebase(query_text, repo_path):
        print(f"\n{rule}\nFile: {hit['path']}\n{rule}\n{hit['content']}")

Next Steps