Overview
The agent loop:
- Send query + repo structure to the API with tool definitions
- Receive structured tool_calls from the response (no XML parsing needed)
- Execute tools locally (ripgrep, file reads, directory listing, glob)
- Send results back as tool messages
- Repeat until finish is called (max 6 turns)
Installation
Install the OpenAI SDK:
pip install openai
You also need ripgrep installed:
# macOS
brew install ripgrep
# Ubuntu/Debian
apt-get install ripgrep
# Windows
choco install ripgrep
Complete Implementation
Tool Definitions
Define the tools the model can call. These are passed in every API request.
# Tool schemas in the OpenAI function-calling format; the full list is sent
# with every chat request so the model can emit structured tool_calls.
TOOLS = [
    # Regex content search (executed locally via ripgrep).
    {
        "type": "function",
        "function": {
            "name": "grep_search",
            "description": "Search for a regex pattern in file contents.",
            "parameters": {
                "type": "object",
                "properties": {
                    "pattern": {"type": "string", "description": "Regex pattern to search for."},
                    "path": {"type": "string", "description": "File or directory to search in."},
                    "glob": {"type": "string", "description": "Glob pattern to filter files (e.g. '*.py')."},
                    "limit": {"type": "integer", "description": "Limit output to first N matching lines."},
                },
                "required": ["pattern"],
            },
        },
    },
    # Whole-file or line-range reads.
    {
        "type": "function",
        "function": {
            "name": "read",
            "description": "Read entire files or specific line ranges.",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "Absolute file path to read."},
                    "lines": {"type": "string", "description": "Optional line range (e.g. '1-50' or '1-20,45-80')."},
                },
                "required": ["path"],
            },
        },
    },
    # Directory exploration driven by an ls/find-style command string.
    {
        "type": "function",
        "function": {
            "name": "list_directory",
            "description": "Execute ls or find commands to explore directory structure.",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {"type": "string", "description": "Full ls or find command."},
                },
                "required": ["command"],
            },
        },
    },
    # Filename/extension matching; results come back newest-first.
    {
        "type": "function",
        "function": {
            "name": "glob",
            "description": "Find files by name/extension using glob patterns. Returns absolute paths sorted by modification time.",
            "parameters": {
                "type": "object",
                "properties": {
                    "pattern": {"type": "string", "description": "Glob pattern to match files (e.g. '*.py', 'src/**/*.js')."},
                    "path": {"type": "string", "description": "Directory to search in. Defaults to repository root."},
                },
                "required": ["pattern"],
            },
        },
    },
    # Terminal tool: the model submits its final answer through this call,
    # which ends the agent loop in search_codebase().
    {
        "type": "function",
        "function": {
            "name": "finish",
            "description": "Submit final answer with all relevant code locations.",
            "parameters": {
                "type": "object",
                "properties": {
                    "files": {"type": "string", "description": "One file per line as path:lines (e.g. 'src/auth.py:1-50\\nsrc/user.py')."},
                },
                "required": ["files"],
            },
        },
    },
]
API Client
import os
import json
from openai import OpenAI

# Client pointed at the Morph endpoint; reading MORPH_API_KEY with [] (not
# .get) makes a missing key fail fast at import time with a KeyError.
client = OpenAI(
    api_key=os.environ["MORPH_API_KEY"],
    base_url="https://api.morphllm.com/v1",
)
def call_api(messages: list[dict]) -> dict:
    """Send the conversation to the WarpGrep model.

    Returns the assistant message from the first choice; when the model
    wants to act it carries structured ``tool_calls``.
    """
    completion = client.chat.completions.create(
        model="morph-warp-grep-v2.1",
        messages=messages,
        tools=TOOLS,
        temperature=0.0,
        max_tokens=2048,
    )
    first_choice = completion.choices[0]
    return first_choice.message
Tool Executors
Each tool call from the model is executed locally. These functions run ripgrep, read files, list directories, and find files by glob pattern.
import subprocess
from pathlib import Path
import fnmatch

