For the raw API protocol and message formats, see Direct API Access.
Overview
The agent loop:
- Send the query and repo structure to the API
- Parse XML tool calls from the response
- Execute tools locally (ripgrep, file reads, tree)
- Format results and send back
- Repeat until `finish` is called (max 4 turns)
Installation
Copy
Ask AI
pip install requests
You also need ripgrep installed:
Copy
Ask AI
# macOS
brew install ripgrep
# Ubuntu/Debian
apt-get install ripgrep
# Windows
choco install ripgrep
Complete Implementation
API Client
Copy
Ask AI
import os
import requests
MORPH_API_KEY = os.environ.get("MORPH_API_KEY")
API_URL = "https://api.morphllm.com/v1/chat/completions"
def call_api(messages: list[dict], timeout: float = 60.0) -> str:
    """Call the Warp Grep API and return the response content.

    Args:
        messages: Chat messages in OpenAI format ({"role", "content"} dicts).
        timeout: Seconds to wait for the HTTP response. Defaults to 60 so a
            dead connection cannot stall the agent loop indefinitely
            (previously no timeout was set, which blocks forever).

    Returns:
        The content string of the first choice in the API response.

    Raises:
        requests.HTTPError: On non-2xx responses (via raise_for_status).
        requests.Timeout: If the request exceeds `timeout`.
    """
    response = requests.post(
        API_URL,
        headers={
            "Authorization": f"Bearer {MORPH_API_KEY}",
            "Content-Type": "application/json",
        },
        json={
            "model": "morph-warp-grep",
            "messages": messages,
            "temperature": 0.0,
            "max_tokens": 2048,
        },
        # requests has no default timeout; without one a hung socket hangs
        # the whole agent loop.
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()["choices"][0]["message"]["content"]
XML Parser
Copy
Ask AI
import re
from dataclasses import dataclass
@dataclass
class ToolCall:
    """A single tool invocation parsed from the model's XML response.

    Instances are produced by parse_tool_calls().
    """

    # One of "grep", "read", "list_directory", "finish".
    name: str
    # XML child-element name -> stripped text value. For "finish" calls,
    # parse_xml_elements additionally stores a list of per-file dicts
    # under the "files" key.
    args: dict[str, str]
def parse_tool_calls(response: str) -> list[ToolCall]:
    """Parse XML tool calls from a model response, in document order.

    The previous implementation scanned the response once per tool name,
    so a response interleaving different tools (e.g. <read> before <grep>)
    came back grouped by tool type rather than in the order the model
    emitted them. A single alternation pattern preserves emission order.

    Args:
        response: Raw assistant message text (may contain <think> blocks).

    Returns:
        ToolCall objects in the order they appear in the response.
    """
    # Remove <think> reasoning blocks before looking for tool calls.
    response = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)
    tool_calls = []
    # One pass over all four tool tags; \1 backreference closes the same tag.
    pattern = r"<(grep|read|list_directory|finish)>(.*?)</\1>"
    for match in re.finditer(pattern, response, re.DOTALL):
        name = match.group(1)
        args = parse_xml_elements(match.group(2))
        tool_calls.append(ToolCall(name=name, args=args))
    return tool_calls
def parse_xml_elements(content: str) -> dict[str, str]:
    """Turn flat <key>value</key> pairs inside `content` into a dict.

    <file> elements get special treatment: each is parsed recursively and
    appended to a list stored under the "files" key, mirroring the shape
    of a <finish> tool call.
    """
    parsed: dict[str, str] = {}
    element_re = r"<(\w+)>(.*?)</\1>"
    for m in re.finditer(element_re, content, re.DOTALL):
        tag, body = m.group(1), m.group(2).strip()
        if tag == "file":
            # Accumulate nested file specs rather than overwriting.
            parsed.setdefault("files", []).append(parse_xml_elements(body))
        else:
            parsed[tag] = body
    return parsed
