feat(tools): Enforce utf-8 encoding with open() function

Marek Fiala 2024-07-23 15:59:09 +02:00, committed by BOT
parent 305f1c1e5b
commit 2c814ef2fa
40 changed files with 115 additions and 124 deletions

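Background for the changes below: when open() is called without an encoding argument, Python falls back to locale.getpreferredencoding(False), which is UTF-8 on most Linux hosts but often a legacy code page such as cp1252 on Windows, so the same tool can read a UTF-8 source tree on one platform and raise UnicodeDecodeError on another. A minimal sketch of the failure mode and the fix (file name and contents are illustrative, not from the commit):

    import locale
    import tempfile

    # The platform default that a bare open() falls back to.
    print(locale.getpreferredencoding(False))

    # An illustrative UTF-8 file containing a non-ASCII character.
    with tempfile.NamedTemporaryFile(mode='wb', suffix='.txt', delete=False) as tmp:
        tmp.write('temperature: 25°C\n'.encode('utf-8'))

    # open(tmp.name) would decode with the locale default and can raise
    # UnicodeDecodeError (or silently mis-decode) on a cp1252 system.
    with open(tmp.name, encoding='utf-8') as f:  # explicit encoding: same result everywhere
        print(f.read())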
View File

@@ -126,13 +126,13 @@ class FuseTable(list):
 field_name = p.field_name + p.group
 if field_name != '' and len(duplicates.intersection([field_name])) != 0:
 fl_error = True
-print('Field at %s, %s, %s, %s have dublicate field_name' %
+print('Field at %s, %s, %s, %s have duplicate field_name' %
 (p.field_name, p.efuse_block, p.bit_start, p.bit_count))
 if fl_error is True:
 raise InputError('Field names must be unique')
 def check_struct_field_name(self):
-# check that stuctured fields have a root field
+# check that structured fields have a root field
 for p in self:
 if '.' in p.field_name:
 name = ''
@@ -454,7 +454,7 @@ def process_input_file(file, type_table):
 def ckeck_md5_in_file(md5, filename):
 if os.path.exists(filename):
-with open(filename, 'r') as f:
+with open(filename, 'r', encoding='utf-8') as f:
 for line in f:
 if md5 in line:
 return True
@@ -478,12 +478,12 @@ def create_output_files(name, output_table, debug):
 if ckeck_md5_in_file(output_table.md5_digest_table, file_c_path) is False:
 status('Creating efuse *.h file ' + file_h_path + ' ...')
 output = output_table.to_header(file_name)
-with open(file_h_path, 'w') as f:
+with open(file_h_path, 'w', encoding='utf-8') as f:
 f.write(output)
 status('Creating efuse *.c file ' + file_c_path + ' ...')
 output = output_table.to_c_file(file_name, debug)
-with open(file_c_path, 'w') as f:
+with open(file_c_path, 'w', encoding='utf-8') as f:
 f.write(output)
 else:
 print('Source files do not require updating correspond to csv file.')

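Call sites like the ones above can also be found mechanically: since Python 3.10, PEP 597's opt-in EncodingWarning flags every open() that relies on the locale default. A sketch, assuming Python 3.10 or newer (the probe file is hypothetical):

    # Run as: python -X warn_default_encoding probe.py
    # (or set PYTHONWARNDEFAULTENCODING=1); without that flag the warning is not emitted.
    import warnings

    warnings.simplefilter('error', EncodingWarning)  # escalate each hit to a traceback

    with open('probe.txt', 'w', encoding='utf-8') as f:  # explicit encoding: no warning
        f.write('ok\n')

    open('probe.txt')  # relies on the locale default -> EncodingWarning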
View File

@@ -69,7 +69,7 @@ def generate_tests_cases(target): # type: (str) -> None
 messages = [random.randrange(0, 1 << max_key_size) for x in range(NUM_MESSAGES)]
-with open('digital_signature_test_cases.h', 'w') as f:
+with open('digital_signature_test_cases.h', 'w', encoding='utf-8') as f:
 f.write('/*\n')
 year = datetime.datetime.now().year
 f.write(' * SPDX-FileCopyrightText: {year} Espressif Systems (Shanghai) CO LTD\n'.format(year=year))

View File

@@ -50,7 +50,7 @@ def main() -> None:
 glob_iter = glob.glob(os.path.join(idf_path, 'components', '**', f'*.{extension}'), recursive=True)
 source_files_iters.append(glob_iter)
 for filename in itertools.chain(*source_files_iters):
-with open(filename, 'r') as f_obj:
+with open(filename, 'r', encoding='utf-8') as f_obj:
 file_contents = f_obj.read()
 if ESP_SYSTEM_INIT_FN_STR not in file_contents:
 continue
@@ -88,7 +88,7 @@ def main() -> None:
 # 3. Load startup entries list from STARTUP_ENTRIES_FILE, removing comments and empty lines
 #
 startup_entries_expected_lines = []
-with open(os.path.join(idf_path, STARTUP_ENTRIES_FILE), 'r') as startup_entries_expected_file:
+with open(os.path.join(idf_path, STARTUP_ENTRIES_FILE), 'r', encoding='utf-8') as startup_entries_expected_file:
 for line in startup_entries_expected_file:
 if line.startswith('#') or len(line.strip()) == 0:
 continue

View File

@@ -4,7 +4,6 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 #
 import json
 import logging
 import os.path
@@ -26,7 +25,7 @@ def get_prefix_map_gdbinit_path(prog_path): # type: (str) -> Any
 logging.warning('%s does not exist. Please build the app with "idf.py build"', desc_path)
 return ''
-with open(desc_path, 'r') as f:
+with open(desc_path, 'r', encoding='utf-8') as f:
 project_desc = json.load(f)
 return project_desc.get('debug_prefix_map_gdbinit')

View File

@@ -138,7 +138,7 @@ def write_to_c_header(init_key: bytes, k1: bytes, k2_info: bytes, k1_encrypted_3
 test_data_xts_aes_128: list, k1_encrypted_64: list,
 xts_test_data_xts_aes_256: list, pubx: bytes,
 puby: bytes, k1_G_0: bytes, k1_G_1: bytes) -> None:
-with open('key_manager_test_cases.h', 'w') as file:
+with open('key_manager_test_cases.h', 'w', encoding='utf-8') as file:
 header_content = """#include <stdint.h>
 #define TEST_COUNT 5

View File

@@ -676,7 +676,7 @@ def main():
 if input_is_binary:
 output = table.to_csv()
-with sys.stdout if args.output == '-' else open(args.output, 'w') as f:
+with sys.stdout if args.output == '-' else open(args.output, 'w', encoding='utf-8') as f:
 f.write(output)
 else:
 output = table.to_binary()

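One wrinkle in the hunk above: the `with sys.stdout if ... else open(...)` idiom works because text file objects are context managers, but leaving the block closes whichever object was chosen, including sys.stdout itself. Where that matters, a hedged alternative (the helper name is hypothetical, not from this commit) wraps stdout in contextlib.nullcontext so only real files get closed:

    import contextlib
    import sys

    def open_output(path: str):
        # '-' means stdout, which should survive the with-block,
        # so wrap it instead of letting the context manager close it.
        if path == '-':
            return contextlib.nullcontext(sys.stdout)
        return open(path, 'w', encoding='utf-8')

    with open_output('-') as f:
        f.write('table contents\n')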
View File

@@ -1,14 +1,13 @@
 #!/usr/bin/env python
 # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 import argparse
 def gen_header_file(path: str, subtypes: str) -> None:
 HDR_MESSAGE = '/* Automatically generated file. DO NOT EDIT. */\n\n'
 PARTTOOL_USAGE = 'If you want to use parttool.py manually, please use the following as an extra argument:'
-with open(path, 'w') as f:
+with open(path, 'w', encoding='utf-8') as f:
 f.write(HDR_MESSAGE)
 if subtypes:
 f.write('/*\n\t' + PARTTOOL_USAGE + '\n\t')

View File

@@ -92,7 +92,7 @@ class ParttoolTarget():
 partition_table = gen.PartitionTable.from_binary(f.read())
 if partition_table is None:
-with open(partition_table_file, 'r') as f:
+with open(partition_table_file, 'r', encoding='utf-8') as f:
 f.seek(0)
 partition_table = gen.PartitionTable.from_csv(f.read())
 else:

View File

@@ -1,9 +1,9 @@
 #!/usr/bin/env python
-# esp32ulp_mapgen utility converts a symbol list provided by nm into an export script
-# for the linker and a header file.
-#
 # SPDX-FileCopyrightText: 2016-2024 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
+#
+# esp32ulp_mapgen utility converts a symbol list provided by nm into an export script
+# for the linker and a header file.
 import argparse
 import os
 import textwrap
@@ -64,7 +64,7 @@ def main() -> None:
 args = parser.parse_args()
-with open(args.outputfile + '.h', 'w') as f_h, open(args.outputfile + '.ld', 'w') as f_ld:
+with open(args.outputfile + '.h', 'w', encoding='utf-8') as f_h, open(args.outputfile + '.ld', 'w', encoding='utf-8') as f_ld:
 gen_ld_h_from_sym(args.symfile, f_ld, f_h, int(args.base_addr, 0))

View File

@@ -316,7 +316,7 @@ def check_performance(idf_path: str) -> t.Callable[[str, float, str], None]:
 """
 def _find_perf_item(operator: str, path: str) -> float:
-with open(path) as f:
+with open(path, encoding='utf-8') as f:
 data = f.read()
 match = re.search(fr'#define\s+IDF_PERFORMANCE_{operator}_{item.upper()}\s+([\d.]+)', data)
 return float(match.group(1)) # type: ignore

View File

@@ -1,6 +1,5 @@
 # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Unlicense OR CC0-1.0
 import logging
 import os
 import re
@@ -22,7+21,7 @@ def get_sdk_path() -> str:
 class CustomProcess(object):
 def __init__(self, cmd: str, logfile: str, verbose:bool =True) -> None:
 self.verbose = verbose
-self.f = open(logfile, 'w')
+self.f = open(logfile, 'w', encoding='utf-8')
 if self.verbose:
 logging.info('Starting {} > {}'.format(cmd, self.f.name))
 self.pexpect_proc = pexpect.spawn(cmd, timeout=60, logfile=self.f, encoding='utf-8', codec_errors='ignore')

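In the hunk above, the log file's encoding now matches the encoding='utf-8' already passed to pexpect.spawn(): giving spawn an encoding puts it in text mode, so it decodes the child's output and writes str to its logfile, and the two settings need to agree. A reduced sketch (the command is illustrative):

    import pexpect

    with open('session.log', 'w', encoding='utf-8') as log:
        # encoding= switches spawn to text mode; whatever it writes to logfile
        # is str, so the log file should use the same encoding.
        child = pexpect.spawn('echo hello', timeout=60, logfile=log,
                              encoding='utf-8', codec_errors='ignore')
        child.expect(pexpect.EOF)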
View File

@@ -132,7 +132,7 @@ def test_examples_protocol_https_server_simple(dut: Dut) -> None:
 ssl_context.check_hostname = False
 ssl_context.load_verify_locations(cadata=server_cert_pem)
-with open(CLIENT_CERT_FILE, 'w') as cert, open(CLIENT_KEY_FILE, 'w') as key:
+with open(CLIENT_CERT_FILE, 'w', encoding='utf-8') as cert, open(CLIENT_KEY_FILE, 'w', encoding='utf-8') as key:
 cert.write(client_cert_pem)
 key.write(client_key_pem)

View File

@@ -1,11 +1,12 @@
-# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Unlicense OR CC0-1.0
 import logging
 import os
 import re
 import ssl
 import sys
-from threading import Event, Thread
+from threading import Event
+from threading import Thread
 import paho.mqtt.client as mqtt
 import pexpect
@@ -47,7 +48,7 @@ def on_message(client, userdata, msg): # type: (mqtt.Client, tuple, mqtt.client
 event_client_received_binary.set()
 return
 recv_binary = binary + '.received'
-with open(recv_binary, 'w') as fw:
+with open(recv_binary, 'w', encoding='utf-8') as fw:
 fw.write(msg.payload)
 raise ValueError('Received binary (saved as: {}) does not match the original file: {}'.format(recv_binary, binary))

View File

@@ -21,7 +21,7 @@ def generate_token_data(hmac_key_file: str, output_file: Optional[str] = None) -
 with open(output_file, 'wb') as out_file:
 out_file.write(token_data)
 elif output_file.endswith('.hex'):
-with open(output_file, 'w') as out_file:
+with open(output_file, 'w', encoding='utf-8') as out_file:
 out_file.write(token_hex)
 else:
 print(f'Unsupported file format for output file: {output_file}')

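The hunk above also shows the boundary of the change: the `.bin` branch keeps open(output_file, 'wb') untouched, because the encoding argument applies only to text mode and combining it with a binary mode is an error. A quick sketch (values are illustrative):

    token_hex = 'deadbeef'
    token_data = bytes.fromhex(token_hex)

    with open('token.bin', 'wb') as f:  # binary mode: writes bytes, takes no encoding
        f.write(token_data)

    with open('token.hex', 'w', encoding='utf-8') as f:  # text mode: writes str
        f.write(token_hex)

    # open('token.bin', 'wb', encoding='utf-8') would raise
    # ValueError: binary mode doesn't take an encoding argument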
View File

@@ -11,7 +11,7 @@ from pytest_embedded_idf import IdfDut
 def run_gdb_test(dut: IdfDut) -> None:
-with open(os.path.join(dut.logdir, 'ocd.txt'), 'w') as ocd_log, \
+with open(os.path.join(dut.logdir, 'ocd.txt'), 'w', encoding='utf-8') as ocd_log, \
 pexpect.spawn(f'openocd -f board/esp32c6-builtin.cfg',
 timeout=60,
 logfile=ocd_log,

View File

@@ -47,7 +47,7 @@ def test_semihost_vfs(dut: IdfDut) -> None:
 dut.expect_exact('example: Wrote 2776 bytes')
 dut.expect_exact('====================== HOST DATA START =========================')
-with open(HOST_FILE_PATH) as f:
+with open(HOST_FILE_PATH, encoding='utf-8') as f:
 for line in f:
 if line.strip():
 dut.expect_exact(line.strip())
@@ -55,7 +55,7 @@ def test_semihost_vfs(dut: IdfDut) -> None:
 dut.expect_exact('====================== HOST DATA END =========================')
 dut.expect_exact('example: Read 6121 bytes')
-with open(os.path.join(TEMP_DIR, 'esp32_stdout.txt')) as f:
+with open(os.path.join(TEMP_DIR, 'esp32_stdout.txt'), encoding='utf-8') as f:
 def expected_content() -> t.Iterator[str]:
 yield 'example: Switched to semihosted stdout'

View File

@@ -14,7 +14,7 @@ def test_spiffsgen_example(dut: Dut) -> None:
 base_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'spiffs_image')
 # Expect hello.txt is read successfully
-with open(os.path.join(base_dir, 'hello.txt'), 'r') as hello_txt:
+with open(os.path.join(base_dir, 'hello.txt'), 'r', encoding='utf-8') as hello_txt:
 dut.expect('Read from hello.txt: ' + hello_txt.read().rstrip())
 # Expect alice.txt MD5 hash is computed accurately

View File

@@ -46,7 +46,7 @@ def test_examples_app_trace_basic(dut: IdfDut, openocd: OpenOcd) -> None:
 assert 'Targets connected.' in dut.openocd.write('esp apptrace start file://apptrace.log 0 2000 3 0 0')
 apptrace_wait_stop(dut.openocd)
-with open(openocd._logfile) as oocd_log: # pylint: disable=protected-access
+with open(openocd._logfile, encoding='utf-8') as oocd_log: # pylint: disable=protected-access
 cores = 1 if dut.app.sdkconfig.get('ESP_SYSTEM_SINGLE_CORE_MODE') is True else 2
 params_str = 'App trace params: from {} cores,'.format(cores)
 found = False
@@ -59,7 +59,7 @@ def test_examples_app_trace_basic(dut: IdfDut, openocd: OpenOcd) -> None:
 '"{}" could not be found in {}'.format(params_str, openocd._logfile) # pylint: disable=protected-access
 )
-with open('apptrace.log') as apptrace_log:
+with open('apptrace.log', encoding='utf-8') as apptrace_log:
 for sample_num in range(1, 51):
 log_str = 'Apptrace test data[{}]:{}'.format(sample_num, sample_num * sample_num)
 found = False

View File

@@ -1,6 +1,5 @@
-# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 import argparse
 import datetime
 import json
@@ -9,7 +8,8 @@ import signal
 import sys
 from enum import Enum
 from functools import partial
-from typing import Any, List
+from typing import Any
+from typing import List
 try:
 import espytrace.apptrace
@@ -47,7 +47,7 @@ app.layout = html.Div(
 html.Div([
 html.H2('Telemetry Data'),
 html.Div(id='live-update-data'),
-dcc.Graph(id='live-update-graph', style={'height': 800}), # Height of the plotting area setted to 800px
+dcc.Graph(id='live-update-graph', style={'height': 800}), # Height of the plotting area set to 800px
 dcc.Interval(
 id='interval-component',
 interval=5 * 100, # Graph will be updated every 500 ms
@@ -162,13 +162,13 @@ class CustomRequestHandler(espytrace.apptrace.TCPRequestHandler):
 def read_json(file_path: str) -> Any:
-with open(file_path, 'r') as f:
+with open(file_path, 'r', encoding='utf-8') as f:
 data = json.load(f)
 return data
 def save_data(file_path: str) -> None:
-with open(file_path, 'w') as f:
+with open(file_path, 'w', encoding='utf-8') as f:
 f.writelines(output_lines)

View File

@@ -99,7 +99,7 @@ class EfuseFlashEncSerial(IdfSerial):
 with tempfile.NamedTemporaryFile(suffix='.json') as temp_file:
 temp_file_path = temp_file.name
 espefuse.main(f'--virt -c {self.target} summary --format json --file {temp_file_path}'.split())
-with open(temp_file_path, 'r') as file:
+with open(temp_file_path, 'r', encoding='utf-8') as file:
 efuse_summary = json.load(file)
 if efuse_name in efuse_summary:
 data = efuse_summary[efuse_name]

View File

@@ -68,7 +68,7 @@ server_key = '-----BEGIN PRIVATE KEY-----\n'\
 def create_file(server_file: str, file_data: str) -> None:
-with open(server_file, 'w+') as file:
+with open(server_file, 'w+', encoding='utf-8') as file:
 file.write(file_data)

View File

@@ -151,13 +151,13 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, ser
 if server_file is None:
 server_file = os.path.join(ota_image_dir, 'server_cert.pem')
-cert_file_handle = open(server_file, 'w+')
+cert_file_handle = open(server_file, 'w+', encoding='utf-8')
 cert_file_handle.write(server_cert)
 cert_file_handle.close()
 if key_file is None:
 key_file = os.path.join(ota_image_dir, 'server_key.pem')
-key_file_handle = open('server_key.pem', 'w+')
+key_file_handle = open('server_key.pem', 'w+', encoding='utf-8')
 key_file_handle.write(server_key)
 key_file_handle.close()

View File

@@ -80,13 +80,13 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, ser
 if server_file is None:
 server_file = os.path.join(ota_image_dir, 'server_cert.pem')
-cert_file_handle = open(server_file, 'w+')
+cert_file_handle = open(server_file, 'w+', encoding='utf-8')
 cert_file_handle.write(server_cert)
 cert_file_handle.close()
 if key_file is None:
 key_file = os.path.join(ota_image_dir, 'server_key.pem')
-key_file_handle = open('server_key.pem', 'w+')
+key_file_handle = open('server_key.pem', 'w+', encoding='utf-8')
 key_file_handle.write(server_key)
 key_file_handle.close()
@@ -102,12 +102,12 @@ def start_https_server(ota_image_dir: str, server_ip: str, server_port: int, ser
 def start_tls1_3_server(ota_image_dir: str, server_port: int) -> subprocess.Popen:
 os.chdir(ota_image_dir)
 server_file = os.path.join(ota_image_dir, 'server_cert.pem')
-cert_file_handle = open(server_file, 'w+')
+cert_file_handle = open(server_file, 'w+', encoding='utf-8')
 cert_file_handle.write(server_cert)
 cert_file_handle.close()
 key_file = os.path.join(ota_image_dir, 'server_key.pem')
-key_file_handle = open('server_key.pem', 'w+')
+key_file_handle = open('server_key.pem', 'w+', encoding='utf-8')
 key_file_handle.write(server_key)
 key_file_handle.close()

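The hunks above keep the original bare open()/close() pairs and only add the encoding. For reference, a more defensive equivalent of one such pair uses a with-block so the handle is closed even if the write fails; this is a hedged sketch mirroring the diff's names, not part of the commit:

    import os

    def write_server_cert(ota_image_dir: str, server_cert: str) -> str:
        # Equivalent of the cert_file_handle open/write/close sequence above.
        server_file = os.path.join(ota_image_dir, 'server_cert.pem')
        with open(server_file, 'w+', encoding='utf-8') as cert_file:
            cert_file.write(server_cert)
        return server_file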
View File

@@ -34,7 +34,7 @@ def test_examples_sysview_tracing(dut: IdfDut) -> None:
 dut.gdb.write('c', non_blocking=True)
 time.sleep(1) # to avoid EOF file error
-with open(dut.gdb._logfile) as fr: # pylint: disable=protected-access
+with open(dut.gdb._logfile, encoding='utf-8') as fr: # pylint: disable=protected-access
 gdb_pexpect_proc = pexpect.fdpexpect.fdspawn(fr.fileno())
 gdb_pexpect_proc.expect('Thread 2 "main" hit Breakpoint 1, app_main ()')

View File

@@ -49,7 +49,7 @@ def test_examples_sysview_tracing_heap_log(idf_path: str, dut: IdfDut) -> None:
 sysviewtrace.expect(r'Found \d+ leaked bytes in \d+ blocks.', timeout=120)
 # Validate GDB logs
-with open(dut.gdb._logfile) as fr: # pylint: disable=protected-access
+with open(dut.gdb._logfile, encoding='utf-8') as fr: # pylint: disable=protected-access
 gdb_pexpect_proc = pexpect.fdpexpect.fdspawn(fr.fileno())
 gdb_pexpect_proc.expect_exact(
 'Thread 2 "main" hit Temporary breakpoint 1, heap_trace_start (mode_param', timeout=10) # should be (mode_param=HEAP_TRACE_ALL) # TODO GCC-329

View File

@@ -45,12 +45,12 @@ if __name__ == '__main__':
 required_set = set()
 for req_path in args.requirements:
-with open(req_path) as f:
+with open(req_path, encoding='utf-8') as f:
 required_set |= set(i for i in map(str.strip, f.readlines()) if len(i) > 0 and not i.startswith('#'))
 constr_dict = {} # for example package_name -> package_name==1.0
 for const_path in args.constraints:
-with open(const_path) as f:
+with open(const_path, encoding='utf-8') as f:
 for con in [i for i in map(str.strip, f.readlines()) if len(i) > 0 and not i.startswith('#')]:
 if con.startswith('file://'):
 con = os.path.basename(con)

View File

@@ -104,7 +104,7 @@ class UnixShell(Shell):
 'Go to the project directory and run:\n\n idf.py build"\n'))
 def export(self) -> None:
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 self.export_file(fd)
 print(f'. {self.script_file_path}')
@@ -133,7 +133,7 @@ class BashShell(UnixShell):
 return autocom
 def init_file(self) -> None:
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 # We will use the --init-file option to pass a custom rc file, which will ignore .bashrc,
 # so we need to source .bashrc first.
 bashrc_path = os.path.expanduser('~/.bashrc')
@@ -167,7 +167,7 @@ class ZshShell(UnixShell):
 # If ZDOTDIR is unset, HOME is used instead.
 # https://zsh.sourceforge.io/Doc/Release/Files.html#Startup_002fShutdown-Files
 zdotdir = os.environ.get('ZDOTDIR', str(Path.home()))
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 # We will use the ZDOTDIR env variable to load our custom script in the newly spawned shell
 # so we need to source .zshrc first.
 zshrc_path = Path(zdotdir) / '.zshrc'
@@ -211,7 +211,7 @@ class FishShell(UnixShell):
 return stdout
 def init_file(self) -> None:
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 self.export_file(fd)
 def spawn(self) -> None:
@@ -249,7 +249,7 @@ class PowerShell(Shell):
 print(f'{self.script_file_path}')
 def init_file(self) -> None:
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 # fd.write(f'{self.deactivate_cmd}\n') TODO in upcoming task IDF-10292
 for var, value in self.new_esp_idf_env.items():
 if var == 'PATH':
@@ -297,7 +297,7 @@ class WinCmd(Shell):
 print(f'call {self.script_file_path}')
 def init_file(self) -> None:
-with open(self.script_file_path, 'w') as fd:
+with open(self.script_file_path, 'w', encoding='utf-8') as fd:
 fd.write('@echo off\n')
 # fd.write(f'{self.deactivate_cmd}\n') TODO in upcoming task IDF-10292
 for var, value in self.new_esp_idf_env.items():

View File

@@ -96,7 +96,7 @@ class KconfigWriter():
 def update_file(self, kconfig_path, always_write): # type: (Path, bool) -> bool
 try:
-with open(kconfig_path, 'r') as f:
+with open(kconfig_path, 'r', encoding='utf-8') as f:
 old_content = f.readlines()
 except FileNotFoundError:
 old_content = ['']
@@ -115,7 +115,7 @@ class KconfigWriter():
 if file_needs_update:
 print('\n' + 'Updating file: {}'.format(kconfig_path))
-with open(kconfig_path, 'w') as f:
+with open(kconfig_path, 'w', encoding='utf-8') as f:
 f.writelines(new_content)
 return file_needs_update
@@ -218,7 +218,7 @@ def generate_defines(soc_caps_dir, filename, always_write): # type: (Path, str,
 def get_defines(header_path): # type: (Path) -> list[str]
 defines = []
 logging.info('Reading macros from {}...'.format(header_path))
-with open(header_path, 'r') as f:
+with open(header_path, 'r', encoding='utf-8') as f:
 output = f.read()
 for line in output.split('\n'):

View File

@@ -481,7 +481,7 @@ def init_cli(verbose_output: Optional[List]=None) -> Any:
 # Otherwise, if we built any binaries print a message about
 # how to flash them
 def print_flashing_message(title: str, key: str) -> None:
-with open(os.path.join(args.build_dir, 'flasher_args.json')) as file:
+with open(os.path.join(args.build_dir, 'flasher_args.json'), encoding='utf-8') as file:
 flasher_args: Dict[str, Any] = json.load(file)
 def flasher_path(f: Union[str, 'os.PathLike[str]']) -> str:
@@ -789,7 +789,7 @@ def expand_file_arguments(argv: List[Any]) -> List[Any]:
 visited.add(rel_path)
 try:
-with open(rel_path, 'r') as f:
+with open(rel_path, 'r', encoding='utf-8') as f:
 for line in f:
 expanded_args.extend(expand_args(shlex.split(line), os.path.dirname(rel_path), file_stack + [file_name]))
 except IOError:

View File

@@ -16,7 +16,7 @@ def get_type(action: str) -> str:
 def replace_in_file(filename: str, pattern: str, replacement: str) -> None:
-with open(filename, 'r+') as f:
+with open(filename, 'r+', encoding='utf-8') as f:
 content = f.read()
 overwritten_content = re.sub(pattern, replacement, content, flags=re.M)
 f.seek(0)

View File

@@ -70,7 +70,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 if p.poll() is not None:
 print('OpenOCD exited with {}'.format(p.poll()))
 break
-with open(name, 'r') as f:
+with open(name, 'r', encoding='utf-8') as f:
 content = f.read()
 if re.search(r'Listening on port \d+ for gdb connections', content):
 # expect OpenOCD has started successfully - stop watching
@@ -78,7 +78,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 time.sleep(0.5)
 # OpenOCD exited or is not listening -> print full log and terminate
-with open(name, 'r') as f:
+with open(name, 'r', encoding='utf-8') as f:
 print(f.read())
 raise FatalError('Action "{}" failed due to errors in OpenOCD'.format(target), ctx)
@@ -194,7 +194,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 name = processes[target + '_outfile_name']
 pos = 0
 while True:
-with open(name, 'r') as f:
+with open(name, 'r', encoding='utf-8') as f:
 f.seek(pos)
 for line in f:
 print(line.rstrip())
@@ -212,7 +212,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 desc_path = os.path.join(args.build_dir, 'project_description.json')
 if not os.path.exists(desc_path):
 ensure_build_directory(args, ctx.info_name)
-with open(desc_path, 'r') as f:
+with open(desc_path, 'r', encoding='utf-8') as f:
 project_desc = json.load(f)
 return project_desc
@@ -237,7 +237,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 local_dir = project_desc['build_dir']
 args = ['openocd'] + shlex.split(openocd_arguments)
 openocd_out_name = os.path.join(local_dir, OPENOCD_OUT_FILE)
-openocd_out = open(openocd_out_name, 'w')
+openocd_out = open(openocd_out_name, 'w', encoding='utf-8')
 try:
 process = subprocess.Popen(args, stdout=openocd_out, stderr=subprocess.STDOUT, bufsize=1)
 except Exception as e:
@@ -350,7 +350,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 if gdbgui_port is not None:
 gdbgui_args += ['--port', gdbgui_port]
 gdbgui_out_name = os.path.join(local_dir, GDBGUI_OUT_FILE)
-gdbgui_out = open(gdbgui_out_name, 'w')
+gdbgui_out = open(gdbgui_out_name, 'w', encoding='utf-8')
 env = os.environ.copy()
 # The only known solution for https://github.com/cs01/gdbgui/issues/359 is to set the following environment
 # variable. The greenlet package cannot be downgraded for compatibility with other requirements (gdbgui,

View File

@@ -209,7 +209,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 desc_path = os.path.join(args.build_dir, 'project_description.json')
 if not os.path.exists(desc_path):
 ensure_build_directory(args, ctx.info_name)
-with open(desc_path, 'r') as f:
+with open(desc_path, 'r', encoding='utf-8') as f:
 project_desc = json.load(f)
 return project_desc

View File

@@ -53,7 +53,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 desc_path = os.path.join(args.build_dir, 'project_description.json')
 if not os.path.exists(desc_path):
 ensure_build_directory(args, ctx.info_name)
-with open(desc_path, 'r') as f:
+with open(desc_path, 'r', encoding='utf-8') as f:
 project_desc = json.load(f)
 return project_desc
@@ -71,7 +71,7 @@ def action_extensions(base_actions: Dict, project_path: str) -> Dict:
 result += ['-p', args.port]
 result += ['-b', str(args.baud)]
-with open(os.path.join(args.build_dir, 'flasher_args.json')) as f:
+with open(os.path.join(args.build_dir, 'flasher_args.json'), encoding='utf-8') as f:
 flasher_args = json.load(f)
 extra_esptool_args = flasher_args['extra_esptool_args']

View File

@@ -64,7 +64,7 @@ def _set_build_context(args: 'PropertyDict') -> None:
 proj_desc_fn = f'{args.build_dir}/project_description.json'
 try:
-with open(proj_desc_fn, 'r') as f:
+with open(proj_desc_fn, 'r', encoding='utf-8') as f:
 ctx['proj_desc'] = json.load(f)
 except (OSError, ValueError) as e:
 raise FatalError(f'Cannot load {proj_desc_fn}: {e}')
@@ -85,7 +85,7 @@ def _idf_version_from_cmake() -> Optional[str]:
 regex = re.compile(r'^\s*set\s*\(\s*IDF_VERSION_([A-Z]{5})\s+(\d+)')
 ver = {}
 try:
-with open(version_path) as f:
+with open(version_path, encoding='utf-8') as f:
 for line in f:
 m = regex.match(line)
@@ -189,7 +189,7 @@ def load_hints() -> Dict:
 }
 current_module_dir = os.path.dirname(__file__)
-with open(os.path.join(current_module_dir, 'hints.yml'), 'r') as file:
+with open(os.path.join(current_module_dir, 'hints.yml'), 'r', encoding='utf-8') as file:
 hints['yml'] = yaml.safe_load(file)
 hint_modules_dir = os.path.join(current_module_dir, 'hint_modules')
@@ -263,7 +263,7 @@ def generate_hints(*filenames: str) -> Generator:
 """Getting output files and printing hints on how to resolve errors based on the output."""
 hints = load_hints()
 for file_name in filenames:
-with open(file_name, 'r') as file:
+with open(file_name, 'r', encoding='utf-8') as file:
 yield from generate_hints_buffer(file.read(), hints)
@@ -691,7 +691,7 @@ def get_sdkconfig_filename(args: 'PropertyDict', cache_cmdl: Optional[Dict]=None
 proj_desc_path = os.path.join(args.build_dir, 'project_description.json')
 try:
-with open(proj_desc_path, 'r') as f:
+with open(proj_desc_path, 'r', encoding='utf-8') as f:
 proj_desc = json.load(f)
 return str(proj_desc['config_file'])
 except (OSError, KeyError):
@@ -712,7 +712,7 @@ def get_sdkconfig_value(sdkconfig_file: str, key: str) -> Optional[str]:
 value = None
 # if the value is quoted, this excludes the quotes from the value
 pattern = re.compile(r"^{}=\"?([^\"]*)\"?$".format(key))
-with open(sdkconfig_file, 'r') as f:
+with open(sdkconfig_file, 'r', encoding='utf-8') as f:
 for line in f:
 match = re.match(pattern, line)
 if match:

View File

@@ -1575,7 +1575,7 @@ class ENVState:
 if cls.deactivate_file_path:
 try:
-with open(cls.deactivate_file_path, 'r') as fp:
+with open(cls.deactivate_file_path, 'r', encoding='utf-8') as fp:
 env_state_obj.idf_variables = json.load(fp)
 except (IOError, OSError, ValueError):
 pass
@@ -1585,7 +1585,7 @@ class ENVState:
 try:
 if self.deactivate_file_path and os.path.basename(self.deactivate_file_path).endswith(f'idf_{str(os.getppid())}'):
 # If exported file path/name exists and belongs to actual opened shell
-with open(self.deactivate_file_path, 'w') as w:
+with open(self.deactivate_file_path, 'w', encoding='utf-8') as w:
 json.dump(self.idf_variables, w, ensure_ascii=False, indent=4) # type: ignore
 else:
 with tempfile.NamedTemporaryFile(delete=False, suffix=f'idf_{str(os.getppid())}') as fp:
@@ -1604,7 +1604,7 @@ def load_tools_info() -> Dict[str, IDFTool]:
 tool_versions_file_name = g.tools_json
-with open(tool_versions_file_name, 'r') as f: # type: ignore
+with open(tool_versions_file_name, 'r', encoding='utf-8') as f: # type: ignore
 tools_info = json.load(f)
 return parse_tools_info_json(tools_info) # type: ignore
@@ -1666,7 +1666,7 @@ def get_idf_version() -> str:
 version_file_path = os.path.join(g.idf_path, 'version.txt')
 if os.path.exists(version_file_path):
-with open(version_file_path, 'r') as version_file:
+with open(version_file_path, 'r', encoding='utf-8') as version_file:
 idf_version_str = version_file.read()
 match = re.match(r'^v([0-9]+\.[0-9]+).*', idf_version_str)
@@ -1675,7 +1675,7 @@ def get_idf_version() -> str:
 if idf_version is None:
 try:
-with open(os.path.join(g.idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h')) as f:
+with open(os.path.join(g.idf_path, 'components', 'esp_common', 'include', 'esp_idf_version.h'), encoding='utf-8') as f:
 m = re.search(r'^#define\s+ESP_IDF_VERSION_MAJOR\s+(\d+).+?^#define\s+ESP_IDF_VERSION_MINOR\s+(\d+)',
 f.read(), re.DOTALL | re.MULTILINE)
 if m:
@@ -2136,7 +2136,7 @@ def process_tool(
 def check_python_venv_compatibility(idf_python_env_path: str, idf_version: str) -> None:
 try:
-with open(os.path.join(idf_python_env_path, VENV_VER_FILE), 'r') as f:
+with open(os.path.join(idf_python_env_path, VENV_VER_FILE), 'r', encoding='utf-8') as f:
 read_idf_version = f.read().strip()
 if read_idf_version != idf_version:
 fatal(f'Python environment is set to {idf_python_env_path} which was generated for '
@@ -2643,7 +2643,7 @@ def action_install_python_env(args): # type: ignore
 stdout=sys.stdout, stderr=sys.stderr)
 try:
-with open(os.path.join(idf_python_env_path, VENV_VER_FILE), 'w') as f:
+with open(os.path.join(idf_python_env_path, VENV_VER_FILE), 'w', encoding='utf-8') as f:
 f.write(idf_version)
 except OSError as e:
 warn(f'The following issue occurred while generating the ESP-IDF version file in the Python environment: {e}. '
@@ -2781,7 +2781,7 @@ class ChecksumFileParser():
 sha256_file = sha256_file_tmp
 download(url, sha256_file)
-with open(sha256_file, 'r') as f:
+with open(sha256_file, 'r', encoding='utf-8') as f:
 self.checksum = f.read().splitlines()
 # remove temp file
@@ -2867,7 +2867,7 @@ def action_add_version(args: Any) -> None:
 json_str = dump_tools_json(tools_info)
 if not args.output:
 args.output = os.path.join(g.idf_path, TOOLS_FILE_NEW) # type: ignore
-with open(args.output, 'w') as f:
+with open(args.output, 'w', encoding='utf-8') as f:
 f.write(json_str)
 f.write('\n')
 info(f'Wrote output to {args.output}')
@@ -2881,7 +2881,7 @@ def action_rewrite(args): # type: ignore
 json_str = dump_tools_json(tools_info)
 if not args.output:
 args.output = os.path.join(g.idf_path, TOOLS_FILE_NEW)
-with open(args.output, 'w') as f:
+with open(args.output, 'w', encoding='utf-8') as f:
 f.write(json_str)
 f.write('\n')
 info(f'Wrote output to {args.output}')
@@ -2974,10 +2974,10 @@ def action_validate(args): # type: ignore
 fatal('You need to install jsonschema package to use validate command')
 raise SystemExit(1)
-with open(os.path.join(g.idf_path, TOOLS_FILE), 'r') as tools_file:
+with open(os.path.join(g.idf_path, TOOLS_FILE), 'r', encoding='utf-8') as tools_file:
 tools_json = json.load(tools_file)
-with open(os.path.join(g.idf_path, TOOLS_SCHEMA_FILE), 'r') as schema_file:
+with open(os.path.join(g.idf_path, TOOLS_SCHEMA_FILE), 'r', encoding='utf-8') as schema_file:
 schema_json = json.load(schema_file)
 jsonschema.validate(tools_json, schema_json)
 # on failure, this will raise an exception with a fairly verbose diagnostic message

View File

@@ -1,12 +1,9 @@
 #!/usr/bin/env python
-# SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD
 #
 # SPDX-License-Identifier: Apache-2.0
 # This script is used from the $IDF_PATH/install.* scripts. This way the argument parsing can be done at one place and
 # doesn't have to be implemented for all shells.
 import argparse
 import json
 import os
@@ -57,7 +54,7 @@ def action_print_help(script_extension: str) -> None:
 # extract the list of features from ./requirements.json
 thisdir = os.path.dirname(os.path.realpath(__file__))
-with open(f'{thisdir}/requirements.json', 'r') as f:
+with open(f'{thisdir}/requirements.json', 'r', encoding='utf-8') as f:
 json_data = json.load(f)
 features = [feat['name'] for feat in json_data['features']]

View File

@@ -165,7 +165,7 @@ def main():
 if exc.errno != errno.EEXIST:
 raise
-with open(output_path, 'w') as f: # only create output file after generation has suceeded
+with open(output_path, 'w', encoding='utf-8') as f: # only create output file after generation has succeeded
 f.write(output.read())
 except LdGenFailure as e:
 print('linker script generation failed for %s\nERROR: %s' % (input_file.name, e))

View File

@@ -23,7 +23,7 @@ def create_temp_files(args):
 def strip_blank_lines(input_filename, output_filename):
-with open(input_filename, 'r') as read_from, open(output_filename,'w', newline='') as write_to:
+with open(input_filename, 'r', encoding='utf-8') as read_from, open(output_filename,'w', newline='', encoding='utf-8') as write_to:
 for line in read_from:
 if not line.isspace():
 write_to.write(line)
@@ -32,7 +32,7 @@ def strip_blank_lines(input_filename, output_filename):
 def verify_values_exist(input_values_file, keys_in_values_file):
 """ Verify all keys have corresponding values in values file
 """
-with open(input_values_file, 'r') as values_file:
+with open(input_values_file, 'r', encoding='utf-8') as values_file:
 values_file_reader = csv.reader(values_file, delimiter=',')
 next(values_file_reader)
@@ -48,7 +48,7 @@ def verify_keys_exist(values_file_keys, input_config_file):
 """
 keys_missing = []
-with open(input_config_file,'r') as config_file:
+with open(input_config_file,'r', encoding='utf-8') as config_file:
 config_file_reader = csv.reader(config_file, delimiter=',')
 for line_num, line in enumerate(config_file_reader, start=1):
@@ -74,7 +74,7 @@ def verify_datatype_encoding(input_config_file):
 valid_encodings = {'string', 'binary', 'hex2bin','u8', 'i8', 'u16', 'u32', 'i32', 'u64', 'i64','base64'}
 valid_datatypes = {'file','data','namespace'}
-with open(input_config_file,'r') as config_file:
+with open(input_config_file,'r', encoding='utf-8') as config_file:
 config_file_reader = csv.reader(config_file, delimiter=',')
 for line_num, line in enumerate(config_file_reader, start=1):
@@ -90,7 +90,7 @@ def verify_file_data_count(input_config_file, keys_repeat):
 """ Verify count of data on each line in config file is equal to 3
 (as format must be: <key,type and encoding>)
 """
-with open(input_config_file, 'r') as config_file:
+with open(input_config_file, 'r', encoding='utf-8') as config_file:
 config_file_reader = csv.reader(config_file, delimiter=',')
 for line_num, line in enumerate(config_file_reader, start=1):
@@ -136,7 +136,7 @@ def add_config_data_per_namespace(input_config_file):
 config_data_to_write = []
 config_data_per_namespace = []
-with open(input_config_file,'r') as csv_config_file:
+with open(input_config_file,'r', encoding='utf-8') as csv_config_file:
 config_file_reader = csv.reader(csv_config_file, delimiter=',')
 # `config_data_per_namespace` is added to `config_data_to_write` list after reading next namespace
@@ -182,7 +182,7 @@ def add_data_to_file(config_data_to_write, key_value_pair, output_csv_file):
 header = ['key', 'type', 'encoding', 'value']
 data_to_write = []
-with open(output_csv_file, 'w', newline='') as target_csv_file:
+with open(output_csv_file, 'w', newline='', encoding='utf-8') as target_csv_file:
 output_file_writer = csv.writer(target_csv_file, delimiter=',')
 output_file_writer.writerow(header)
@@ -214,7 +214,7 @@ def create_dir(filetype, output_dir_path):
 def set_repeat_value(total_keys_repeat, keys, csv_file, target_filename):
-with open(csv_file, 'r') as read_from, open(target_filename,'w', newline='') as write_to:
+with open(csv_file, 'r', encoding='utf-8') as read_from, open(target_filename,'w', newline='', encoding='utf-8') as write_to:
 csv_file_reader = csv.reader(read_from, delimiter=',')
 headers = next(csv_file_reader)
 values = next(csv_file_reader)
@@ -247,7 +247,7 @@ def create_intermediate_csv(args, keys_in_values_file, keys_repeat, is_encr=Fals
 config_data_to_write = add_config_data_per_namespace(args.conf)
 try:
-with open(args.values, 'r') as csv_values_file:
+with open(args.values, 'r', encoding='utf-8') as csv_values_file:
 values_file_reader = csv.reader(csv_values_file, delimiter=',')
 keys = next(values_file_reader)
@@ -258,7 +258,7 @@ def create_intermediate_csv(args, keys_in_values_file, keys_repeat, is_encr=Fals
 else:
 target_values_file = args.values
-with open(target_values_file, 'r') as csv_values_file:
+with open(target_values_file, 'r', encoding='utf-8') as csv_values_file:
 values_file_reader = csv.reader(csv_values_file, delimiter=',')
 next(values_file_reader)
@@ -341,7 +341,7 @@ def verify_file_format(args):
 raise SystemExit('Error: values file: %s is empty.' % args.values)
 # Extract keys from config file
-with open(args.conf, 'r') as config_file:
+with open(args.conf, 'r', encoding='utf-8') as config_file:
 config_file_reader = csv.reader(config_file, delimiter=',')
 for config_data in config_file_reader:
 if 'namespace' not in config_data:
@@ -350,7 +350,7 @@ def verify_file_format(args):
 keys_repeat.append(config_data[0])
 # Extract keys from values file
-with open(args.values, 'r') as values_file:
+with open(args.values, 'r', encoding='utf-8') as values_file:
 values_file_reader = csv.reader(values_file, delimiter=',')
 keys_in_values_file = next(values_file_reader)

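A detail worth noting in this file: on the writer side, encoding='utf-8' is added next to the existing newline='' argument, which the csv module requires so that it controls line endings itself rather than having the text layer translate them. A minimal sketch (rows are illustrative):

    import csv

    rows = [['key', 'type', 'encoding', 'value'],
            ['app_ns', 'namespace', '', '']]

    # newline='' disables newline translation so csv.writer emits its own
    # line endings; encoding='utf-8' keeps the output stable across platforms.
    with open('out.csv', 'w', newline='', encoding='utf-8') as f:
        csv.writer(f, delimiter=',').writerows(rows)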
View File

@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #
-# SPDX-FileCopyrightText: 2020-2022 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2020-2024 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 #
 # This program creates archives compatible with ESP32-S* ROM DFU implementation.
@@ -9,9 +9,6 @@
 # as a separate file. In addition to that, a special index file, 'dfuinfo0.dat', is created.
 # This file must be the first one in the archive. It contains binary structures describing each
 # subsequent file (for example, where the file needs to be flashed/loaded).
-from __future__ import print_function, unicode_literals
 import argparse
 import hashlib
 import json
@@ -308,7 +305,7 @@ def main(): # type: () -> None
 '''
 return check_file(os.path.relpath(os.path.join(json_dir, path), start=os.curdir))
-with open(args.json) as f:
+with open(args.json, encoding='utf-8') as f:
 files += [(int(addr, 0),
 process_json_file(f_name)) for addr, f_name in json.load(f)['flash_files'].items()]

View File

@@ -1,16 +1,15 @@
 #!/usr/bin/env python
 #
-# SPDX-FileCopyrightText: 2020-2023 Espressif Systems (Shanghai) CO LTD
+# SPDX-FileCopyrightText: 2020-2024 Espressif Systems (Shanghai) CO LTD
 # SPDX-License-Identifier: Apache-2.0
 # Module was moved to the esptool in ESP-IDF v5.2 and relicensed under GPL v2.0 license.
-from __future__ import division
 import argparse
 import json
 import os
 import subprocess
 import sys
+from typing import List
+from typing import Tuple
 def main() -> None:
@@ -75,7 +74,7 @@ def main() -> None:
 raise RuntimeError('{} is not a regular file!'.format(file_name))
 return file_name
-files = []
+files: List[Tuple[int, str]] = []
 if args.files:
 files += [(addr, check_file(f_name)) for addr, f_name in zip(args.files[::2], args.files[1::2])]
@@ -89,7 +88,7 @@ def main() -> None:
 '''
 return check_file(os.path.abspath(os.path.join(json_dir, path)))
-with open(args.json) as f:
+with open(args.json, encoding='utf-8') as f:
 json_content = json.load(f)
 if args.bin:
@@ -107,10 +106,10 @@ def main() -> None:
 files += [(addr, process_json_file(f_name)) for addr, f_name in flash_dic.items()]
 # remove possible duplicates and sort based on the address
-files = sorted([(addr, f_name) for addr, f_name in dict(files).items()], key=lambda x: x[0]) # type: ignore
+files = sorted([(addr, f_name) for addr, f_name in dict(files).items()], key=lambda x: x[0])
 # list of tuples to simple list
-files = [item for t in files for item in t]
+files_flatten = [item for t in files for item in t]
 cmd = [
 sys.executable, '-m', 'esptool',
@@ -125,10 +124,10 @@ def main() -> None:
 if args.md5_disable:
 cmd.append('--md5-disable')
-cmd_str = ' '.join(cmd + files)
+cmd_str = ' '.join(cmd + files_flatten)
 print(f'Executing: {cmd_str}')
-sys.exit(subprocess.run(cmd + files).returncode)
+sys.exit(subprocess.run(cmd + files_flatten).returncode)
 if __name__ == '__main__':
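The files_flatten rename in the last hunks resolves a shape change that the removed `# type: ignore` had been papering over: files holds (address, filename) tuples, while subprocess.run needs one flat argv list, and reusing one name for both defeats the new List[Tuple[int, str]] annotation. A reduced sketch of the de-duplicate/sort/flatten steps (values are illustrative):

    from typing import List, Tuple

    files: List[Tuple[int, str]] = [(0x8000, 'partition-table.bin'), (0x1000, 'bootloader.bin')]

    # de-duplicate by address, sort, then flatten the tuples into argv form
    files = sorted(dict(files).items(), key=lambda x: x[0])
    files_flatten = [str(item) for pair in files for item in pair]
    print(files_flatten)  # ['4096', 'bootloader.bin', '32768', 'partition-table.bin']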