# Output caps keep tool results small enough to fit the model's context
# budget; each executor truncates its output at the matching limit.
MAX_GREP_LINES = 200  # grep_search: max matching lines returned
MAX_LIST_LINES = 200  # list_directory: max entries returned
MAX_READ_LINES = 800  # read: max file lines returned
MAX_GLOB_FILES = 100  # glob: max file paths returned
def execute_grep(pattern: str, path: str = ".", glob_filter: str = None, limit: int = None, max_lines: int = None) -> str:
    """Run ripgrep and return matching lines with one line of context.

    Args:
        pattern: Regex pattern passed to ``rg``.
        path: File or directory to search (defaults to the cwd).
        glob_filter: Optional ``--glob`` file filter, e.g. ``'*.py'``.
        limit: Optional model-requested cap on matching lines.
        max_lines: Overall output cap; defaults to MAX_GREP_LINES.

    Returns:
        Matching lines (possibly with a truncation notice), "no matches",
        or an "Error: ..." string on timeout or ripgrep failure.
    """
    cmd = ["rg", "--line-number", "--no-heading", "--color", "never", "-C", "1"]
    if glob_filter:
        cmd.extend(["--glob", glob_filter])
    cmd.extend([pattern, path])
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
    except subprocess.TimeoutExpired:
        return "Error: search timed out"
    except Exception as e:
        return f"Error: {e}"
    # rg exit codes: 0 = matches found, 1 = no matches, >=2 = real error
    # (bad regex, unreadable path, ...). The original dropped stderr and
    # reported errors as "no matches"; surface them instead.
    if result.returncode not in (0, 1):
        return f"Error: {result.stderr.strip() or 'rg failed'}"
    output = result.stdout
    lines = output.strip().split("\n") if output.strip() else []
    if limit and len(lines) > limit:
        return "\n".join(lines[:limit]) + f"\n... (truncated at {limit} lines)"
    cap = max_lines if max_lines is not None else MAX_GREP_LINES
    if len(lines) > cap:
        return "\n".join(lines[:cap]) + f"\n... (truncated at {cap} lines)"
    return output.strip() if output.strip() else "no matches"
def execute_read(path: str, lines: str = None, max_lines: int = None) -> str:
    """Read a file, optionally restricted to specific line ranges.

    Args:
        path: Path of the file to read.
        lines: Optional range spec such as "1-50" or "1-20,45-80"
            (1-indexed, inclusive).
        max_lines: Output cap; defaults to MAX_READ_LINES.

    Returns:
        Numbered lines in "N|text" form, "[FILE NOT FOUND] ...", or an
        "Error: ..." string (unreadable file or malformed range spec).
    """
    file_path = Path(path)
    if not file_path.exists():
        return f"[FILE NOT FOUND] {path} does not exist"
    try:
        # errors="replace" keeps binary / oddly-encoded files from crashing
        # the agent with a UnicodeDecodeError.
        with open(file_path, "r", errors="replace") as f:
            all_lines = f.readlines()
    except Exception as e:
        return f"Error: {e}"
    if lines:
        selected = []
        try:
            for range_part in lines.split(","):
                range_part = range_part.strip()
                if "-" in range_part:
                    start, end = map(int, range_part.split("-"))
                else:
                    start = end = int(range_part)
                selected.extend(range(start - 1, min(end, len(all_lines))))
        except ValueError:
            # The original let specs like "abc" raise an uncaught
            # ValueError; report it as a tool error instead.
            return f"Error: invalid line range: {lines!r}"
        output_lines = [
            f"{idx + 1}|{all_lines[idx].rstrip()}"
            for idx in sorted(set(selected))
            if 0 <= idx < len(all_lines)
        ]
    else:
        output_lines = [f"{i + 1}|{line.rstrip()}" for i, line in enumerate(all_lines)]
    # Apply the cap to both ranged and whole-file reads so a huge requested
    # range cannot blow the context budget.
    cap = max_lines if max_lines is not None else MAX_READ_LINES
    if len(output_lines) > cap:
        output_lines = output_lines[:cap]
        output_lines.append(f"... truncated ({len(all_lines)} total lines)")
    return "\n".join(output_lines)