Tool Executors
Copy
Ask AI
import subprocess
from pathlib import Path
MAX_GREP_LINES = 200
MAX_LIST_LINES = 200
MAX_READ_LINES = 800
def execute_grep(repo_root: str, pattern: str, sub_dir: str = ".", glob: str | None = None) -> str:
    """Execute ripgrep and return formatted output.

    Args:
        repo_root: Absolute path to the repository root.
        pattern: Regex passed directly to rg.
        sub_dir: Directory under repo_root to search (default: whole repo).
        glob: Optional rg --glob filter (e.g. "*.py").

    Returns:
        Numbered match lines with 1 line of context, "no matches", a
        budget-exceeded message, or an "Error: ..." string.
    """
    path = Path(repo_root) / sub_dir
    cmd = [
        "rg",
        "--line-number",
        "--no-heading",
        "--color", "never",
        "-C", "1",  # 1 line of context
    ]
    if glob:
        cmd.extend(["--glob", glob])
    cmd.extend([pattern, str(path)])
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=10,
            cwd=repo_root,
        )
    except subprocess.TimeoutExpired:
        return "Error: search timed out"
    except Exception as e:
        return f"Error: {e}"
    # rg exits 0 on matches, 1 on no matches, and >=2 on real errors (bad
    # regex, unreadable path). Previously error runs fell through and were
    # reported as "no matches", hiding the failure from the model.
    if result.returncode not in (0, 1):
        return f"Error: {result.stderr.strip() or 'ripgrep failed'}"
    output = result.stdout.strip()
    if not output:
        return "no matches"
    if len(output.split("\n")) > MAX_GREP_LINES:
        return "query not specific enough, tool called tried to return too much context and failed"
    return output
def execute_read(repo_root: str, path: str, lines: str | None = None) -> str:
    """Read file contents with optional line range.

    Args:
        repo_root: Repository root the path is resolved against.
        path: File path relative to repo_root.
        lines: Optional range spec like "1-50" or "1-20,45-80". "*" means
            the whole file, same as None (the single-file version already
            accepted "*"; this one now matches it).

    Returns:
        "N|content" lines (1-based), "..." marking gaps between ranges,
        a truncation note past MAX_READ_LINES, or an "Error: ..." string.
    """
    file_path = Path(repo_root) / path
    if not file_path.exists():
        return f"Error: file not found: {path}"
    try:
        with open(file_path, "r") as f:
            all_lines = f.readlines()
    except Exception as e:
        return f"Error: {e}"
    if lines and lines != "*":
        # Parse line ranges like "1-50" or "1-20,45-80"
        selected = []
        try:
            for range_part in lines.split(","):
                if "-" in range_part:
                    start, end = map(int, range_part.split("-"))
                else:
                    start = end = int(range_part)
                # Convert to 0-indexed and clamp the end to the file length.
                selected.extend(range(start - 1, min(end, len(all_lines))))
        except ValueError:
            # A malformed spec (e.g. "abc", "5-") previously raised out of
            # the agent loop; report it as a tool error instead.
            return f"Error: invalid line range: {lines}"
        output_lines = []
        prev_idx = -2
        for idx in sorted(set(selected)):
            if idx < 0 or idx >= len(all_lines):
                continue
            if prev_idx >= 0 and idx > prev_idx + 1:
                output_lines.append("...")  # gap between selected ranges
            output_lines.append(f"{idx + 1}|{all_lines[idx].rstrip()}")
            prev_idx = idx
    else:
        output_lines = [f"{i + 1}|{line.rstrip()}" for i, line in enumerate(all_lines)]
    if len(output_lines) > MAX_READ_LINES:
        output_lines = output_lines[:MAX_READ_LINES]
        output_lines.append(f"... truncated ({len(all_lines)} total lines)")
    return "\n".join(output_lines)
