mirror of
https://github.com/espressif/esp-idf
synced 2025-03-10 09:39:10 -04:00
At present, the diag tool uses its default purge file. However, users may find it beneficial to specify and reuse their own purge file. A new command line option, --purge, has been introduced to allow users to provide their own purge file to diag. When this option is used, the default purge file is ignored. Signed-off-by: Frantisek Hrbata <frantisek.hrbata@espressif.com>
1211 lines
44 KiB
Python
1211 lines
44 KiB
Python
# SPDX-FileCopyrightText: 2024-2025 Espressif Systems (Shanghai) CO LTD
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
import atexit
|
|
import difflib
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sys
|
|
import textwrap
|
|
import traceback
|
|
import uuid
|
|
import zipfile
|
|
from pathlib import Path
|
|
from string import Template
|
|
from subprocess import run
|
|
from tempfile import TemporaryDirectory
|
|
from typing import Any
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import Tuple
|
|
|
|
import click
|
|
import yaml
|
|
from idf_py_actions.tools import PropertyDict
|
|
from idf_py_actions.tools import red_print
|
|
from idf_py_actions.tools import yellow_print
|
|
|
|
|
|
# Logging levels and configurations.
|
|
LOG_FATAL = 1
|
|
LOG_ERROR = 2
|
|
LOG_WARNING = 3
|
|
LOG_INFO = 4
|
|
LOG_DEBUG = 5
|
|
LOG_LEVEL = LOG_INFO
|
|
LOG_COLORS = True
|
|
LOG_PREFIX = False
|
|
|
|
# A temporary directory is used to store the report. Once it is completely
|
|
# generated, it is moved to its final location.
|
|
TMP_DIR = TemporaryDirectory()
|
|
TMP_DIR_PATH = Path(TMP_DIR.name)
|
|
TMP_DIR_REPORT_PATH = TMP_DIR_PATH / 'report'
|
|
TMP_DIR_REPORT_REDACTED_PATH = TMP_DIR_PATH / 'redacted'
|
|
|
|
# The full debug log is stored in the report directory alongside other
|
|
# collected files.
|
|
LOG_FILE = None
|
|
LOG_FILE_PATH = TMP_DIR_REPORT_PATH / 'diag.log'
|
|
|
|
# Fixed path for the built-in recipes
|
|
BUILTIN_RECIPES_PATH = Path(__file__).parent / 'diag' / 'recipes'
|
|
|
|
|
|
def cleanup() -> None:
|
|
"""Perform cleanup operations in case of unexpected termination."""
|
|
try:
|
|
if LOG_FILE:
|
|
LOG_FILE.close()
|
|
shutil.rmtree(TMP_DIR_PATH)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
atexit.register(cleanup)
|
|
|
|
|
|
def exception_tb() -> Optional[str]:
|
|
"""Return a string containing the message from the most recent exception,
|
|
along with its traceback, if available.
|
|
"""
|
|
ex_type, ex_value, ex_traceback = sys.exc_info()
|
|
in_exception = ex_type is not None
|
|
if not in_exception:
|
|
return None
|
|
ex_msg = f'exception {ex_type}:'
|
|
if str(ex_value):
|
|
ex_msg += f' {ex_value}'
|
|
tb = ''.join(traceback.format_tb(ex_traceback))
|
|
ex_msg += '\n' + tb.rstrip()
|
|
ex_msg = textwrap.indent(ex_msg, prefix='> ')
|
|
return ex_msg
|
|
|
|
|
|
def exception_msg() -> Optional[str]:
|
|
"""Return a string containing the message from the most recent exception,
|
|
if available.
|
|
"""
|
|
ex_type, ex_value, ex_traceback = sys.exc_info()
|
|
in_exception = ex_type is not None
|
|
if not in_exception or not str(ex_value):
|
|
return None
|
|
return str(ex_value)
|
|
|
|
|
|
def log(level: int, msg: str, prefix: str) -> None:
|
|
"""Logs a message with a specified severity level and prefix.
|
|
|
|
This function outputs log messages to standard error (stderr) based on the
|
|
provided severity level. All messages are also saved to a log file, which
|
|
is part of the diagnostic report. The log file entries include a severity
|
|
prefix but do not contain any color formatting.
|
|
|
|
Parameters:
|
|
level (int): The severity level of the log message.
|
|
msg (str): The message to be logged.
|
|
prefix (str): A character prefix to indicate the log severity.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
global LOG_FILE
|
|
if LOG_PREFIX:
|
|
log_prefix = f'{prefix} '
|
|
else:
|
|
log_prefix = ''
|
|
|
|
if LOG_FILE:
|
|
try:
|
|
log_msg = textwrap.indent(msg, prefix=f'{prefix} ')
|
|
LOG_FILE.write(log_msg + '\n')
|
|
LOG_FILE.flush()
|
|
except Exception:
|
|
LOG_FILE.close()
|
|
LOG_FILE = None
|
|
err(f'Cannot write to log file "{LOG_FILE}". Logging to file is turned off.')
|
|
|
|
if level > LOG_LEVEL:
|
|
return
|
|
|
|
msg = textwrap.indent(msg, prefix=log_prefix)
|
|
|
|
if not LOG_COLORS or level not in (LOG_FATAL, LOG_ERROR, LOG_WARNING):
|
|
print(msg, file=sys.stderr)
|
|
sys.stderr.flush()
|
|
return
|
|
|
|
if level == LOG_ERROR or level == LOG_FATAL:
|
|
red_print(msg)
|
|
elif level == LOG_WARNING:
|
|
yellow_print(msg)
|
|
|
|
|
|
def die(msg: str) -> None:
|
|
"""Irrecoverable fatal error."""
|
|
fatal(msg)
|
|
die_msg = 'ESP-IDF diagnostic command failed.'
|
|
if LOG_LEVEL != LOG_DEBUG:
|
|
# If the log level for stderr is not set to debug, suggest it.
|
|
die_msg += f' Using the "-d/--debug" option may provide more information.'
|
|
# Avoid calling fatal, as it may print the exception again if present.
|
|
log(LOG_FATAL, die_msg, 'F')
|
|
sys.exit(128)
|
|
|
|
|
|
def fatal(msg: str) -> None:
|
|
"""A fatal message, along with the exception traceback logged for
|
|
debugging if available. Used by the die function."""
|
|
ex_msg = exception_msg()
|
|
if ex_msg:
|
|
msg += f': {ex_msg}'
|
|
log(LOG_FATAL, 'fatal: ' + msg, 'F')
|
|
ex_tb = exception_tb()
|
|
if ex_tb:
|
|
dbg(ex_tb)
|
|
|
|
|
|
def err(msg: str) -> None:
|
|
"""A error message, along with the exception traceback logged for
|
|
debugging if available."""
|
|
ex_msg = exception_msg()
|
|
if ex_msg:
|
|
msg += f': {ex_msg}'
|
|
log(LOG_ERROR, 'error: ' + msg, 'E')
|
|
ex_tb = exception_tb()
|
|
if ex_tb:
|
|
dbg(ex_tb)
|
|
|
|
|
|
def warn(msg: str) -> None:
|
|
"""A warning message, along with the exception traceback logged for
|
|
debugging if available."""
|
|
ex_msg = exception_msg()
|
|
if ex_msg:
|
|
msg += f': {ex_msg}'
|
|
log(LOG_WARNING, 'warning: ' + msg, 'W')
|
|
ex_tb = exception_tb()
|
|
if ex_tb:
|
|
dbg(ex_tb)
|
|
|
|
|
|
def info(msg: str) -> None:
|
|
log(LOG_INFO, msg, 'I')
|
|
|
|
|
|
def dbg(msg: str) -> None:
|
|
log(LOG_DEBUG, msg, 'D')
|
|
|
|
|
|
def set_logger(debug: bool,
|
|
prefix: bool,
|
|
file: bool,
|
|
colors: bool) -> None:
|
|
"""Configure the logging settings for the application.
|
|
|
|
Parameters:
|
|
debug (bool): If True, enables debug logging to standard error.
|
|
prefix (bool): If True, adds a one-character prefix to each log message to identify the log level.
|
|
file (bool): If True, specifies that all log messages should be stored in a file, regardless of the log level setting.
|
|
colors (bool): If True, enables the use of ANSI color codes in log messages.
|
|
|
|
Returns:
|
|
None
|
|
"""
|
|
global LOG_LEVEL
|
|
global LOG_FILE
|
|
global LOG_COLORS
|
|
global LOG_PREFIX
|
|
|
|
if not colors:
|
|
LOG_COLORS = False
|
|
|
|
if debug:
|
|
LOG_LEVEL = LOG_DEBUG
|
|
|
|
if prefix:
|
|
LOG_PREFIX = True
|
|
|
|
if file:
|
|
try:
|
|
LOG_FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
|
|
LOG_FILE = open(LOG_FILE_PATH, 'w')
|
|
except Exception:
|
|
err(f'Cannot open log file "{LOG_FILE}". Log file will not be generated.')
|
|
|
|
|
|
def diff_dirs(dir1: Path, dir2: Path) -> None:
|
|
"""Show differences in files between two directories."""
|
|
dir1_root_path = Path(dir1).resolve()
|
|
dir2_root_path = Path(dir2).resolve()
|
|
dbg(f'diff "{dir1_root_path}" to "{dir2_root_path}"')
|
|
for dir1_file_path in dir1_root_path.rglob('*'):
|
|
if not dir1_file_path.is_file():
|
|
continue
|
|
dir2_file_path = dir2_root_path / dir1_file_path.relative_to(dir1_root_path)
|
|
|
|
with open(dir1_file_path, 'r') as f1, open(dir2_file_path, 'r') as f2:
|
|
diff = difflib.unified_diff(
|
|
f1.readlines(),
|
|
f2.readlines(),
|
|
fromfile=str(dir1_file_path.relative_to(dir1_root_path.parent)),
|
|
tofile=str(dir2_file_path.relative_to(dir2_root_path.parent)),
|
|
n=0
|
|
)
|
|
for line in diff:
|
|
dbg(line.strip())
|
|
|
|
|
|
def redact_files(dir1: Path, dir2: Path, purge: list) -> None:
|
|
"""Show differences in files between two directories."""
|
|
|
|
regexes: List = []
|
|
for entry in purge:
|
|
regex = re.compile(entry['regex'])
|
|
repl = entry['repl']
|
|
regexes.append((regex, repl))
|
|
|
|
dir1_root_path = Path(dir1).resolve()
|
|
dir2_root_path = Path(dir2).resolve()
|
|
dbg(f'redacting files in "{dir1_root_path}" into "{dir2_root_path}"')
|
|
for dir1_file_path in dir1_root_path.rglob('*'):
|
|
if not dir1_file_path.is_file():
|
|
continue
|
|
dir2_file_path = dir2_root_path / dir1_file_path.relative_to(dir1_root_path)
|
|
dir2_file_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(dir1_file_path, 'r') as f1, open(dir2_file_path, 'w') as f2:
|
|
data = f1.read()
|
|
for regex, repl in regexes:
|
|
data = regex.sub(repl, data)
|
|
f2.write(data)
|
|
|
|
diff_dirs(dir1, dir2)
|
|
|
|
|
|
def validate_recipe(recipe: Dict) -> None:
|
|
"""Validate the loaded recipe file. This is done manually to avoid any
|
|
dependencies and to provide more informative error messages.
|
|
"""
|
|
recipe_keys = ['description', 'tags', 'output', 'steps']
|
|
step_keys = ['name', 'cmds', 'output']
|
|
recipe_description = recipe.get('description')
|
|
recipe_tags = recipe.get('tags')
|
|
recipe_output = recipe.get('output')
|
|
recipe_steps = recipe.get('steps')
|
|
|
|
for key in recipe:
|
|
if key not in recipe_keys:
|
|
raise RuntimeError(f'Unknown recipe key "{key}", expecting "{recipe_keys}"')
|
|
|
|
if not recipe_description:
|
|
raise RuntimeError(f'Recipe is missing "description" key')
|
|
|
|
if type(recipe_description) is not str:
|
|
raise RuntimeError(f'Recipe "description" key is not of type "str"')
|
|
|
|
if recipe_tags:
|
|
if type(recipe_tags) is not list:
|
|
raise RuntimeError(f'Recipe "tags" key is not of type "list"')
|
|
for tag in recipe_tags:
|
|
if type(tag) is not str:
|
|
raise RuntimeError(f'Recipe tag value "{tag}" is not of type "str"')
|
|
|
|
if recipe_output:
|
|
if type(recipe_output) is not str:
|
|
raise RuntimeError(f'Recipe "output" key is not of type "str"')
|
|
|
|
if not recipe_steps:
|
|
raise RuntimeError(f'Recipe is missing "steps" key')
|
|
|
|
if type(recipe_steps) is not list:
|
|
raise RuntimeError(f'Recipe "steps" key is not of type "list"')
|
|
|
|
for step in recipe_steps:
|
|
for key in step:
|
|
if key not in step_keys:
|
|
raise RuntimeError(f'Unknown recipe step key "{key}", expecting "{step_keys}"')
|
|
|
|
step_name = step.get('name')
|
|
step_output = step.get('output')
|
|
step_cmds = step.get('cmds')
|
|
|
|
if not step_name:
|
|
raise RuntimeError(f'Recipe step is missing "name" key')
|
|
if type(step_name) is not str:
|
|
raise RuntimeError(f'Recipe step "name" key is not of type "str"')
|
|
if not step_cmds:
|
|
raise RuntimeError(f'Recipe step is missing "cmds" key')
|
|
if type(step_cmds) is not list:
|
|
raise RuntimeError(f'Recipe step "cmds" key is not of type "list"')
|
|
if step_output:
|
|
if type(step_output) is not str:
|
|
raise RuntimeError(f'Step "output" key is not of type "str"')
|
|
|
|
for cmd in step_cmds:
|
|
if 'exec' in cmd:
|
|
cmd_exec_keys = ['exec', 'cmd', 'output', 'stderr', 'timeout', 'append']
|
|
|
|
exec_cmd = cmd.get('cmd')
|
|
output = cmd.get('output')
|
|
stderr = cmd.get('stderr')
|
|
timeout = cmd.get('timeout')
|
|
append = cmd.get('append')
|
|
|
|
for key in cmd:
|
|
if key not in cmd_exec_keys:
|
|
raise RuntimeError((f'Unknown "exec" command argument "{key}" in step "{step_name}", '
|
|
f'expecting "{cmd_exec_keys}"'))
|
|
|
|
# Required arguments
|
|
if not exec_cmd:
|
|
raise RuntimeError(f'Command "exec" in step "{step_name}" is missing "cmd" argument')
|
|
if type(exec_cmd) is list:
|
|
for arg in exec_cmd:
|
|
if type(arg) is not str:
|
|
raise RuntimeError((f'List entry "{arg}" in "cmd" argument for command "exec" '
|
|
f'in step "{step_name}" is not of type "str"'))
|
|
elif type(exec_cmd) is not str:
|
|
raise RuntimeError(f'Command "exec" in step "{step_name}" is not of type "list" or "str"')
|
|
|
|
# Optional arguments
|
|
if output and type(output) is not str:
|
|
raise RuntimeError(f'Argument "output" for command "exec" in step "{step_name}" is not of type "str"')
|
|
if stderr and type(stderr) is not str:
|
|
raise RuntimeError(f'Argument "stderr" for command "exec" in step "{step_name}" is not of type "str"')
|
|
if timeout and type(timeout) is not int:
|
|
raise RuntimeError(f'Argument "timeout" for command "exec" in step "{step_name}" is not of type "int"')
|
|
if append and type(append) is not bool:
|
|
raise RuntimeError(f'Argument "append" for command "exec" in step "{step_name}" is not of type "bool"')
|
|
|
|
elif 'file' in cmd:
|
|
cmd_file_keys = ['file', 'path', 'output']
|
|
|
|
path = cmd.get('path')
|
|
output = cmd.get('output')
|
|
|
|
for key in cmd:
|
|
if key not in cmd_file_keys:
|
|
raise RuntimeError((f'Unknown "file" command argument "{key}" in step "{step_name}", '
|
|
f'expecting "{cmd_file_keys}"'))
|
|
|
|
# Required arguments
|
|
if not path:
|
|
raise RuntimeError(f'Command "file" in step "{step_name}" is missing "path" argument')
|
|
if type(path) is not str:
|
|
raise RuntimeError(f'Argument "path" for command "file" in step "{step_name}" is not of type "str"')
|
|
|
|
# Optional arguments
|
|
if output and type(output) is not str:
|
|
raise RuntimeError(f'Argument "output" for command "file" in step "{step_name}" is not of type "str"')
|
|
|
|
elif 'env' in cmd:
|
|
cmd_env_keys = ['env', 'vars', 'regex', 'output', 'append']
|
|
|
|
variables = cmd.get('vars')
|
|
regex = cmd.get('regex')
|
|
output = cmd.get('output')
|
|
append = cmd.get('append')
|
|
|
|
for key in cmd:
|
|
if key not in cmd_env_keys:
|
|
raise RuntimeError((f'Unknown "env" command argument "{key}" in step "{step_name}", '
|
|
f'expecting "{cmd_env_keys}"'))
|
|
|
|
# Required arguments
|
|
if not variables and not regex:
|
|
raise RuntimeError(f'Command "env" in step "{step_name}" is missing both "vars" and "regex" arguments')
|
|
if variables:
|
|
if type(variables) is not list:
|
|
raise RuntimeError(f'Argument "vars" for command "env" in step "{step_name}" is not of type "list"')
|
|
for var in variables:
|
|
if type(var) is not str:
|
|
raise RuntimeError((f'List entry "{var}" in "vars" argument for command "env" '
|
|
f'in step "{step_name}" is not of type "str"'))
|
|
if regex:
|
|
if type(regex) is not str:
|
|
raise RuntimeError(f'Argument "regex" for command "env" in step "{step_name}" is not of type "str"')
|
|
try:
|
|
re.compile(regex)
|
|
except re.error as e:
|
|
raise RuntimeError((f'Argument "regex" for command "env" in step "{step_name}" is not '
|
|
f'a valid regular expression: {e}'))
|
|
|
|
# Optional arguments
|
|
if output and type(output) is not str:
|
|
raise RuntimeError(f'Argument "output" for command "env" in step "{step_name}" is not of type "str"')
|
|
if append and type(append) is not bool:
|
|
raise RuntimeError(f'Argument "append" for command "env" in step "{step_name}" is not of type "bool"')
|
|
|
|
elif 'glob' in cmd:
|
|
cmd_glob_keys = ['glob', 'pattern', 'path', 'regex', 'mtime', 'recursive', 'relative', 'output']
|
|
|
|
pattern = cmd.get('pattern')
|
|
path = cmd.get('path')
|
|
regex = cmd.get('regex')
|
|
mtime = cmd.get('mtime')
|
|
recursive = cmd.get('recursive')
|
|
relative = cmd.get('relative')
|
|
output = cmd.get('output')
|
|
|
|
for key in cmd:
|
|
if key not in cmd_glob_keys:
|
|
raise RuntimeError((f'Unknown "glob" command argument "{key}" in step "{step_name}", '
|
|
f'expecting "{cmd_glob_keys}"'))
|
|
# Required arguments
|
|
if not pattern:
|
|
raise RuntimeError(f'Command "glob" in step "{step_name}" is missing "pattern" argument')
|
|
if type(pattern) is not str:
|
|
raise RuntimeError(f'Argument "pattern" for command "glob" in step "{step_name}" is not of type "str"')
|
|
if not path:
|
|
raise RuntimeError(f'Command "glob" in step "{step_name}" is missing "path" argument')
|
|
if type(path) is not str:
|
|
raise RuntimeError(f'Argument "path" for command "glob" in step "{step_name}" is not of type "str"')
|
|
|
|
# Optional arguments
|
|
if regex:
|
|
if type(regex) is not str:
|
|
raise RuntimeError(f'Argument "regex" for command "glob" in step "{step_name}" is not of type "str"')
|
|
try:
|
|
re.compile(regex)
|
|
except re.error as e:
|
|
raise RuntimeError((f'Argument "regex" for command "glob" in step "{step_name}" is not '
|
|
f'a valid regular expression: {e}'))
|
|
if mtime and type(mtime) is not bool:
|
|
raise RuntimeError(f'Argument "mtime" for command "glob" in step "{step_name}" is not of type "bool"')
|
|
if recursive and type(recursive) is not bool:
|
|
raise RuntimeError(f'Argument "recursive" for command "glob" in step "{step_name}" is not of type "bool"')
|
|
if relative and type(relative) is not bool:
|
|
raise RuntimeError(f'Argument "relative" for command "glob" in step "{step_name}" is not of type "bool"')
|
|
if output and type(output) is not str:
|
|
raise RuntimeError(f'Argument "output" for command "glob" in step "{step_name}" is not of type "str"')
|
|
|
|
else:
|
|
raise RuntimeError(f'Unknown command "{cmd}" in step "{step_name}"')
|
|
|
|
|
|
def validate_purge(purge: Any) -> None:
|
|
"""Validate the loaded purge file. This is done manually to avoid any
|
|
dependencies and to provide more informative error messages.
|
|
"""
|
|
|
|
if type(purge) is not list:
|
|
raise RuntimeError(f'Purge is not of type "list"')
|
|
|
|
regex_keys = ['regex', 'repl']
|
|
|
|
for entry in purge:
|
|
if type(entry) is not dict:
|
|
raise RuntimeError(f'Purge entry "{entry}" is not of type "dict"')
|
|
|
|
if 'regex' in entry:
|
|
for key in entry:
|
|
if key not in regex_keys:
|
|
raise RuntimeError((f'Unknown purge key "{key}" in "{entry}", '
|
|
f'expecting "{regex_keys}"'))
|
|
|
|
regex = entry.get('regex')
|
|
repl = entry.get('repl')
|
|
|
|
# Required arguments
|
|
if type(regex) is not str:
|
|
raise RuntimeError(f'Argument "regex" for purge entry "{entry}" is not of type "str"')
|
|
try:
|
|
re.compile(regex)
|
|
except re.error as e:
|
|
raise RuntimeError((f'Argument "regex" for purge entry "{entry}" is not '
|
|
f'a valid regular expression: {e}'))
|
|
|
|
if not repl:
|
|
raise RuntimeError(f'Purge entry "{entry}" is missing "repl" argument')
|
|
if type(repl) is not str:
|
|
raise RuntimeError(f'Argument "repl" for purge entry "{entry}" is not of type "str"')
|
|
|
|
else:
|
|
raise RuntimeError(f'Unknown purge entry "{entry}"')
|
|
|
|
|
|
def get_output_path(src: Optional[str],
|
|
dst: Optional[str],
|
|
step: Dict,
|
|
recipe: Dict,
|
|
src_root: Optional[str] = None) -> Path:
|
|
"""Construct the output file path based on source, destination, and recipe output.
|
|
|
|
Parameters:
|
|
src (Optional[str]): The source file path. This can be None, for example,
|
|
when used in an exec command.
|
|
dst (Optional[str]): The destination file path or directory. If it ends with
|
|
a '/' character, it is considered a directory, and the
|
|
src file name is appended to it. Otherwise it is the
|
|
file where the output should be saved. This can also be
|
|
None, in which case the src file name is used as the
|
|
output file name.
|
|
step (Dict): The step this file belongs to, used to obtain the step'
|
|
global output directory.
|
|
recipe (Dict): The recipe this file belongs to, used to obtain the recipe's
|
|
global output directory.
|
|
src_root (Optional[str]): The src file directory, used to determine the
|
|
relative source file path for constructing the
|
|
relative destination path. For example, if src
|
|
is "/dir/dir2/dir3/file.txt" and src_root is
|
|
"/dir/" and dst is "/output/", the destination
|
|
file path will be "/output/dir2/dir3/file.txt".
|
|
|
|
Returns:
|
|
Path: The constructed output file path.
|
|
"""
|
|
dst_path = TMP_DIR_REPORT_PATH
|
|
# recipe global output directory
|
|
recipe_root = recipe.get('output')
|
|
# step global output directory
|
|
step_root = step.get('output')
|
|
|
|
if recipe_root:
|
|
dst_path = dst_path / recipe_root
|
|
|
|
if step_root:
|
|
dst_path = dst_path / step_root
|
|
|
|
if dst:
|
|
dst_path = dst_path / dst
|
|
if dst.endswith('/') and src:
|
|
if src_root:
|
|
src_rel_path = Path(src).relative_to(src_root)
|
|
dst_path = dst_path / src_rel_path
|
|
else:
|
|
dst_path = dst_path / Path(src).name
|
|
elif src:
|
|
dst_path = dst_path / Path(src).name
|
|
|
|
return dst_path
|
|
|
|
|
|
def cmd_file(args: Dict, step: Dict, recipe: Dict) -> None:
|
|
"""file command"""
|
|
src = args['path']
|
|
dst = args.get('output')
|
|
|
|
dst_path = get_output_path(src, dst, step, recipe)
|
|
|
|
try:
|
|
dst_path.parent.mkdir(parents=True, exist_ok=True)
|
|
shutil.copy(src, dst_path)
|
|
except Exception:
|
|
warn(f'Cannot copy file "{src}"')
|
|
|
|
|
|
def cmd_exec(args: Dict, step: Dict, recipe: Dict) -> None:
|
|
"""exec command"""
|
|
cmd = args['cmd']
|
|
stdout = args.get('output')
|
|
stderr = args.get('stderr')
|
|
timeout = args.get('timeout')
|
|
append = args.get('append', False)
|
|
|
|
stdout_path = get_output_path(None, stdout, step, recipe)
|
|
stderr_path = get_output_path(None, stderr, step, recipe)
|
|
|
|
# If cmd is a string, execute it using the shell.
|
|
if isinstance(cmd, list):
|
|
shell = False
|
|
else:
|
|
shell = True
|
|
|
|
try:
|
|
p = run(cmd, shell=shell, text=True, capture_output=True, timeout=timeout)
|
|
except Exception:
|
|
warn(f'Exec command "{cmd}" failed')
|
|
return
|
|
|
|
if p.returncode:
|
|
warn(f'Exec command "{cmd}" failed with exit code {p.returncode}')
|
|
if p.stderr:
|
|
dbg(f'stderr: "{p.stderr}"')
|
|
|
|
if stdout and p.stdout:
|
|
try:
|
|
stdout_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(stdout_path, 'a' if append else 'w') as f:
|
|
f.write(p.stdout)
|
|
except Exception:
|
|
warn(f'Cannot write exec command "{cmd}" stdout to "{stdout}"')
|
|
|
|
if stderr and p.stderr:
|
|
try:
|
|
stderr_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(stderr_path, 'a' if append else 'w') as f:
|
|
f.write(p.stderr)
|
|
except Exception:
|
|
warn(f'Cannot write exec command "{cmd}" stderr to "{stderr}"')
|
|
|
|
|
|
def cmd_env(args: Dict, step: Dict, recipe: Dict) -> None:
|
|
"""env command"""
|
|
variables = args.get('vars', [])
|
|
regex_str = args.get('regex')
|
|
output = args.get('output')
|
|
append = args.get('append', False)
|
|
regex = re.compile(regex_str) if regex_str else None
|
|
|
|
output_path = get_output_path(None, output, step, recipe)
|
|
found_list: List = []
|
|
out_list: List = []
|
|
|
|
for var, val in os.environ.items():
|
|
if var in variables:
|
|
found_list.append(var)
|
|
continue
|
|
|
|
if not regex:
|
|
continue
|
|
|
|
match = regex.match(var)
|
|
if match:
|
|
found_list.append(var)
|
|
|
|
for var in found_list:
|
|
val = os.environ[var]
|
|
out_list.append(f'{var}={val}')
|
|
|
|
if output:
|
|
try:
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(output_path, 'a' if append else 'w') as f:
|
|
f.write('\n'.join(out_list))
|
|
except Exception:
|
|
warn(f'Cannot write env command output to "{output}"')
|
|
|
|
|
|
def get_latest_modified_file(file_paths: List[Path]) -> Optional[Path]:
|
|
"""Return the most recently modified file from the file_paths list"""
|
|
file_path = None
|
|
file_mtime = 0.0
|
|
|
|
for file in file_paths:
|
|
mtime = file.stat().st_mtime
|
|
if mtime < file_mtime:
|
|
continue
|
|
file_mtime = mtime
|
|
file_path = file
|
|
|
|
return file_path
|
|
|
|
|
|
def cmd_glob(args: Dict, step: Dict, recipe: Dict) -> None:
|
|
"""glob command"""
|
|
pattern = args['pattern']
|
|
dir_path = Path(args['path'])
|
|
output = args.get('output')
|
|
mtime = args.get('mtime', False)
|
|
recursive = args.get('recursive', False)
|
|
relative = args.get('relative', False)
|
|
regex_str = args.get('regex')
|
|
|
|
try:
|
|
if recursive:
|
|
file_paths = list(dir_path.rglob(pattern))
|
|
else:
|
|
file_paths = list(dir_path.glob(pattern))
|
|
except Exception:
|
|
warn(f'Cannot glob "{pattern}" in "{dir_path}"')
|
|
return
|
|
|
|
file_paths = [file_path for file_path in file_paths if file_path.is_file()]
|
|
if not file_paths:
|
|
warn(f'No files matching glob "{pattern}" found in "{dir_path}"')
|
|
return
|
|
|
|
if regex_str:
|
|
file_paths_match = []
|
|
regex = re.compile(regex_str, flags=re.MULTILINE)
|
|
for file_path in file_paths:
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
data = f.read()
|
|
match = regex.search(data)
|
|
if match:
|
|
file_paths_match.append(file_path)
|
|
except Exception:
|
|
err(f'Failed to search for the regex "{regex_str}" in "{file_path}"')
|
|
|
|
if not file_paths_match:
|
|
warn(f'No files with content matching regex "{regex_str}" found in "{dir_path}"')
|
|
return
|
|
file_paths = file_paths_match
|
|
|
|
if mtime:
|
|
last_modified_file = get_latest_modified_file(file_paths)
|
|
if not last_modified_file:
|
|
err(f'No last modified file found for "{pattern}" found in "{dir_path}"')
|
|
return
|
|
file_paths = [last_modified_file]
|
|
|
|
for file_path in file_paths:
|
|
# If the relative flag is enabled, save the file in the output directory while
|
|
# maintaining the same relative path as in the source directory.
|
|
dst_path = get_output_path(str(file_path), output, step, recipe, str(dir_path) if relative else None)
|
|
try:
|
|
dst_path.parent.mkdir(parents=True, exist_ok=True)
|
|
if dst_path.is_file():
|
|
# A file already exists in the report directory. Attempt to
|
|
# create a new name by appending numerical suffixes.
|
|
cnt = 1
|
|
while True:
|
|
new_dst_path = dst_path.with_name(dst_path.name + f'.{cnt}')
|
|
if not new_dst_path.exists():
|
|
warn(f'File "{dst_path.name}" for "{file_path}" already exists. Using "{new_dst_path.name}"')
|
|
dst_path = new_dst_path
|
|
break
|
|
cnt += 1
|
|
dbg(f'copy "{file_path}" to "{dst_path}"')
|
|
shutil.copy(file_path, dst_path)
|
|
except Exception:
|
|
warn(f'Cannot copy glob file "{file_path}"')
|
|
|
|
|
|
def process_recipe(recipe: Dict) -> None:
|
|
"""execute commands for every stage in a recipe"""
|
|
for step in recipe['steps']:
|
|
step_name = step['name']
|
|
dbg(f'Processing step "{step_name}"')
|
|
print(f'* {step_name}')
|
|
for cmd in step['cmds']:
|
|
dbg(f'cmd: "{cmd}"')
|
|
if 'file' in cmd:
|
|
cmd_file(cmd, step, recipe)
|
|
elif 'exec' in cmd:
|
|
cmd_exec(cmd, step, recipe)
|
|
elif 'env' in cmd:
|
|
cmd_env(cmd, step, recipe)
|
|
elif 'glob' in cmd:
|
|
cmd_glob(cmd, step, recipe)
|
|
else:
|
|
err(f'Unknow command "{cmd}" in step "{step_name}"')
|
|
|
|
|
|
def get_recipes(cmdl_recipes: Optional[Tuple],
|
|
cmdl_tags: Optional[Tuple],
|
|
append: bool,
|
|
variables: Dict) -> Dict:
|
|
"""Load and return a dictionary of recipes.
|
|
|
|
This function retrieves recipes based on the provided command line inputs
|
|
and filters them using specified tags. It can also append additional
|
|
recipes to a set of built-in recipes.
|
|
|
|
Args:
|
|
cmdl_recipes (Optional[Tuple]): A tuple containing recipe names for
|
|
built-in recipes or paths to
|
|
user-provided recipes.
|
|
cmdl_tags (Optional[Tuple]): A tuple of tags used to filter the loaded
|
|
recipes.
|
|
append (bool): A flag indicating whether to append the `cmdl_recipes`
|
|
to the built-in recipes. If False, only the recipes
|
|
specified in `cmdl_recipes` and filtered by `cmdl_tags`
|
|
are used. Used to allow run additional recipes along
|
|
with the built-in ones.
|
|
variables (Dict): A dictionary of variables to be replaced within the
|
|
recipes.
|
|
|
|
Returns:
|
|
Dict: A dictionary where each key is a recipe filename and each value
|
|
is a dictionary representing the recipe.
|
|
"""
|
|
|
|
builtin_recipe_files: Dict = {}
|
|
recipe_files: List = []
|
|
recipes: Dict = {}
|
|
|
|
for recipe_path in BUILTIN_RECIPES_PATH.glob('*.yml'):
|
|
builtin_recipe_files[recipe_path.stem] = str(recipe_path.resolve())
|
|
dbg(f'Builtin recipes "{builtin_recipe_files}"')
|
|
|
|
if cmdl_recipes:
|
|
for recipe_file in cmdl_recipes:
|
|
recipe_path = Path(recipe_file).resolve()
|
|
if recipe_path.is_file():
|
|
recipe_files.append(str(recipe_path))
|
|
continue
|
|
|
|
if recipe_file in builtin_recipe_files:
|
|
recipe_files.append(builtin_recipe_files[recipe_file])
|
|
continue
|
|
|
|
die(f'Cannot find recipe "{recipe_file}"')
|
|
|
|
if append:
|
|
recipe_files += list(builtin_recipe_files.values())
|
|
else:
|
|
recipe_files += list(builtin_recipe_files.values())
|
|
|
|
recipe_files = list(set(recipe_files))
|
|
recipe_files.sort()
|
|
dbg(f'Recipe files to use "{recipe_files}"')
|
|
|
|
# Load recipes
|
|
for recipe_file in recipe_files:
|
|
dbg(f'Loading recipe file "{recipe_file}"')
|
|
try:
|
|
with open(recipe_file, 'r') as f:
|
|
data = f.read()
|
|
formatted = Template(data).safe_substitute(**variables)
|
|
recipe = yaml.safe_load(formatted)
|
|
recipes[recipe_file] = recipe
|
|
except Exception:
|
|
die(f'Cannot load diagnostic recipe "{recipe_file}"')
|
|
|
|
if cmdl_tags:
|
|
dbg('Filtering recipe file with tags "{}"'.format(', '.join(cmdl_tags)))
|
|
recipes_tagged: Dict = {}
|
|
for recipe_file, recipe in recipes.items():
|
|
recipe_tags = recipe.get('tags')
|
|
|
|
if not recipe_tags:
|
|
continue
|
|
|
|
for cmdl_tag in cmdl_tags:
|
|
if cmdl_tag in recipe_tags:
|
|
recipes_tagged[recipe_file] = recipe
|
|
break
|
|
|
|
recipes = recipes_tagged
|
|
|
|
if not recipes:
|
|
die(f'No recipes available')
|
|
|
|
return recipes
|
|
|
|
|
|
def act_list_recipes(cmdl_recipes: Optional[Tuple],
|
|
cmdl_tags: Optional[Tuple],
|
|
append: bool,
|
|
variables: Dict) -> None:
|
|
|
|
"""Display a list of available recipes along with their details"""
|
|
try:
|
|
recipes = get_recipes(cmdl_recipes, cmdl_tags, append, variables)
|
|
except Exception:
|
|
die(f'Unable to create list of recipe files')
|
|
|
|
for recipe_file, recipe in recipes.items():
|
|
builtin = BUILTIN_RECIPES_PATH == Path(recipe_file).parent
|
|
|
|
try:
|
|
validate_recipe(recipe)
|
|
valid = True
|
|
except Exception:
|
|
valid = False
|
|
|
|
print(recipe_file)
|
|
print(' description: {}'.format(recipe.get('description', '')))
|
|
print(' short name: {}'.format(Path(recipe_file).stem if builtin else ''))
|
|
print(' valid: {}'.format(valid))
|
|
print(' builtin: {}'.format(builtin))
|
|
print(' tags: {}'.format(', '.join(recipe.get('tags', ''))))
|
|
|
|
|
|
def act_check_recipes(cmdl_recipes: Optional[Tuple],
|
|
cmdl_tags: Optional[Tuple],
|
|
append: bool,
|
|
variables: Dict) -> None:
|
|
"""Verify recipes"""
|
|
try:
|
|
recipes = get_recipes(cmdl_recipes, cmdl_tags, append, variables)
|
|
except Exception:
|
|
die(f'Unable to create list of recipe files')
|
|
|
|
error = False
|
|
for recipe_file, recipe in recipes.items():
|
|
print(f'Checking recipe "{recipe_file}"')
|
|
try:
|
|
validate_recipe(recipe)
|
|
print('Recipe is valid')
|
|
except Exception:
|
|
err('validation failed')
|
|
print('Recipe is invalid.')
|
|
error = True
|
|
|
|
if error:
|
|
die('Recipes validation failed')
|
|
|
|
|
|
def act_zip(directory: str, output: Optional[str], force: bool) -> None:
|
|
"""Compress the report directory into a zip file"""
|
|
archive_dir_path = Path(directory).expanduser()
|
|
archive_path = Path(output or directory).with_suffix('.zip').expanduser()
|
|
|
|
info(f'Creating archive "{archive_path}"')
|
|
|
|
if not archive_dir_path.exists() or not archive_dir_path.is_dir():
|
|
die(f'The path "{archive_dir_path}" either does not exist or is not a directory.')
|
|
|
|
if archive_path.exists():
|
|
if not archive_path.is_file():
|
|
die((f'Directory entry "{archive_path}" already exists and is not a regular file. '
|
|
f'Please use the --output option or remove "{archive_path}" manually.'))
|
|
if not force:
|
|
die((f'Archive file "{archive_path}" already exists. '
|
|
f'Please use the --output option or --force option to overwrite the existing '
|
|
f'"{archive_path}" archive.'))
|
|
try:
|
|
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as f:
|
|
for file in archive_dir_path.rglob('*'):
|
|
print(f'adding: {file}')
|
|
f.write(file, file.relative_to(archive_dir_path.parent))
|
|
except Exception:
|
|
die(f'Cannot create zip archive for "{directory}" directory.')
|
|
|
|
info(f'The archive "{archive_path}" is prepared and can be included with your issue report.')
|
|
|
|
|
|
def create(action: str,
|
|
ctx: click.core.Context,
|
|
args: PropertyDict,
|
|
debug: bool,
|
|
log_prefix: bool,
|
|
force: bool,
|
|
no_color: bool,
|
|
zip_directory: Optional[str],
|
|
list_recipes: bool,
|
|
check_recipes: bool,
|
|
cmdl_recipes: Tuple,
|
|
cmdl_tags: Tuple,
|
|
purge_file: str,
|
|
append: bool,
|
|
output: Optional[str]) -> None:
|
|
|
|
if list_recipes or check_recipes or zip_directory:
|
|
# Generate a log file only when the report is produced.
|
|
log_to_file = False
|
|
else:
|
|
log_to_file = True
|
|
|
|
set_logger(debug=debug,
|
|
prefix=log_prefix,
|
|
file=log_to_file,
|
|
colors=not no_color)
|
|
|
|
if zip_directory:
|
|
# Archive recipes command
|
|
act_zip(zip_directory, output, force)
|
|
return
|
|
|
|
project_dir = args.project_dir
|
|
|
|
# Set up variables that can be utilized in the recipe.
|
|
recipe_variables: Dict = {}
|
|
|
|
if project_dir:
|
|
recipe_variables['PROJECT_DIR'] = project_dir
|
|
else:
|
|
warn(f'Project directory is not set')
|
|
|
|
build_dir = args.build_dir
|
|
if build_dir:
|
|
recipe_variables['BUILD_DIR'] = build_dir
|
|
else:
|
|
warn(f'Build directory is not set')
|
|
|
|
recipe_variables['IDF_PATH'] = os.environ['IDF_PATH']
|
|
recipe_variables['REPORT_DIR'] = str(TMP_DIR_REPORT_PATH)
|
|
|
|
dbg(f'Recipe variables: {recipe_variables}')
|
|
dbg(f'Project directory: {project_dir}')
|
|
dbg(f'Build directory: {build_dir}')
|
|
|
|
if list_recipes:
|
|
# List recipes command
|
|
act_list_recipes(cmdl_recipes, cmdl_tags, append, recipe_variables)
|
|
return
|
|
|
|
if check_recipes:
|
|
# Validate recipes command
|
|
act_check_recipes(cmdl_recipes, cmdl_tags, append, recipe_variables)
|
|
return
|
|
|
|
recipes: Dict = {}
|
|
|
|
if not output:
|
|
output_dir_path = Path('idf-diag-{}'.format(uuid.uuid4())).expanduser()
|
|
else:
|
|
output_dir_path = Path(output).expanduser()
|
|
|
|
info(f'Creating report in "{output_dir_path}" directory.')
|
|
|
|
# Check output directory
|
|
dbg(f'Using "{output_dir_path}" as report directory')
|
|
|
|
try:
|
|
output_dir_path_exists = output_dir_path.exists()
|
|
except Exception:
|
|
die(f'Cannot get report directory "{output_dir_path}" status')
|
|
|
|
if output_dir_path_exists:
|
|
if not output_dir_path.is_dir():
|
|
die((f'Directory entry "{output_dir_path}" already exists and is not a directory. '
|
|
f'Please select a directory that does not exist or remove "{output_dir_path}" '
|
|
f'manually.'))
|
|
if not force:
|
|
die((f'Report directory "{output_dir_path}" already exists. '
|
|
f'Please select a directory that does not exist or use the "-f/--force" '
|
|
f'option to delete the existing "{output_dir_path}" directory.'))
|
|
try:
|
|
dbg(f'Removing existing report "{output_dir_path}" directory')
|
|
shutil.rmtree(output_dir_path)
|
|
except Exception:
|
|
die(f'Cannot remove existing "{output_dir_path}" directory')
|
|
|
|
# Get recipe files
|
|
try:
|
|
recipes = get_recipes(cmdl_recipes, cmdl_tags, append, recipe_variables)
|
|
except Exception:
|
|
die(f'Unable to create list of recipe files')
|
|
|
|
# Validate recipes
|
|
try:
|
|
for recipe_file, recipe in recipes.items():
|
|
dbg(f'Validating recipe file "{recipe_file}"')
|
|
validate_recipe(recipe)
|
|
except Exception:
|
|
die(f'File "{recipe_file}" is not a valid diagnostic file')
|
|
|
|
# Load purge file
|
|
dbg(f'Purge file: {purge_file}')
|
|
try:
|
|
with open(purge_file, 'r') as f:
|
|
purge = yaml.safe_load(f.read())
|
|
except Exception:
|
|
die(f'Cannot load purge file "{purge_file}"')
|
|
|
|
# Validate purge file
|
|
try:
|
|
validate_purge(purge)
|
|
except Exception:
|
|
die(f'File "{purge_file}" is not a valid purge file')
|
|
|
|
# Cook recipes
|
|
try:
|
|
for recipe_file, recipe in recipes.items():
|
|
desc = recipe.get('description')
|
|
dbg(f'Processing recipe "{desc}" file "{recipe_file}"')
|
|
print(f'{desc}')
|
|
process_recipe(recipe)
|
|
except Exception:
|
|
die(f'Cannot process diagnostic file "{recipe_file}"')
|
|
|
|
dbg(f'Report is done.')
|
|
|
|
global LOG_FILE
|
|
if LOG_FILE:
|
|
LOG_FILE.close()
|
|
LOG_FILE = None
|
|
|
|
try:
|
|
redact_files(TMP_DIR_REPORT_PATH, TMP_DIR_REPORT_REDACTED_PATH, purge)
|
|
except Exception:
|
|
err(f'The redaction was unsuccessful')
|
|
|
|
try:
|
|
shutil.move(TMP_DIR_REPORT_REDACTED_PATH, output_dir_path)
|
|
except Exception:
|
|
die(f'Cannot move diagnostic report directory from "{TMP_DIR_REPORT_REDACTED_PATH}" to "{output_dir_path}"')
|
|
|
|
info((f'The report has been created in the "{output_dir_path}" directory. '
|
|
f'Please make sure to thoroughly check it for any sensitive information '
|
|
f'before sharing and remove files you do not want to share. Kindly include '
|
|
f'any additional files you find relevant that were not automatically added. '
|
|
f'Please archive the contents of the final report directory using the command:\n'
|
|
f'"idf.py diag --zip {output_dir_path}".'))
|
|
|
|
|
|
def action_extensions(base_actions: Dict, project_path: str) -> Any:
|
|
return {
|
|
'actions': {
|
|
'diag': {
|
|
'callback': create,
|
|
'help': 'Create diagnostic report.',
|
|
'options': [
|
|
{
|
|
'names': ['-d', '--debug'],
|
|
'is_flag': True,
|
|
'help': 'Print debug information, including exception tracebacks.',
|
|
},
|
|
{
|
|
'names': ['--no-color'],
|
|
'is_flag': True,
|
|
'help': 'Do not emit ANSI color codes.',
|
|
},
|
|
{
|
|
'names': ['--log-prefix'],
|
|
'is_flag': True,
|
|
'help': 'Add a severity character at the beginning of log messages.',
|
|
},
|
|
{
|
|
'names': ['-z', '--zip', 'zip_directory'],
|
|
'metavar': 'PATH',
|
|
'help': 'Create zip archive for diagnostic report in PATH.',
|
|
},
|
|
{
|
|
'names': ['-l', '--list', 'list_recipes'],
|
|
'is_flag': True,
|
|
'help': 'Show information about available recipes.',
|
|
},
|
|
{
|
|
'names': ['-c', '--check', 'check_recipes'],
|
|
'is_flag': True,
|
|
'help': 'Validate recipes.',
|
|
},
|
|
{
|
|
'names': ['-r', '--recipe', 'cmdl_recipes'],
|
|
'multiple': True,
|
|
'metavar': 'RECIPE',
|
|
'type': str,
|
|
'help': ('Recipe to use. This option can be specified multiple times. '
|
|
'By default, all built-in recipes are used. RECIPE refers to '
|
|
'the recipe file path or the file name stem for built-in recipes.'),
|
|
},
|
|
{
|
|
'names': ['-t', '--tag', 'cmdl_tags'],
|
|
'multiple': True,
|
|
'metavar': 'TAG',
|
|
'type': str,
|
|
'help': ('Consider only recipes containing TAG. This option can be specified '
|
|
'multiple times. By default, all recipes are used. Use -l/--list-recipes '
|
|
'option to see recipe TAG information.'),
|
|
},
|
|
{
|
|
'names': ['-a', '--append'],
|
|
'is_flag': True,
|
|
'help': ('Use recipes specified with the -r/--recipe option in '
|
|
'combination with the built-in recipes.'),
|
|
},
|
|
{
|
|
'names': ['-f', '--force'],
|
|
'is_flag': True,
|
|
'help': 'Delete the target file or directory if it already exists before creating it.',
|
|
},
|
|
{
|
|
'names': ['-o', '--output'],
|
|
'metavar': 'PATH',
|
|
'type': str,
|
|
'help': ('Diagnostic report directory PATH or zip file archive PATH. '
|
|
'If not specified, the report-UUID is used as the report directory, '
|
|
'and the report directory specified with the --zip option with a zip '
|
|
'extension is used for the zip file archive.')
|
|
},
|
|
{
|
|
'names': ['-p', '--purge', 'purge_file'],
|
|
'metavar': 'PATH',
|
|
'type': str,
|
|
'default': str(Path(__file__).parent / 'diag' / 'purge' / 'purge.yml'),
|
|
'help': ('Purge file PATH containing a description of what information '
|
|
'should be redacted from the resulting report. '
|
|
'Default is "tools/idf_py_actions/diag/purge/purge.yml"')
|
|
},
|
|
],
|
|
},
|
|
},
|
|
}
|