def execute_list_directory(command: str, max_lines: int = None) -> str:
    """List a directory tree (depth <= 3) for an ls/find-style command.

    Only the first non-flag argument of ``command`` is used as the target
    path; the command itself is never executed in a shell.

    Args:
        command: Full ls or find command string from the model.
        max_lines: Output cap; defaults to MAX_LIST_LINES.

    Returns:
        An indented listing (directories suffixed with "/"), or an
        "Error: ..." string when the path does not exist.
    """
    # Extract the target path: skip the command word and any flags/pipes.
    tokens = command.strip().split()
    path_tokens = [t for t in tokens[1:] if not t.startswith("-") and not t.startswith("|")]
    dir_path = Path(path_tokens[0]) if path_tokens else Path(".")
    if not dir_path.exists():
        return f"Error: directory not found: {dir_path}"
    cap = max_lines if max_lines is not None else MAX_LIST_LINES
    skip = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build", ".next"}
    lines = []

    def walk(p: Path, depth: int = 0):
        if depth > 3:
            return
        try:
            entries = sorted(p.iterdir())
        except PermissionError:
            return
        for item in entries:
            # Check the cap per item, not just on directory entry, so one
            # large directory cannot overshoot the limit.
            if len(lines) >= cap:
                return
            if item.name.startswith(".") or item.name in skip:
                continue
            is_dir = item.is_dir()
            lines.append(f"{' ' * depth}{item.name}{'/' if is_dir else ''}")
            if is_dir:
                walk(item, depth + 1)

    walk(dir_path)
    return "\n".join(lines[:cap])
def execute_glob(pattern: str, path: str = None, max_files: int = None) -> str:
    """Find files matching a glob pattern, sorted by mtime (newest first).

    Args:
        pattern: Glob pattern, e.g. "*.py" or "src/**/*.js".
        path: Directory to search; defaults to the current directory.
        max_files: Result cap; defaults to MAX_GLOB_FILES.

    Returns:
        A header plus absolute paths between "---" markers, "no matches",
        or an "Error: ..." string when the directory does not exist.
    """
    search_dir = Path(path) if path else Path(".")
    if not search_dir.exists() or not search_dir.is_dir():
        return f"Error: directory not found: {search_dir}"
    # Patterns with path components are anchored at search_dir; bare
    # patterns like "*.py" are searched recursively.
    if "/" in pattern or "**" in pattern:
        matches = list(search_dir.glob(pattern))
    else:
        matches = list(search_dir.rglob(pattern))
    # Filter out junk directories
    skip = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
    matches = [m for m in matches if m.is_file() and not any(s in m.parts for s in skip)]

    def _mtime(p: Path) -> float:
        # A file can vanish (or be a broken symlink) between globbing and
        # stat(); the original crashed here — sort such entries last.
        try:
            return p.stat().st_mtime
        except OSError:
            return 0.0

    # Sort by mtime descending (newest first), then cap the result count.
    matches.sort(key=_mtime, reverse=True)
    cap = max_files if max_files is not None else MAX_GLOB_FILES
    matches = matches[:cap]
    if not matches:
        return "no matches"
    abs_paths = [str(m.resolve()) for m in matches]
    header = f'Found {len(abs_paths)} file(s) matching "{pattern}" within {search_dir.resolve()}, sorted by modification time (newest first):'
    return f"{header}\n---\n" + "\n".join(abs_paths) + "\n---"
Tool Dispatcher
Route each tool call to the right executor.
def dispatch_tool(name: str, arguments: dict) -> str:
    """Route one model tool call to its local executor.

    Returns the executor's output string, or an "Unknown tool: ..." message
    for names outside the tool set.
    """
    # Lazy dispatch table: the executor is only looked up when invoked.
    handlers = {
        "grep_search": lambda a: execute_grep(
            pattern=a["pattern"],
            path=a.get("path", "."),
            glob_filter=a.get("glob"),
            limit=a.get("limit"),
        ),
        "read": lambda a: execute_read(
            path=a["path"],
            lines=a.get("lines"),
        ),
        "list_directory": lambda a: execute_list_directory(a["command"]),
        "glob": lambda a: execute_glob(
            pattern=a["pattern"],
            path=a.get("path"),
        ),
    }
    handler = handlers.get(name)
    if handler is None:
        return f"Unknown tool: {name}"
    return handler(arguments)
Agent Loop
The main loop ties everything together using the standard OpenAI tool calling flow.
def get_repo_structure(repo_root: str, max_depth: int = 2) -> str:
    """Build a flat, newline-joined listing of absolute paths under repo_root.

    The first line is the resolved root itself; hidden entries and common
    dependency/build directories are skipped, and traversal stops once
    ``max_depth`` levels below the root have been listed.
    """
    root = Path(repo_root).resolve()
    ignored = {".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build"}
    listing = [str(root)]

    def descend(directory: Path, level: int) -> None:
        if level > max_depth:
            return
        try:
            children = sorted(directory.iterdir())
        except PermissionError:
            return
        for child in children:
            if child.name.startswith(".") or child.name in ignored:
                continue
            listing.append(str(child))
            if child.is_dir():
                descend(child, level + 1)

    descend(root, 0)
    return "\n".join(listing)