def execute_list_directory(repo_root: str, path: str, pattern: str | None = None) -> str:
    """List directory structure using tree (depth 3), optionally filtered.

    Args:
        repo_root: Repository root the path is resolved against.
        path: Directory relative to repo_root.
        pattern: Optional regex applied per output line.

    Returns:
        One entry per line, a budget-exceeded message, or "Error: ...".
        Falls back to fallback_list_dir() when tree is not installed.
    """
    dir_path = Path(repo_root) / path
    if not dir_path.exists():
        return f"Error: directory not found: {path}"
    cmd = [
        "tree",
        "-L", "3",        # limit depth
        "-i",             # no indentation lines
        "-F",             # suffix directories with "/"
        "--noreport",
        "-I", "__pycache__|node_modules|.git|*.pyc|.DS_Store|.venv|venv|dist|build",
        str(dir_path),
    ]
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=5,
            cwd=repo_root,
        )
        output = result.stdout
    except FileNotFoundError:
        # Fallback if tree not installed
        return fallback_list_dir(dir_path, pattern)
    except Exception as e:
        return f"Error: {e}"
    lines = output.strip().split("\n") if output.strip() else []
    # Apply regex filter if provided. Uses the module-level `re` import;
    # the previous local `import re as regex` was redundant.
    if pattern:
        try:
            compiled = re.compile(pattern)
        except re.error:
            # Only swallow bad model-supplied regexes; the old bare
            # `except:` also swallowed KeyboardInterrupt/SystemExit.
            compiled = None
        if compiled is not None:
            lines = [l for l in lines if compiled.search(l)]
    if len(lines) > MAX_LIST_LINES:
        return "query not specific enough, tool called tried to return too much context and failed"
    return "\n".join(lines)
def fallback_list_dir(dir_path: Path, pattern: str | None = None, max_depth: int = 3) -> str:
    """Fallback directory listing when the `tree` command is unavailable.

    One entry per line, indented one space per depth level, directories
    suffixed with "/". Hidden entries and common build/vendor directories
    are skipped; output is capped at MAX_LIST_LINES entries.
    Fix: the previous version computed `rel = item.relative_to(...)` and
    never used it — and relative_to() can raise ValueError for entries
    outside dir_path.parent (e.g. via symlinks). Removed. Also dropped the
    redundant local `import re as regex` (re is imported at module level).
    """
    lines: list[str] = []
    compiled = re.compile(pattern) if pattern else None

    def walk(p: Path, depth: int = 0) -> None:
        # Stop descending once the depth budget is exhausted.
        if depth > max_depth:
            return
        try:
            for item in sorted(p.iterdir()):
                if item.name.startswith("."):
                    continue
                if item.name in {"node_modules", "__pycache__", "venv", ".venv", "dist", "build"}:
                    continue
                indent = " " * depth
                suffix = "/" if item.is_dir() else ""
                line = f"{indent}{item.name}{suffix}"
                if compiled is None or compiled.search(line):
                    lines.append(line)
                if item.is_dir():
                    walk(item, depth + 1)
        except PermissionError:
            # Unreadable directories are skipped silently (best effort).
            pass

    walk(dir_path)
    return "\n".join(lines[:MAX_LIST_LINES])
Result Formatter
Copy
Ask AI
def format_result(tool_call: ToolCall, output: str) -> str:
    """Wrap a tool's output in an XML envelope that echoes its arguments.

    finish calls (and any unknown tool) pass through unwrapped.
    """
    name = tool_call.name
    args = tool_call.args
    if name == "grep":
        parts = [f'pattern="{args.get("pattern", "")}"']
        for key in ("sub_dir", "glob"):
            if key in args:
                parts.append(f'{key}="{args[key]}"')
        return f"<grep {' '.join(parts)}>\n{output}\n</grep>"
    if name == "read":
        parts = [f'path="{args.get("path", "")}"']
        if "lines" in args:
            parts.append(f'lines="{args["lines"]}"')
        return f"<read {' '.join(parts)}>\n{output}\n</read>"
    if name == "list_directory":
        return f'<list_directory path="{args.get("path", "")}">\n{output}\n</list_directory>'
    return output
