Skip to main content
A complete Python implementation of the WarpGrep agent loop using OpenAI-compatible tool calling.

Overview

The agent loop:
  1. Send query + repo structure to the API with tool definitions
  2. Receive structured tool_calls from the response (no XML parsing needed)
  3. Execute tools locally (ripgrep, file reads, directory listing, glob)
  4. Send results back as tool messages
  5. Repeat until finish is called (max 6 turns)

Installation

pip install openai
You’ll also need ripgrep installed:
# macOS
brew install ripgrep

# Ubuntu/Debian
apt-get install ripgrep

# Windows
choco install ripgrep

Complete Implementation

Tool Definitions

Define the tools the model can call. These are passed in every API request.
def _param(kind, description):
    """Build the JSON-schema entry for a single tool parameter."""
    return {"type": kind, "description": description}


def _function_tool(name, description, properties, required):
    """Wrap one tool's metadata in the function-calling envelope used by the API."""
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
        },
    }


# Tool schemas sent with every request. The order is: content search, file
# read, directory listing, file-name glob, and the terminating `finish` call.
TOOLS = [
    _function_tool(
        "grep_search",
        "Search for a regex pattern in file contents.",
        {
            "pattern": _param("string", "Regex pattern to search for."),
            "path": _param("string", "File or directory to search in."),
            "glob": _param("string", "Glob pattern to filter files (e.g. '*.py')."),
            "limit": _param("integer", "Limit output to first N matching lines."),
        },
        ["pattern"],
    ),
    _function_tool(
        "read",
        "Read entire files or specific line ranges.",
        {
            "path": _param("string", "Absolute file path to read."),
            "lines": _param("string", "Optional line range (e.g. '1-50' or '1-20,45-80')."),
        },
        ["path"],
    ),
    _function_tool(
        "list_directory",
        "Execute ls or find commands to explore directory structure.",
        {
            "command": _param("string", "Full ls or find command."),
        },
        ["command"],
    ),
    _function_tool(
        "glob",
        "Find files by name/extension using glob patterns. Returns absolute paths sorted by modification time.",
        {
            "pattern": _param("string", "Glob pattern to match files (e.g. '*.py', 'src/**/*.js')."),
            "path": _param("string", "Directory to search in. Defaults to repository root."),
        },
        ["pattern"],
    ),
    _function_tool(
        "finish",
        "Submit final answer with all relevant code locations.",
        {
            "files": _param("string", "One file per line as path:lines (e.g. 'src/auth.py:1-50\\nsrc/user.py')."),
        },
        ["files"],
    ),
]

API Client

import os
import json
from openai import OpenAI

# OpenAI-compatible client pointed at the Morph endpoint. The key is read with
# os.environ[...], so importing this module raises KeyError when MORPH_API_KEY
# is not set.
client = OpenAI(
    api_key=os.environ["MORPH_API_KEY"],
    base_url="https://api.morphllm.com/v1",
)


def call_api(messages: list[dict]) -> "ChatCompletionMessage":
    """Call the WarpGrep API and return the assistant message of the first choice.

    Args:
        messages: Conversation so far, in chat-completions message format.

    Returns:
        The SDK message object, not a plain dict: callers read its
        ``tool_calls`` attribute and serialize it with ``model_dump()``. In the
        openai package this is ``openai.types.chat.ChatCompletionMessage``
        (annotated as a string so no extra import is needed).
    """
    response = client.chat.completions.create(
        model="morph-warp-grep-v2.1",
        messages=messages,
        tools=TOOLS,
        temperature=0.0,
        max_tokens=2048,
    )
    # Only the first choice is consulted.
    return response.choices[0].message

Tool Executors

Each tool call from the model is executed locally. These functions run ripgrep, read files, list directories, and find files by glob pattern.
import subprocess
from pathlib import Path
import fnmatch

# Output caps applied by the tool executors so a single tool result stays bounded.
MAX_GREP_LINES = 200  # ripgrep output lines returned by execute_grep
MAX_LIST_LINES = 200  # entries returned by execute_list_directory
MAX_READ_LINES = 800  # numbered lines returned by execute_read
MAX_GLOB_FILES = 100  # paths returned by execute_glob