def search_codebase(query: str, repo_root: str) -> list[dict]:
    """
    Run the WarpGrep agent loop.
    Returns a list of {path, content} dicts with the relevant code.

    Each turn: call the API, execute any tool_calls locally, append one
    "tool" message per call, then a user message with the turn budget.
    The loop ends when the model calls finish, stops calling tools, or
    the 6-turn budget runs out (then an empty list is returned).
    """
    # Initial user message: repo layout plus the natural-language query,
    # wrapped in the tags the model expects.
    repo_structure = get_repo_structure(repo_root)
    initial_content = (
        f"<repo_structure>\n{repo_structure}\n</repo_structure>\n\n"
        f"<search_string>\n{query}\n</search_string>"
    )
    messages = [{"role": "user", "content": initial_content}]
    max_turns = 6
    for turn in range(1, max_turns + 1):
        # Call API
        assistant_msg = call_api(messages)
        # Add assistant message to history (as a plain dict so the message
        # list stays serializable for the next request).
        messages.append(assistant_msg.model_dump())
        tool_calls = assistant_msg.tool_calls or []
        if not tool_calls:
            print(f"Turn {turn}: No tool calls, terminating")
            break
        # Check for finish — it terminates the loop immediately, so any
        # sibling tool calls in the same turn are intentionally skipped.
        finish_call = next((tc for tc in tool_calls if tc.function.name == "finish"), None)
        if finish_call:
            args = json.loads(finish_call.function.arguments)
            return resolve_finish(args.get("files", ""))
        # Execute all tool calls; each result goes back as a "tool" message
        # keyed by tool_call_id, as the chat-completions protocol requires.
        for tc in tool_calls:
            args = json.loads(tc.function.arguments)
            output = dispatch_tool(tc.function.name, args)
            messages.append({
                "role": "tool",
                "tool_call_id": tc.id,
                "content": output,
            })
        # Add turn counter so the model knows its remaining budget.
        remaining = max_turns - turn
        if remaining <= 1:
            turn_msg = f"You have used {turn} turns, you only have 1 turn remaining. You have run out of turns to explore the code base and MUST call the finish tool now"
        else:
            turn_msg = f"You have used {turn} turn{'s' if turn != 1 else ''} and have {remaining} remaining"
        messages.append({"role": "user", "content": turn_msg})
        print(f"Turn {turn}: Executed {len(tool_calls)} tools")
    # Budget exhausted (or model stopped) without a finish call.
    return []
def resolve_finish(files_str: str) -> list[dict]:
    """Resolve a finish call into [{path, content}, ...] result dicts.

    Each non-empty line of ``files_str`` is either "path" or "path:lines",
    where lines is a spec like "1-50" or "1-20,45-80" ("*" means the whole
    file). Returns one dict per line with the file content from
    execute_read().
    """
    results = []
    for line in files_str.strip().splitlines():
        line = line.strip()
        if not line:
            continue
        path, lines = line, None
        if ":" in line:
            candidate, suffix = line.rsplit(":", 1)
            # Only treat the suffix as a line range when it actually looks
            # like one; the original split unconditionally, so a path that
            # merely contains a colon (e.g. "C:/src/x.py") crashed inside
            # execute_read's int() parsing.
            if suffix == "*":
                path = candidate
            elif suffix and all(c.isdigit() or c in ",-" for c in suffix):
                path, lines = candidate, suffix
        content = execute_read(path, lines)
        results.append({"path": path, "content": content})
    return results
Usage
if __name__ == "__main__":
    # Example run: locate the authentication code in a repository.
    hits = search_codebase(
        query="Find where user authentication is implemented",
        repo_root="/path/to/your/repo",
    )
    divider = "=" * 60
    for hit in hits:
        print(f"\n{divider}")
        print(f"File: {hit['path']}")
        print(divider)
        print(hit["content"])
Next Steps
- Direct API Access — Full protocol reference
- TypeScript SDK Tool — Use WarpGrep in TypeScript agents
- MCP Integration — Use via Model Context Protocol