def format_turn_message(turn: int, chars_used: int = 0, max_chars: int = 160000) -> str:
    """Build the per-turn status footer: turn counter plus context budget."""
    if turn >= 3:
        # Final warning: the model must finish on its last turn.
        msg = "You have used 3 turns, you only have 1 turn remaining. You have run out of turns to explore the code base and MUST call the finish tool now"
    else:
        plural = "s" if turn != 1 else ""
        msg = f"You have used {turn} turn{plural} and have {4 - turn} remaining"
    pct = 0 if max_chars <= 0 else int((chars_used / max_chars) * 100)
    budget = f"<context_budget>{pct}% ({chars_used // 1000}K/{max_chars // 1000}K chars)</context_budget>"
    return f"\n{msg}\n{budget}"
Agent Loop
Copy
Ask AI
def get_repo_structure(repo_root: str) -> str:
    """Render the repository's top-level tree for the opening user message."""
    listing = execute_list_directory(repo_root, ".", None)
    return "<repo_structure>\n" + listing + "\n</repo_structure>"
def search_codebase(query: str, repo_root: str) -> list[dict]:
    """
    Run the Warp Grep agent loop.
    Returns a list of {path, content} dicts with the relevant code, or an
    empty list if the model never issues a finish call within max_turns.
    """
    # System prompt (abbreviated - see full version in docs)
    system_prompt = """You are a code search agent. Find all relevant code for a given query.
You have 4 turns max. Each turn allows up to 8 parallel tool calls.
The 4th turn MUST be a `finish` call.
Tools:
- <grep><pattern>...</pattern><sub_dir>...</sub_dir><glob>...</glob></grep>
- <read><path>...</path><lines>...</lines></read>
- <list_directory><path>...</path><pattern>...</pattern></list_directory>
- <finish><file><path>...</path><lines>...</lines></file>...</finish>
Always start with <think>...</think> then tool calls.
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"{get_repo_structure(repo_root)}\n\n<search_string>\n{query}\n</search_string>"},
    ]
    max_turns = 4
    # Running character total across all messages; reported to the model as
    # a rough context budget via format_turn_message (informational only —
    # nothing here enforces the cap).
    chars_used = sum(len(m["content"]) for m in messages)
    for turn in range(max_turns):
        # Call API
        response = call_api(messages)
        messages.append({"role": "assistant", "content": response})
        chars_used += len(response)
        # Parse tool calls
        tool_calls = parse_tool_calls(response)
        if not tool_calls:
            # No parseable tool calls (model gave up or emitted malformed
            # XML): stop and fall through to the empty result.
            print(f"Turn {turn + 1}: No tool calls, terminating")
            break
        # Check for finish. A finish short-circuits the turn: any other tool
        # calls in the same response are ignored.
        finish_call = next((tc for tc in tool_calls if tc.name == "finish"), None)
        if finish_call:
            return resolve_finish(repo_root, finish_call)
        # Execute tools sequentially, in parse order.
        results = []
        for tc in tool_calls:
            if tc.name == "grep":
                output = execute_grep(
                    repo_root,
                    tc.args.get("pattern", ""),
                    tc.args.get("sub_dir", "."),
                    tc.args.get("glob"),
                )
            elif tc.name == "read":
                output = execute_read(
                    repo_root,
                    tc.args.get("path", ""),
                    tc.args.get("lines"),
                )
            elif tc.name == "list_directory":
                output = execute_list_directory(
                    repo_root,
                    tc.args.get("path", "."),
                    tc.args.get("pattern"),
                )
            else:
                output = f"Unknown tool: {tc.name}"
            results.append(format_result(tc, output))
        # Send results back as the next user message, with the turn footer.
        result_content = "\n\n".join(results) + format_turn_message(turn + 1, chars_used)
        messages.append({"role": "user", "content": result_content})
        chars_used += len(result_content)
        print(f"Turn {turn + 1}: Executed {len(tool_calls)} tools")
    # Exhausted max_turns (or broke out) without a finish call.
    return []
def resolve_finish(repo_root: str, finish_call: ToolCall) -> list[dict]:
    """Materialize a finish call by reading each requested file range."""
    collected = []
    for file_spec in finish_call.args.get("files", []):
        target = file_spec.get("path", "")
        line_spec = file_spec.get("lines")
        if line_spec == "*":
            # "*" means the whole file; execute_read treats None that way.
            line_spec = None
        collected.append({
            "path": target,
            "content": execute_read(repo_root, target, line_spec),
        })
    return collected
Usage
Copy
Ask AI
if __name__ == "__main__":
    # Example invocation; adjust query and repo_root for your repository.
    hits = search_codebase(
        query="Find where user authentication is implemented",
        repo_root="/path/to/your/repo",
    )
    banner = "=" * 60
    for hit in hits:
        print(f"\n{banner}")
        print(f"File: {hit['path']}")
        print(banner)
        print(hit['content'])
Single File Version
Copy-paste ready single file
Copy
Ask AI
#!/usr/bin/env python3
"""
Warp Grep Agent - Complete Implementation
==========================================
A Python implementation of the Warp Grep code search agent.
Usage:
export MORPH_API_KEY=your_key
python warp_grep.py "Find authentication middleware" /path/to/repo
"""
import os
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import requests
# Config
MORPH_API_KEY = os.environ.get("MORPH_API_KEY")  # required; read from env
API_URL = "https://api.morphllm.com/v1/chat/completions"
MAX_TURNS = 4  # hard cap on agent-loop iterations
MAX_GREP_LINES = 200  # grep output budget before "not specific enough"
MAX_LIST_LINES = 200  # directory-listing output budget
MAX_READ_LINES = 800  # file-read output budget
# Abbreviated system prompt: the tool XML shapes plus the 4-turn contract.
SYSTEM_PROMPT = """You are a code search agent. Find all relevant code for a given query.
You have 4 turns max. The 4th turn MUST be a `finish` call.
Tools:
- <grep><pattern>REGEX</pattern><sub_dir>PATH</sub_dir><glob>*.py</glob></grep>
- <read><path>FILE</path><lines>1-50</lines></read>
- <list_directory><path>DIR</path><pattern>REGEX</pattern></list_directory>
- <finish><file><path>FILE</path><lines>1-50,80-100</lines></file></finish>
Always wrap reasoning in <think>...</think> then output tool calls.
"""
@dataclass
class ToolCall:
    """A single parsed XML tool invocation from the model response."""

    # One of "grep", "read", "list_directory", "finish".
    name: str
    # Child-element name -> text value; finish calls store a list of
    # per-file dicts under "files" (see parse_xml_elements).
    args: dict
def call_api(messages: list) -> str:
    """POST the chat messages to the Warp Grep API and return the reply text.

    Raises requests.HTTPError on non-2xx responses and requests.Timeout if
    the server does not respond within 60 seconds.
    """
    resp = requests.post(
        API_URL,
        headers={"Authorization": f"Bearer {MORPH_API_KEY}", "Content-Type": "application/json"},
        json={"model": "morph-warp-grep", "messages": messages, "temperature": 0.0, "max_tokens": 2048},
        # requests has no default timeout; without one a dead connection
        # hangs the agent loop forever.
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json()["choices"][0]["message"]["content"]
def parse_xml_elements(content: str) -> dict:
    """Parse flat <key>value</key> pairs into a dict.

    Each <file> element is parsed recursively and appended to a list under
    the "files" key.
    """
    result = {}
    for m in re.finditer(r"<(\w+)>(.*?)</\1>", content, re.DOTALL):
        tag = m.group(1)
        text = m.group(2).strip()
        if tag == "file":
            if "files" not in result:
                result["files"] = []
            result["files"].append(parse_xml_elements(text))
        else:
            result[tag] = text
    return result
def parse_tool_calls(response: str) -> list:
    """Parse tool calls from a response, preserving document order.

    The previous per-tool-name scan regrouped interleaved calls by type;
    a single alternation pattern keeps the model's emission order.
    """
    # Strip <think> reasoning blocks first.
    response = re.sub(r"<think>.*?</think>", "", response, flags=re.DOTALL)
    calls = []
    for match in re.finditer(r"<(grep|read|list_directory|finish)>(.*?)</\1>", response, re.DOTALL):
        calls.append(ToolCall(name=match.group(1), args=parse_xml_elements(match.group(2))))
    return calls
def execute_grep(repo: str, pattern: str, sub_dir: str = ".", glob: str = None) -> str:
    """Run ripgrep over `sub_dir`; return matches, "no matches", or an error."""
    cmd = ["rg", "--line-number", "--no-heading", "--color", "never", "-C", "1"]
    if glob:
        cmd.extend(["--glob", glob])
    cmd.extend([pattern, str(Path(repo) / sub_dir)])
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=10, cwd=repo)
        # rg exit codes: 0 = matches, 1 = no matches, >=2 = real error (bad
        # regex, unreadable path). Previously errors were reported as
        # "no matches", hiding the failure from the model.
        if r.returncode not in (0, 1):
            return f"Error: {r.stderr.strip() or 'ripgrep failed'}"
        lines = r.stdout.strip().split("\n") if r.stdout.strip() else []
        if len(lines) > MAX_GREP_LINES:
            return "query not specific enough, tool called tried to return too much context and failed"
        return r.stdout.strip() or "no matches"
    except Exception as e:
        return f"Error: {e}"
def execute_read(repo: str, path: str, lines: str = None) -> str:
    """Read a file with 1-based "N|content" numbering.

    `lines` may be None or "*" (whole file) or a spec like "1-20,45-80";
    gaps between selected ranges are marked with "...". Output is capped
    at MAX_READ_LINES lines.
    """
    fp = Path(repo) / path
    if not fp.exists():
        return f"Error: file not found: {path}"
    try:
        all_lines = fp.read_text().splitlines()
    except Exception as e:
        return f"Error: {e}"
    if lines and lines != "*":
        selected = []
        try:
            for part in lines.split(","):
                if "-" in part:
                    s, e = map(int, part.split("-"))
                else:
                    s = e = int(part)
                # 0-indexed start; clamp end to file length.
                selected.extend(range(s - 1, min(e, len(all_lines))))
        except ValueError:
            # A malformed spec (e.g. "abc", "5-") previously raised out of
            # the agent loop; report it as a tool error instead.
            return f"Error: invalid line range: {lines}"
        out, prev = [], -2
        for i in sorted(set(selected)):
            if 0 <= i < len(all_lines):
                if prev >= 0 and i > prev + 1:
                    out.append("...")  # gap between ranges
                out.append(f"{i + 1}|{all_lines[i]}")
                prev = i
        return "\n".join(out[:MAX_READ_LINES])
    return "\n".join(f"{i + 1}|{l}" for i, l in enumerate(all_lines[:MAX_READ_LINES]))
def execute_list_directory(repo: str, path: str, pattern: str = None) -> str:
    """List a directory via `tree` (depth 3), optionally regex-filtered."""
    dp = Path(repo) / path
    if not dp.exists():
        return f"Error: directory not found: {path}"
    cmd = ["tree", "-L", "3", "-i", "-F", "--noreport", "-I", "__pycache__|node_modules|.git", str(dp)]
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=5, cwd=repo)
        stdout = r.stdout.strip()
        lines = stdout.split("\n") if stdout else []
        if pattern:
            lines = [entry for entry in lines if re.search(pattern, entry)]
        if len(lines) > MAX_LIST_LINES:
            return "query not specific enough, tool called tried to return too much context and failed"
        return "\n".join(lines)
    except Exception as e:
        return f"Error: {e}"
def format_result(tc: ToolCall, output: str) -> str:
    """Echo a tool's arguments and wrap its output in a matching XML tag.

    finish calls (and unknown tools) pass through unwrapped. Note: this
    version does not echo the grep `glob` argument.
    """
    if tc.name == "grep":
        parts = [f'pattern="{tc.args.get("pattern", "")}"']
        if "sub_dir" in tc.args:
            parts.append(f'sub_dir="{tc.args["sub_dir"]}"')
        return f"<grep {' '.join(parts)}>\n{output}\n</grep>"
    if tc.name == "read":
        parts = [f'path="{tc.args.get("path", "")}"']
        if "lines" in tc.args:
            parts.append(f'lines="{tc.args["lines"]}"')
        return f"<read {' '.join(parts)}>\n{output}\n</read>"
    if tc.name == "list_directory":
        return f'<list_directory path="{tc.args.get("path", "")}">\n{output}\n</list_directory>'
    return output
def search_codebase(query: str, repo: str) -> list:
    """Run the agent loop and return [{"path", "content"}] from the finish call.

    Returns an empty list if the model never calls finish within MAX_TURNS.
    """
    # Seed the conversation with the repo tree and the search query.
    structure = execute_list_directory(repo, ".", None)
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"<repo_structure>\n{structure}\n</repo_structure>\n\n<search_string>\n{query}\n</search_string>"},
    ]
    for turn in range(MAX_TURNS):
        response = call_api(messages)
        messages.append({"role": "assistant", "content": response})
        tool_calls = parse_tool_calls(response)
        if not tool_calls:
            # No parseable tool calls: stop and return the empty result.
            break
        # A finish call ends the loop immediately; any other tool calls in
        # the same response are ignored.
        finish = next((tc for tc in tool_calls if tc.name == "finish"), None)
        if finish:
            return [{"path": f["path"], "content": execute_read(repo, f["path"], f.get("lines"))} for f in finish.args.get("files", [])]
        # Execute each tool sequentially, in parse order.
        results = []
        for tc in tool_calls:
            if tc.name == "grep":
                out = execute_grep(repo, tc.args.get("pattern", ""), tc.args.get("sub_dir", "."), tc.args.get("glob"))
            elif tc.name == "read":
                out = execute_read(repo, tc.args.get("path", ""), tc.args.get("lines"))
            elif tc.name == "list_directory":
                out = execute_list_directory(repo, tc.args.get("path", "."), tc.args.get("pattern"))
            else:
                out = f"Unknown: {tc.name}"
            results.append(format_result(tc, out))
        # Feed results back as the next user message with a turn counter.
        remaining = MAX_TURNS - turn - 1
        turn_msg = f"\nYou have used {turn + 1} turns and have {remaining} remaining.\n"
        messages.append({"role": "user", "content": "\n\n".join(results) + turn_msg})
        print(f"Turn {turn + 1}: {len(tool_calls)} tools")
    # Exhausted MAX_TURNS (or broke out) without a finish call.
    return []
if __name__ == "__main__":
    # CLI entry point: warp_grep.py 'query' /path/to/repo
    if len(sys.argv) < 3:
        print("Usage: python warp_grep.py 'query' /path/to/repo")
        sys.exit(1)
    banner = "=" * 60
    for hit in search_codebase(sys.argv[1], sys.argv[2]):
        print(f"\n{banner}\nFile: {hit['path']}\n{banner}\n{hit['content']}")
Next Steps
- Direct API Access — Full protocol reference
- TypeScript SDK Tool — Use Warp Grep in TypeScript agents
- MCP Integration — Use via Model Context Protocol