blob: 460b43076241cc953b7c95f9935ff6e70fd340c2 [file] [log] [blame]
#!/usr/bin/env fuchsia-vendored-python
# Copyright 2025 The Fuchsia Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import argparse
import json
import logging
import os
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from urllib import request
# prompt 1 - get key lines to highlight in stack trace.
_PROMPT_GET_KEY_LINES = (
"You are an expert Fuchsia OS developer's assistant.\n"
"Your task is to analyze the following test failure stack trace and return a\n"
"JSON object. Your response MUST be ONLY a valid, minified JSON object.\n"
"\n"
"The JSON object must have the following structure:\n"
# Doubled braces here to escape them for .format()
"{{\n"
' "primary_key_line": "...",\n'
' "secondary_key_lines": ["...", "..."]\n'
# And doubled braces here
"}}\n"
"\n"
"INSTRUCTIONS:\n"
"1. `primary_key_line`: You MUST identify the single most important line\n"
" that indicates the root cause. This line MUST be an exact, VERBATIM\n"
" copy of a line from the stack trace. It is the single line a\n"
" developer would click on to debug the error. \n"
" **IT MUST BE AN ACTUAL LINE FROM THE LOG.**\n"
" **DO NOT** return generic summaries like '[FAILED] tests::it_works',\n"
" 'test failed.', 'fatal exception', or 'thread X failed...'.\n"
" The primary line MUST be actionable. A developer needs to be able to look\n"
" at the line and know where to start debugging. Generic messages like\n"
" 'thread panicked' are useless. The line must contain a file path, a line\n"
" number, or a specific error that directly points to the code that failed.\n"
" Triple-check this. Think from the point of a developer: will this line\n"
" give them additional insight, or is it something obvious they already know?\n"
" **DO** select the specific line with the PANIC, the assertion error, or\n"
" the file path and line number (e.g., '[...][it_works] ERROR: [...]').\n"
"\n"
"2. `secondary_key_lines`: You MUST identify a list of 0 or more other\n"
" lines that provide helpful context. Use this for cases of uncertainty\n"
" or to highlight related symptoms. This value MUST be an array of\n"
" strings, each copied verbatim. If there are no other useful lines,\n"
" return an empty array: [].\n"
"\n"
"Stack Trace:\n"
"{stack_trace}\n"
)
# prompt 2 - pass git diff as well to get root cause analysis.
_PROMPT_ANALYZE_FAILURE_V2 = (
"\nYou are an expert Fuchsia OS developer's assistant. Your task is to\n"
"analyze a test failure and provide a concise, standardized debugging\n"
"report. Format your entire response for a plain text terminal. Do not use\n"
"any markdown.\n"
"\n"
"Your response MUST strictly follow this format, using these exact headers:\n"
"\n"
"## POTENTIAL ERROR\n"
"Analyze the stack trace and git diff to determine the most likely root\n"
"cause. Provide ONLY the root cause analysis. Keep the analysis as brief\n"
"as possible, ideally under 4 lines, but expand if necessary to include\n"
"all relevant context.\n"
"\n"
"IMPORTANT: Do not include the original stack trace or git diff in your\n"
"response. Your output must ONLY contain the analysis sections, starting\n"
"with the '## POTENTIAL ERROR' header.\n"
"\n"
"---\n"
"CONTEXT\n"
"---\n"
"\n"
"Stack Trace:\n"
"{stack_trace}\n"
"\n"
"Git Diff:\n"
"{git_diff}\n"
)
# prompt 3 - get files for gemini analysis
_PROMPT_VERBOSITY_3_ASK_FOR_FILES = (
"\nYou are an automated debugging assistant. Analyze the provided stack\n"
"trace, git diff, and current working directory (PWD).\n"
"Your task is to identify which files, if any, you need to read to provide\n"
"a complete diagnosis.\n"
"Respond ONLY with a valid JSON array of strings, where each string is a\n"
"file path relative to the PWD.\n"
'Example: ["src/foo/bar.cc", "BUILD.gn"]\n'
"If you do not need to read any files, respond with an empty array: [].\n"
"\n"
"PWD: {pwd}\n"
"---\n"
"STACK TRACE:\n"
"{stack_trace}\n"
"---\n"
"GIT DIFF:\n"
"{git_diff}\n"
"---\n"
)
# Prompt 3 (Perform): Get analysis for verbosity 3 (log + diff + files).
_PROMPT_ANALYZE_FAILURE_V3_PERFORM = (
"\nYou are an expert Fuchsia OS developer's assistant. Your task is to\n"
"analyze a test failure and provide a concise, standardized debugging\n"
"report. Format your entire response for a plain text terminal. Do not use\n"
"any markdown.\n"
"\n"
"Your response MUST strictly follow this format, using these exact headers:\n"
"\n"
"## ROOT CAUSE ANALYSIS\n"
"Analyze the stack trace, git diff, and any provided file contents to\n"
"determine the most likely root cause. Clearly state the error type (e.g.,\n"
"Null Pointer Exception, Race Condition, Assertion Failure) and explain the\n"
"logic that leads to the failure. Keep the analysis as brief as possible,\n"
"ideally under 4 lines, but expand if necessary to include all relevant\n"
"context.\n"
"\n"
"## SUGGESTED FIX\n"
"Provide a concise, best-practice code suggestion to resolve the issue.\n"
"If the fix is uncertain, suggest the most logical next step for debugging\n"
"(e.g., \"Add a log statement to check the value of 'my_var' before the\n"
"call to 'do_thing()'\"). When providing a code suggestion, include the\n"
"relevant code snippet with comments explaining the proposed change.\n"
"\n"
"IMPORTANT: Do not include the original stack trace, git diff, or file\n"
"contents in your response. Your output must ONLY contain the analysis\n"
"sections, starting with the '## ROOT CAUSE ANALYSIS' header.\n"
"\n"
"---\n"
"CONTEXT\n"
"---\n"
"\n"
"Stack Trace:\n"
"{stack_trace}\n"
"\n"
"Git Diff:\n"
"{diff_section}\n"
"\n"
"File Contents:\n"
"{file_contents_context}\n"
)
@dataclass
class GeminiAnalysisResult:
"""A structured result from a call to the Gemini API."""
text: str
error: bool = False
def _blocking_gemini_call(
api_key: str, gemini_model: str, data: bytes
) -> GeminiAnalysisResult:
"""synchronous function to make the web request."""
logging.info("Making Gemini API call with data: %s", data.decode("utf-8"))
url = f"https://generativelanguage.googleapis.com/v1beta/models/{gemini_model}:generateContent?key={api_key}"
headers = {"Content-Type": "application/json"}
req = request.Request(url, data=data, headers=headers, method="POST")
try:
with request.urlopen(req, timeout=45) as response:
if response.status < 200 or response.status >= 300:
error_result = GeminiAnalysisResult(
f"API Error: {response.status} {response.reason}",
error=True,
)
logging.error("Gemini API call failed: %s", error_result.text)
return error_result
response_body = response.read().decode("utf-8")
logging.info("Gemini API response: %s", response_body)
response_json = json.loads(response_body)
candidates = response_json.get("candidates", [])
if candidates:
content = candidates[0].get("content", {})
parts = content.get("parts", [])
if parts:
success_result = GeminiAnalysisResult(
parts[0].get(
"text",
"Error: Could not extract text from response.",
)
)
logging.info(
"Successfully extracted text from Gemini response: %s",
success_result.text,
)
return success_result
error_result = GeminiAnalysisResult(
"Error: Unexpected response format.", error=True
)
logging.error("Failed to parse Gemini response: %s", response_body)
return error_result
except Exception as e:
error_result = GeminiAnalysisResult(
f"Error during API call: {e}", error=True
)
logging.error("Exception during Gemini API call: %s", e)
return error_result
def _call_gemini_with_prompt(
api_key: str, gemini_model: str, prompt: str, **kwargs: str
) -> GeminiAnalysisResult:
"""Formats a prompt, creates a payload, and calls the Gemini API."""
formatted_prompt = prompt.format(**kwargs)
payload = {"contents": [{"parts": [{"text": formatted_prompt}]}]}
data = json.dumps(payload).encode("utf-8")
return _blocking_gemini_call(api_key, gemini_model, data)
def _read_files_for_gemini(requested_files: list[str]) -> str:
if not requested_files:
return ""
file_contents = {}
logging.info("Reading files for Gemini: %s", requested_files)
for file_path in requested_files:
if not os.path.isfile(file_path):
print(
f"Gemini analysis warning: Path '{file_path}' is not a valid file.",
file=sys.stderr,
)
logging.warning(
"Warning: Path '%s' is not a valid file.", file_path
)
continue
try:
# Skip files larger than 1MB.
if os.path.getsize(file_path) > 1_000_000:
print(
f"Gemini analysis warning: File '{file_path}' is too large (>1MB), skipping.",
file=sys.stderr,
)
logging.warning(
"Warning: File '%s' is too large (>1MB), skipping.",
file_path,
)
continue
# Read in binary mode to check for null bytes (binary indicator). This is a heuristic and may not always be accurate.
with open(file_path, "rb") as f:
raw_content = f.read()
if b"\0" in raw_content:
print(
f"Gemini analysis warning: File '{file_path}' appears to be binary, skipping.",
file=sys.stderr,
)
logging.warning(
"Warning: File '%s' appears to be binary, skipping.",
file_path,
)
continue
# Decode as UTF-8 if not binary.
content = raw_content.decode("utf-8", errors="replace")
file_contents[file_path] = content
logging.info("Content of %s:\n%s", file_path, content)
except Exception as e:
print(
f"Gemini analysis warning: Could not read file at '{file_path}': {e}, skipping.",
file=sys.stderr,
)
logging.warning(
"Warning: Could not read file at '%s': %s, skipping.",
file_path,
e,
)
context_block = "\nADDITIONAL FILE CONTEXT:\n---\n"
for path, content in file_contents.items():
context_block += f"Content of file '{path}':\n{content}\n---\n"
return context_block
def get_annotated_log(
api_key: str,
gemini_model: str,
stack_trace: str,
) -> GeminiAnalysisResult:
"""Calls Gemini to get only the key lines for annotation."""
return _call_gemini_with_prompt(
api_key,
gemini_model,
_PROMPT_GET_KEY_LINES,
stack_trace=stack_trace,
)
def get_failure_analysis(
api_key: str,
gemini_model: str,
stack_trace: str,
git_diff: str | None,
pwd: str,
verbosity: int,
) -> GeminiAnalysisResult:
"""Gets the full failure analysis report (for v2 or v3)."""
if verbosity == 2:
return _call_gemini_with_prompt(
api_key,
gemini_model,
_PROMPT_ANALYZE_FAILURE_V2,
stack_trace=stack_trace,
git_diff=git_diff or "No local changes.",
)
# Verbosity level 3
file_list_result = _call_gemini_with_prompt(
api_key,
gemini_model,
_PROMPT_VERBOSITY_3_ASK_FOR_FILES,
pwd=pwd,
stack_trace=stack_trace,
git_diff=git_diff or "No local changes.",
)
if file_list_result.error:
return file_list_result
requested_files = []
try:
cleaned_response = (
file_list_result.text.strip()
.removeprefix("```json")
.removesuffix("```")
.strip()
)
parsed_json = json.loads(cleaned_response)
if isinstance(parsed_json, list):
requested_files = [str(item) for item in parsed_json]
except (json.JSONDecodeError, TypeError):
print(
f"Could not parse file list from Gemini: {file_list_result.text}",
file=sys.stderr,
)
file_contents_context = _read_files_for_gemini(requested_files)
diff_section = (
f"Recent code changes that might be related (git diff HEAD):\n---\n{git_diff}\n---\n"
if git_diff
else ""
)
return _call_gemini_with_prompt(
api_key,
gemini_model,
_PROMPT_ANALYZE_FAILURE_V3_PERFORM,
stack_trace=stack_trace,
diff_section=diff_section,
file_contents_context=file_contents_context,
)
def _print_colorized_log(error_log: str, annotation_text: str) -> None:
"""Parses annotation JSON and prints the colorized log."""
COLOR_RED = "\033[91m" # red for primary
COLOR_YELLOW = "\033[93m" # yellow for secondary
COLOR_RESET = "\033[0m"
primary_line = ""
secondary_lines = set()
try:
# clean up text in case of markdown or other noise
cleaned_text = (
annotation_text.strip()
.removeprefix("```json")
.removesuffix("```")
.strip()
)
data = json.loads(cleaned_text)
# get primary line and strip for comparison
primary_line = data.get("primary_key_line", "").strip()
# get secondary lines and strip them for comparison
secondary_lines = set(
s.strip() for s in data.get("secondary_key_lines", []) if s.strip()
)
except (json.JSONDecodeError, TypeError):
logging.warning(
"Could not parse JSON from annotation, falling back to raw text: %s",
annotation_text,
)
# fallback: treat the whole response as the single primary line
primary_line = annotation_text.strip()
# print the original log, colorizing key lines
for line in error_log.splitlines():
stripped_line = line.strip()
if stripped_line and stripped_line == primary_line:
print(f"{COLOR_RED}{line}{COLOR_RESET}")
elif stripped_line and stripped_line in secondary_lines:
print(f"{COLOR_YELLOW}{line}{COLOR_RESET}")
else:
print(line)
# if no new line add one
if error_log and not error_log.endswith("\n"):
print()
def main() -> None:
parser = argparse.ArgumentParser(
description="Analyze test failures with Gemini."
)
parser.add_argument("--api-key", required=True, help="Gemini API key.")
parser.add_argument(
"--gemini-model",
default="gemini-2.5-flash-lite",
help="The Gemini model to use for the analysis.",
)
parser.add_argument(
"--verbosity",
type=int,
default=2,
choices=range(1, 4),
help="Verbosity level (1-3).",
)
args = parser.parse_args()
# set up logging to a persistent temporary file
log_file = None
try:
with tempfile.NamedTemporaryFile(
mode="w",
delete=False, # persist the file on exit
suffix=".log",
prefix="gemini_analysis_",
encoding="utf-8",
) as log_file_obj:
log_file = log_file_obj.name
logging.basicConfig(
filename=log_file,
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
filemode="w", # overwrite on each run
force=True,
)
logging.info("Starting Gemini analysis with args: %s", args)
logging.info("Logging to: %s", log_file)
except Exception as e:
print(f"Error setting up logging: {e}", file=sys.stderr)
log_file = None # continue without logging if setup fails
# run git diff and capture the output
git_diff = None
if args.verbosity > 1:
try:
result = subprocess.run(
["git", "diff", "HEAD"],
capture_output=True,
text=True,
check=False,
)
if result.stdout:
git_diff = result.stdout
logging.info("Git diff:\n%s", git_diff)
if result.stderr:
print("--- git diff stderr ---", file=sys.stderr)
print(result.stderr, file=sys.stderr)
logging.warning("Git diff stderr:\n%s", result.stderr)
except FileNotFoundError:
print("git command not found, skipping git diff.", file=sys.stderr)
logging.warning("git command not found, skipping git diff.")
except Exception as e:
print(
f"An error occurred while running git diff: {e}",
file=sys.stderr,
)
logging.error("An error occurred while running git diff: %s", e)
# read stdin for the error log
error_log = sys.stdin.read()
logging.info("Error log from stdin:\n%s", error_log)
# get key lines stack trace first
annotation = get_annotated_log(
args.api_key,
args.gemini_model,
error_log,
)
if annotation.error:
# if error print error log
print(error_log, end="")
print(f"\nGemini annotation failed: {annotation.text}", file=sys.stderr)
if log_file:
print(f"See {log_file} for details.", file=sys.stderr)
sys.exit(1)
# print colorized log
_print_colorized_log(error_log, annotation.text)
logging.info("Annotation output:\n%s", annotation.text)
# get full analysis
if args.verbosity > 1:
# print the status message here, so user knows
print("\nRunning Gemini analysis...", file=sys.stderr)
pwd = os.getcwd()
analysis = get_failure_analysis(
args.api_key,
args.gemini_model,
error_log,
git_diff,
pwd,
args.verbosity,
)
if analysis.error:
print(f"Gemini analysis failed: {analysis.text}", file=sys.stderr)
if log_file:
print(f"See {log_file} for details.", file=sys.stderr)
sys.exit(1)
else:
print("--- Gemini Failure Analysis ---")
print(analysis.text)
logging.info("Final analysis output:\n%s", analysis.text)
else:
# verbosity 1 is done, no further analysis needed
logging.info("Analysis complete (annotation only).")
if __name__ == "__main__":
main()