def execute_grep(pattern: str, path: str = ".", glob_filter: str = None, limit: int = None) -> str:
    """Run ripgrep and return its (possibly truncated) output.

    Args:
        pattern: Regex to search for.
        path: File or directory to search in.
        glob_filter: Optional value for ripgrep's ``--glob`` file filter.
        limit: Optional cap on the number of output lines. Values that are not
            positive integers are ignored, and the cap never exceeds
            ``MAX_GREP_LINES``.

    Returns:
        The ripgrep output (line-numbered, with one line of context), the
        string ``"no matches"``, or a message starting with ``"Error:"`` when
        ripgrep could not be run, timed out, or reported a failure.
    """
    cmd = ["rg", "--line-number", "--no-heading", "--color", "never", "-C", "1"]

    if glob_filter:
        cmd.extend(["--glob", glob_filter])

    # "-e" marks the pattern explicitly and "--" ends option parsing, so a
    # pattern or path that starts with "-" (e.g. "->" or "--verbose") is never
    # mistaken for a command-line flag.
    cmd.extend(["-e", pattern, "--", path])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",  # undecodable bytes must not abort the search
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "Error: search timed out"
    except Exception as e:
        return f"Error: {e}"

    output = result.stdout.strip()

    if not output:
        # ripgrep exits 0 on a match and 1 on no match; anything else is a
        # failure (bad regex, missing path, ...) and must not be reported as
        # "no matches".
        if result.returncode not in (0, 1):
            detail = (result.stderr or "").strip()
            return f"Error: {detail or f'ripgrep exited with status {result.returncode}'}"
        return "no matches"

    lines = output.split("\n")

    cap = MAX_GREP_LINES
    if isinstance(limit, int) and 0 < limit < cap:
        cap = limit

    if len(lines) > cap:
        return "\n".join(lines[:cap]) + f"\n... (truncated at {cap} lines)"

    return output


def execute_read(path: str, lines: str = None) -> str:
    """Read a file and return its lines prefixed with 1-based line numbers.

    Args:
        path: Path of the file to read.
        lines: Optional comma-separated list of 1-based line numbers or
            inclusive ranges, e.g. ``"1-50"`` or ``"1-20,45-80"``. When omitted
            or empty the whole file is returned.

    Returns:
        Lines formatted as ``"<number>|<text>"`` joined by newlines, capped at
        ``MAX_READ_LINES`` entries (a truncation marker is appended when the
        cap is hit). Returns a ``"[FILE NOT FOUND] ..."`` message when the path
        does not exist and an ``"Error: ..."`` message when the file cannot be
        read or the range specification is malformed.
    """
    file_path = Path(path)

    if not file_path.exists():
        return f"[FILE NOT FOUND] {path} does not exist"

    try:
        # Explicit UTF-8 keeps results independent of the platform default
        # encoding; undecodable bytes are replaced rather than failing the read.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            all_lines = f.readlines()
    except Exception as e:
        return f"Error: {e}"

    if lines:
        wanted = set()
        for range_part in lines.split(","):
            range_part = range_part.strip()
            if not range_part:
                continue  # tolerate stray commas such as "1-5,"
            try:
                if "-" in range_part:
                    start_text, end_text = range_part.split("-", 1)
                    start, end = int(start_text), int(end_text)
                else:
                    start = end = int(range_part)
            except ValueError:
                # The range comes from model output; report it instead of raising.
                return f"Error: invalid line range '{range_part}' (expected e.g. '1-50' or '1-20,45-80')"
            # Clamp to the file: line numbers below 1 or past the end are dropped.
            wanted.update(range(max(start, 1) - 1, min(end, len(all_lines))))

        output_lines = [f"{idx + 1}|{all_lines[idx].rstrip()}" for idx in sorted(wanted)]
    else:
        output_lines = [f"{i + 1}|{line.rstrip()}" for i, line in enumerate(all_lines)]

    if len(output_lines) > MAX_READ_LINES:
        output_lines = output_lines[:MAX_READ_LINES]
        output_lines.append(f"... truncated ({len(all_lines)} total lines)")

    return "\n".join(output_lines)


def execute_list_directory(command: str) -> str:
    """List a directory tree for an ``ls``/``find`` style command.

    The command is not executed. Only its first path-like argument is used:
    the first token after the command name that does not start with ``-`` or
    ``|``, with surrounding quotes removed. If there is none, the current
    directory is listed.

    Args:
        command: The full command string requested by the model.

    Returns:
        An indented listing (two spaces per level, directories suffixed with
        ``/``) that descends at most four levels and is capped at
        ``MAX_LIST_LINES`` entries. Hidden entries and common dependency/build
        directories are omitted. If the path names a file, that path is
        returned on its own; if it does not exist, an ``"Error: ..."`` message
        is returned.
    """
    tokens = command.strip().split()
    path_tokens = [
        t.strip("'\"")  # models often quote paths: ls "src"
        for t in tokens[1:]
        if not t.startswith("-") and not t.startswith("|")
    ]
    path_tokens = [t for t in path_tokens if t]
    dir_path = Path(path_tokens[0]) if path_tokens else Path(".")

    if not dir_path.exists():
        return f"Error: directory not found: {dir_path}"

    if not dir_path.is_dir():
        # `ls <file>` simply echoes the file; iterating it would raise
        # NotADirectoryError.
        return str(dir_path)

    skip = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build", ".next"}
    lines = []

    def walk(p: Path, depth: int = 0):
        if depth > 3 or len(lines) >= MAX_LIST_LINES:
            return
        try:
            for item in sorted(p.iterdir()):
                if len(lines) >= MAX_LIST_LINES:
                    return  # cap reached; no point scanning further
                if item.name.startswith(".") or item.name in skip:
                    continue
                is_dir = item.is_dir()
                indent = "  " * depth
                suffix = "/" if is_dir else ""
                lines.append(f"{indent}{item.name}{suffix}")
                if is_dir:
                    walk(item, depth + 1)
        except OSError:
            # Best effort: unreadable or vanished directories are skipped.
            pass

    walk(dir_path)
    return "\n".join(lines[:MAX_LIST_LINES])


def execute_glob(pattern: str, path: str = None) -> str:
    """Find files matching a glob pattern, sorted by mtime (newest first)."""
    search_dir = Path(path) if path else Path(".")

    if not search_dir.exists() or not search_dir.is_dir():
        return f"Error: directory not found: {search_dir}"

    # Use rglob for recursive search
    if "/" in pattern or "**" in pattern:
        matches = list(search_dir.glob(pattern))
    else:
        matches = list(search_dir.rglob(pattern))

    # Filter out junk directories
    skip = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
    matches = [m for m in matches if m.is_file() and not any(s in m.parts for s in skip)]

    # Sort by mtime descending (newest first)
    matches.sort(key=lambda p: p.stat().st_mtime, reverse=True)

    # Cap at max results
    matches = matches[:MAX_GLOB_FILES]

    if not matches:
        return "no matches"

    abs_paths = [str(m.resolve()) for m in matches]
    header = f'Found {len(abs_paths)} file(s) matching "{pattern}" within {search_dir.resolve()}, sorted by modification time (newest first):'
    return f"{header}\n---\n" + "\n".join(abs_paths) + "\n---"

Tool Dispatcher

Route each tool call to the right executor.
def dispatch_tool(name: str, arguments: dict) -> str:
    """Execute a tool call and return the output string.

    Arguments come straight from model output, so they are validated here and
    every failure is turned into an ``"Error: ..."`` string the model can react
    to, rather than an exception that would abort the agent loop.

    Args:
        name: Tool name as requested by the model.
        arguments: Decoded JSON arguments of the tool call.

    Returns:
        The executor's output, ``"Unknown tool: <name>"`` for a tool with no
        executor, or an ``"Error: ..."`` message for invalid arguments or a
        failing executor.
    """
    # The one argument each executor cannot work without.
    required = {
        "grep_search": "pattern",
        "read": "path",
        "list_directory": "command",
        "glob": "pattern",
    }

    if name not in required:
        return f"Unknown tool: {name}"

    if not isinstance(arguments, dict):
        return f"Error: arguments for tool '{name}' must be a JSON object"

    key = required[name]
    if arguments.get(key) is None:
        return f"Error: missing required argument '{key}' for tool '{name}'"

    try:
        if name == "grep_search":
            return execute_grep(
                pattern=arguments["pattern"],
                path=arguments.get("path") or ".",
                glob_filter=arguments.get("glob"),
                limit=arguments.get("limit"),
            )
        if name == "read":
            return execute_read(
                path=arguments["path"],
                lines=arguments.get("lines"),
            )
        if name == "list_directory":
            return execute_list_directory(arguments["command"])
        return execute_glob(
            pattern=arguments["pattern"],
            path=arguments.get("path"),
        )
    except Exception as e:
        # Boundary between untrusted model input and local execution: report
        # the failure as tool output so the conversation can continue.
        return f"Error: tool '{name}' failed: {e}"

Agent Loop

The main loop ties everything together using the standard OpenAI tool-calling flow.
def get_repo_structure(repo_root: str, max_depth: int = 2) -> str:
    """Return a newline-joined listing of absolute paths under ``repo_root``.

    The resolved root comes first, followed by its entries in sorted,
    depth-first order. Directories are descended while their depth (the root
    is depth 0) does not exceed ``max_depth``. Hidden entries and common
    dependency/build directories are left out, and directories that raise
    PermissionError are skipped.
    """
    ignored = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
    base = Path(repo_root).resolve()

    def visible_paths(directory: Path, level: int):
        # Yields each path before descending into it (pre-order).
        if level > max_depth:
            return
        try:
            for entry in sorted(directory.iterdir()):
                if entry.name in ignored or entry.name.startswith("."):
                    continue
                yield str(entry)
                if entry.is_dir():
                    yield from visible_paths(entry, level + 1)
        except PermissionError:
            return

    return "\n".join([str(base), *visible_paths(base, 0)])


def search_codebase(query: str, repo_root: str) -> list[dict]:
    """
    Run the WarpGrep agent loop.

    Each turn sends the conversation to the API, executes the tool calls the
    model requested, and feeds the results back together with a turn counter.
    The loop ends when the model calls ``finish``, replies without tool calls,
    or the turn budget (6) is exhausted.

    Args:
        query: Natural-language description of what to find.
        repo_root: Root directory of the repository to search. It is used for
            the initial structure listing and as the default ``path`` of the
            ``grep_search`` and ``glob`` tools.

    Returns a list of {path, content} dicts with the relevant code, or an
    empty list when the model never produced a usable ``finish`` call.
    """
    repo_structure = get_repo_structure(repo_root)
    # The executors resolve a missing or relative path against the process
    # working directory, which need not be the repository. The tool schema
    # promises the repository root as the default, so supply it explicitly.
    default_path = str(Path(repo_root).resolve())

    initial_content = (
        f"<repo_structure>\n{repo_structure}\n</repo_structure>\n\n"
        f"<search_string>\n{query}\n</search_string>"
    )

    messages = [{"role": "user", "content": initial_content}]

    max_turns = 6

    for turn in range(1, max_turns + 1):
        # Call API
        assistant_msg = call_api(messages)

        # Add assistant message to history
        messages.append(assistant_msg.model_dump())

        tool_calls = assistant_msg.tool_calls or []

        if not tool_calls:
            print(f"Turn {turn}: No tool calls, terminating")
            break

        # Check for finish
        finish_call = next((tc for tc in tool_calls if tc.function.name == "finish"), None)
        if finish_call:
            try:
                finish_args = json.loads(finish_call.function.arguments)
            except (json.JSONDecodeError, TypeError):
                finish_args = None
            if isinstance(finish_args, dict):
                files = finish_args.get("files") or ""
                if isinstance(files, list):
                    # Tolerate a JSON array where a newline-separated string was asked for.
                    files = "\n".join(str(entry) for entry in files)
                return resolve_finish(str(files))
            # Malformed finish call: fall through so the model receives an
            # error for it below and can retry on the next turn.

        # Execute all tool calls
        for tc in tool_calls:
            name = tc.function.name
            try:
                args = json.loads(tc.function.arguments)
            except (json.JSONDecodeError, TypeError) as e:
                # Model output is not guaranteed to be valid JSON.
                output = f"Error: could not parse arguments for {name}: {e}"
            else:
                if not isinstance(args, dict):
                    output = f"Error: arguments for {name} must be a JSON object"
                else:
                    if name in ("grep_search", "glob") and not args.get("path"):
                        args["path"] = default_path
                    output = dispatch_tool(name, args)
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": output,
            })

        # Add turn counter
        remaining = max_turns - turn
        if remaining <= 1:
            turn_msg = f"You have used {turn} turns, you only have 1 turn remaining. You have run out of turns to explore the code base and MUST call the finish tool now"
        else:
            turn_msg = f"You have used {turn} turn{'s' if turn != 1 else ''} and have {remaining} remaining"
        messages.append({"role": "user", "content": turn_msg})

        print(f"Turn {turn}: Executed {len(tool_calls)} tools")

    return []


def resolve_finish(files_str: str) -> list[dict]:
    """Read the files (or line ranges) named in a finish call.

    Args:
        files_str: One entry per line, either ``path`` or ``path:lines`` where
            ``lines`` is ``*`` (whole file) or a range list such as ``1-50`` or
            ``1-20,45-80``. Blank lines are ignored.

    Returns:
        One ``{"path": ..., "content": ...}`` dict per entry, in input order,
        where ``content`` is whatever ``execute_read`` returns for that entry.
    """
    range_chars = set("0123456789-, ")
    results = []

    for line in files_str.strip().splitlines():
        line = line.strip()
        if not line:
            continue

        path, lines = line, None
        head, sep, tail = line.rpartition(":")
        if sep and head:
            tail = tail.strip()
            if tail in ("", "*"):
                # "path:" and "path:*" both mean the whole file.
                path = head
            elif set(tail) <= range_chars and any(ch in "0123456789" for ch in tail):
                path, lines = head, tail
            # Otherwise the colon belongs to the path itself (for example a
            # Windows drive letter, "C:\repo\a.py"), so the entry is kept whole.

        content = execute_read(path, lines)
        results.append({"path": path, "content": content})

    return results

Usage

if __name__ == "__main__":
    # Example run: search a repository and print every returned snippet under
    # a banner naming its file.
    matches = search_codebase(
        query="Find where user authentication is implemented",
        repo_root="/path/to/your/repo",
    )

    divider = "=" * 60
    for match in matches:
        print(f"\n{divider}")
        print(f"File: {match['path']}")
        print(divider)
        print(match["content"])

Next Steps