From 4e7d2ec241c3d5bbba735d1d29fd9c70e1ead667 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20M=C3=BAdry?= Date: Wed, 31 Jul 2024 15:32:58 +0200 Subject: [PATCH 1/3] refactor(nvs): nvs_tool.py integrity check refactor --- .../nvs_flash/nvs_partition_tool/nvs_check.py | 373 ++++++++++-------- 1 file changed, 218 insertions(+), 155 deletions(-) diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index fd38978e11..6051ee80a3 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -4,17 +4,21 @@ from typing import Dict, List from nvs_logger import NVS_Logger -from nvs_parser import NVS_Entry, NVS_Partition, nvs_const +from nvs_parser import nvs_const +from nvs_parser import NVS_Entry +from nvs_parser import NVS_Page +from nvs_parser import NVS_Partition -def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: - used_namespaces: Dict[int, None] = {} - found_namespaces: Dict[int, str] = {} - blobs: Dict = {} - blob_chunks: List[NVS_Entry] = [] - empty_entry = NVS_Entry(-1, bytearray(32), 'Erased') +EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased') - # Partition size check +used_namespaces: Dict[int, None] = {} +found_namespaces: Dict[int, str] = {} +blobs: Dict = {} +blob_chunks: List[NVS_Entry] = [] + + +def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: if len(nvs_partition.pages) < 3: nvs_log.info( nvs_log.yellow( @@ -22,7 +26,8 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: ) ) - # Free/empty page check + +def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages): nvs_log.info( nvs_log.red( @@ -32,175 +37,186 @@ at least one free page is required for proper function!''' ) nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n')) - for page in nvs_partition.pages: - # page: NVS_Page - # Print page header - if page.header['status'] == 'Empty': - nvs_log.info(nvs_log.cyan('Page Empty')) +def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}')) - # Check if page is truly empty - if page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + nvs_log.info( + nvs_log.red( + 'The page is reported as Empty but its entry state bitmap is not empty!' + ) + ) + + if any([not e.is_empty for e in nvs_page.entries]): + nvs_log.info( + nvs_log.red('The page is reported as Empty but there are data written!') + ) + + +def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']: + nvs_log.info( + nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK' + ) + else: + nvs_log.info( + nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), + f'Original CRC32:', + nvs_log.red(f'{nvs_page.header["crc"]["original"]:x}'), + f'Generated CRC32:', + nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'), + ) + + +def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]: + if entry.state == 'Written': + if entry.key in seen_written_entires: + seen_written_entires[entry.key].append(entry) + else: + seen_written_entires[entry.key] = [entry] + return seen_written_entires + + +def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, list[NVS_Entry]]: + seen_written_entires: Dict[str, list[NVS_Entry]] = {} + + for entry in nvs_page.entries: + # entry: NVS_Entry + + # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data + + # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless) + # and are stored in as entries inside string/blob data entry 'entry.children' list + + # Duplicate entry check (1) - same key, different index - find duplicates + seen_written_entires = identify_entry_duplicates(entry, seen_written_entires) + + # Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data) + if entry.is_empty: + if entry.state == 'Written': nvs_log.info( nvs_log.red( - 'The page is reported as Empty but its entry state bitmap is not empty!' + f' Entry #{entry.index:03d} is reported as Written but it is empty!' ) ) - if any([not e.is_empty for e in page.entries]): + continue + elif entry.state == 'Erased': nvs_log.info( - nvs_log.red('The page is reported as Empty but there are data written!') + nvs_log.yellow( + f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)' + ) ) - else: - # Check page header CRC32 - if page.header['crc']['original'] == page.header['crc']['computed']: + + if entry.state == 'Written': + # Entry CRC32 check + if ( + entry.metadata['crc']['original'] + != entry.metadata['crc']['computed'] + ): nvs_log.info( - nvs_log.cyan(f'Page no. {page.header["page_index"]}'), '\tCRC32: OK' + nvs_log.red( + f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}' + ), + f'Written:', + nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'), + f'Generated:', + nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'), ) + + # Entry children CRC32 check + if ( + entry.metadata['span'] > 1 + and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']) + ): + nvs_log.info( + nvs_log.red( + f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!' + ), + f'Written:', + nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'), + f'Generated:', + nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'), + ) + + # Entry type check + if entry.metadata['type'] not in [ + nvs_const.item_type[key] for key in nvs_const.item_type + ]: + nvs_log.info( + nvs_log.yellow( + f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!' + ), + f'Type: {entry.metadata["type"]}', + ) + + # Span check + if ( + entry.index + entry.metadata['span'] - 1 + >= int(nvs_const.page_size / nvs_const.entry_size) - 2 + ): + nvs_log.info( + nvs_log.red( + f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!' + ) + ) + # Spanned entry state checks + elif entry.metadata['span'] > 1: + parent_state = entry.state + for kid in entry.children: + if parent_state != kid.state: + nvs_log.info( + nvs_log.yellow(' Inconsistent data state!'), + f'Entry #{entry.index:03d} {entry.key} state: {parent_state},', + f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}', + ) + + # Gather blobs & namespaces + if entry.metadata['type'] == 'blob_index': + blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [ + EMPTY_ENTRY + ] * entry.data['chunk_count'] + elif entry.metadata['type'] == 'blob_data': + blob_chunks.append(entry) + + if entry.metadata['namespace'] == 0: + found_namespaces[entry.data['value']] = entry.key else: - nvs_log.info( - nvs_log.cyan(f'Page no. {page.header["page_index"]}'), - f'Original CRC32:', - nvs_log.red(f'{page.header["crc"]["original"]:x}'), - f'Generated CRC32:', - nvs_log.green(f'{page.header["crc"]["computed"]:x}'), - ) + used_namespaces[entry.metadata['namespace']] = None - # Check all entries - seen_written_entires: Dict[str, list[NVS_Entry]] = {} - for entry in page.entries: - # entry: NVS_Entry + return seen_written_entires - # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data - # Variable length values themselves occupy whole 32 bytes (therefore metadata values are meaningless) - # and are stored in as entries inside string/blob data entry 'entry.children' list +def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> List[List[NVS_Entry]]: + duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] + return duplicate_entries_list - # Duplicate entry check (1) - same key, different index - find duplicates - if entry.state == 'Written': - if entry.key in seen_written_entires: - seen_written_entires[entry.key].append(entry) - else: - seen_written_entires[entry.key] = [entry] - # Entry state check - doesn't check variable length values (metadata such as state are meaningless as all 32 bytes are pure data) - if entry.is_empty: - if entry.state == 'Written': - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} is reported as Written but it is empty!' - ) - ) - continue - elif entry.state == 'Erased': - nvs_log.info( - nvs_log.yellow( - f' Entry #{entry.index:03d} is reported as Erased but it is empty! (Only entries reported as Empty should be empty)' - ) - ) - - if entry.state == 'Written': - # Entry CRC32 check - if ( - entry.metadata['crc']['original'] - != entry.metadata['crc']['computed'] - ): - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} {entry.key} has wrong CRC32!{"": <5}' - ), - f'Written:', - nvs_log.red(f'{entry.metadata["crc"]["original"]:x}'), - f'Generated:', - nvs_log.green(f'{entry.metadata["crc"]["computed"]:x}'), - ) - - # Entry children CRC32 check - if ( - entry.metadata['span'] > 1 - and (entry.metadata['crc']['data_original'] != entry.metadata['crc']['data_computed']) - ): - nvs_log.info( - nvs_log.red( - f' Entry #{entry.index:03d} {entry.key} data (string, blob) has wrong CRC32!' - ), - f'Written:', - nvs_log.red(f'{entry.metadata["crc"]["data_original"]:x}'), - f'Generated:', - nvs_log.green(f'{entry.metadata["crc"]["data_computed"]:x}'), - ) - - # Entry type check - if entry.metadata['type'] not in [ - nvs_const.item_type[key] for key in nvs_const.item_type - ]: - nvs_log.info( - nvs_log.yellow( - f' Type of entry #{entry.index:03d} {entry.key} is unrecognized!' - ), - f'Type: {entry.metadata["type"]}', - ) - - # Span check - if ( - entry.index + entry.metadata['span'] - 1 - >= int(nvs_const.page_size / nvs_const.entry_size) - 2 - ): - nvs_log.info( - nvs_log.red( - f' Variable length entry #{entry.index:03d} {entry.key} is out of bounds!' - ) - ) - # Spanned entry state checks - elif entry.metadata['span'] > 1: - parent_state = entry.state - for kid in entry.children: - if parent_state != kid.state: - nvs_log.info( - nvs_log.yellow(' Inconsistent data state!'), - f'Entry #{entry.index:03d} {entry.key} state: {parent_state},', - f'Data entry #{kid.index:03d} {entry.key} state: {kid.state}', - ) - - # Gather blobs & namespaces - if entry.metadata['type'] == 'blob_index': - blobs[f'{entry.metadata["namespace"]:03d}{entry.key}'] = [entry] + [ - empty_entry - ] * entry.data['chunk_count'] - elif entry.metadata['type'] == 'blob_data': - blob_chunks.append(entry) - - if entry.metadata['namespace'] == 0: - found_namespaces[entry.data['value']] = entry.key - else: - used_namespaces[entry.metadata['namespace']] = None - - # Duplicate entry check (2) - same key, different index - print duplicates - duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] - for duplicate_entries in duplicate_entries_list: - # duplicate_entries: list[NVS_Entry] +def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: List[List[NVS_Entry]], nvs_log: NVS_Logger) -> None: + for duplicate_entries in duplicate_entries_list: + # duplicate_entries: list[NVS_Entry] + nvs_log.info( + nvs_log.red( + f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]} +with status {page.header["status"]} is used by the following entries:''' + ) + ) + for entry in duplicate_entries: nvs_log.info( nvs_log.red( - f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]} -with status {page.header["status"]} is used by the following entries:''' + f'Entry #{entry.index:03d} {entry.key} is a duplicate!' ) ) - for entry in duplicate_entries: - nvs_log.info( - nvs_log.red( - f'Entry #{entry.index:03d} {entry.key} is a duplicate!' - ) - ) - nvs_log.info() - # Blob checks - # Assemble blobs +def assemble_blobs(nvs_log: NVS_Logger) -> None: for chunk in blob_chunks: + # chunk: NVS_Entry parent = blobs.get( - f'{chunk.metadata["namespace"]:03d}{chunk.key}', [empty_entry] + f'{chunk.metadata["namespace"]:03d}{chunk.key}', [EMPTY_ENTRY] )[0] # Blob chunk without blob index check - if parent is empty_entry: + if parent is EMPTY_ENTRY: nvs_log.info( nvs_log.red(f'Blob {chunk.key} chunk has no blob index!'), f'Namespace index: {chunk.metadata["namespace"]:03d}', @@ -212,15 +228,19 @@ with status {page.header["status"]} is used by the following entries:''' chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start'] blobs[blob_key][chunk_index + 1] = chunk - # Blob data check + # return blobs + + +def check_blob_data(nvs_log: NVS_Logger) -> None: for blob_key in blobs: blob_index = blobs[blob_key][0] blob_chunks = blobs[blob_key][1:] blob_size = blob_index.data['size'] for i, chunk in enumerate(blob_chunks): + # chunk: NVS_Entry # Blob missing chunk check - if chunk is empty_entry: + if chunk is EMPTY_ENTRY: nvs_log.info( nvs_log.red(f'Blob {blob_index.key} is missing a chunk!'), f'Namespace index: {blob_index.metadata["namespace"]:03d}', @@ -237,7 +257,15 @@ with status {page.header["status"]} is used by the following entries:''' f'Namespace index: {blob_index.metadata["namespace"]:03d}', ) - # Namespace checks + +def check_blobs(nvs_log: NVS_Logger) -> None: + # Assemble blobs + assemble_blobs(nvs_log) + # Blob data check + check_blob_data(nvs_log) + + +def check_namespaces(nvs_log: NVS_Logger) -> None: # Undefined namespace index check for used_ns in used_namespaces: key = found_namespaces.pop(used_ns, '') @@ -255,3 +283,38 @@ with status {page.header["status"]} is used by the following entries:''' f'Namespace index: {unused_ns:03d}', f'[{found_namespaces[unused_ns]}]', ) + + +def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: + # Partition size check + check_partition_size(nvs_partition, nvs_log) + + # Free/empty page check + check_empty_page_present(nvs_partition, nvs_log) + + for page in nvs_partition.pages: + # page: NVS_Page + + # Print page header + if page.header['status'] == 'Empty': + # Check if page is truly empty + check_empty_page_content(page, nvs_log) + else: + # Check page header CRC32 + check_page_crc(page, nvs_log) + + # Check all entries + seen_written_entires = check_page_entries(page, nvs_log) + + # Duplicate entry check (2) - same key, different index - print duplicates + duplicates = filter_entry_duplicates(seen_written_entires) + # Print duplicate entries + print_entry_duplicates(page, duplicates, nvs_log) + + nvs_log.info() # Empty line + + # Blob checks + check_blobs(nvs_log) + + # Namespace checks + check_namespaces(nvs_log) From 6cb20800760fb905a0521561e854ac344562a994 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20M=C3=BAdry?= Date: Tue, 30 Jul 2024 23:22:22 +0200 Subject: [PATCH 2/3] fix(nvs): nvs_tool.py reduce false duplicate warnings --- .../nvs_flash/nvs_partition_tool/nvs_check.py | 153 ++++++++++++++++-- 1 file changed, 136 insertions(+), 17 deletions(-) diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index 6051ee80a3..608d796512 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 -# SPDX-FileCopyrightText: 2023 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 -from typing import Dict, List +from typing import Dict +from typing import List from nvs_logger import NVS_Logger from nvs_parser import nvs_const @@ -9,6 +10,8 @@ from nvs_parser import NVS_Entry from nvs_parser import NVS_Page from nvs_parser import NVS_Partition +# from pprint import pprint + EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased') @@ -18,16 +21,18 @@ blobs: Dict = {} blob_chunks: List[NVS_Entry] = [] -def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: +def check_partition_size(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool: if len(nvs_partition.pages) < 3: nvs_log.info( nvs_log.yellow( 'NVS Partition must contain 3 pages (sectors) at least to function properly!' ) ) + return False + return True -def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: +def check_empty_page_present(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> bool: if not any(page.header['status'] == 'Empty' for page in nvs_partition.pages): nvs_log.info( nvs_log.red( @@ -36,12 +41,16 @@ at least one free page is required for proper function!''' ) ) nvs_log.info(nvs_log.red('NVS partition possibly truncated?\n')) + return False + return True -def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: +def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: + result = True nvs_log.info(nvs_log.cyan(f'Page {nvs_page.header["status"]}')) if nvs_page.raw_entry_state_bitmap != bytearray({0xFF}) * nvs_const.entry_size: + result = False nvs_log.info( nvs_log.red( 'The page is reported as Empty but its entry state bitmap is not empty!' @@ -49,16 +58,20 @@ def check_empty_page_content(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: ) if any([not e.is_empty for e in nvs_page.entries]): + result = False nvs_log.info( nvs_log.red('The page is reported as Empty but there are data written!') ) + return result -def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: + +def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: if nvs_page.header['crc']['original'] == nvs_page.header['crc']['computed']: nvs_log.info( nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), '\tCRC32: OK' ) + return True else: nvs_log.info( nvs_log.cyan(f'Page no. {nvs_page.header["page_index"]}'), @@ -67,6 +80,7 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> None: f'Generated CRC32:', nvs_log.green(f'{nvs_page.header["crc"]["computed"]:x}'), ) + return False def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]: @@ -187,13 +201,111 @@ def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, lis return seen_written_entires -def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> List[List[NVS_Entry]]: - duplicate_entries_list = [seen_written_entires[key] for key in seen_written_entires if len(seen_written_entires[key]) > 1] - return duplicate_entries_list +def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} + for key, duplicate_entries in duplicate_entries_dict.items(): + seen_entries: List[NVS_Entry] = [] + entry_same_namespace_collisions_list: set[NVS_Entry] = set() + + # Search through the "duplicates" and see if there are real duplicates + # E.g. the key can be the same if the namespace is different + for entry in duplicate_entries: + if entry.metadata['type'] in nvs_const.item_type.values(): + + entry_same_namespace_collisions = set() + for other_entry in seen_entries: + if entry.metadata['namespace'] == other_entry.metadata['namespace']: + entry_same_namespace_collisions.add(entry) + entry_same_namespace_collisions.add(other_entry) + + if len(entry_same_namespace_collisions) != 0: + entry_same_namespace_collisions_list.update(entry_same_namespace_collisions) + seen_entries.append(entry) + + # Catch real duplicates + new_duplicate_entries: List[NVS_Entry] = [] + if len(seen_entries) > 1: + for entry in seen_entries: + if entry in entry_same_namespace_collisions_list: + new_duplicate_entries.append(entry) + + if len(new_duplicate_entries) > 0: + new_duplicate_entries_dict[key] = new_duplicate_entries + + return new_duplicate_entries_dict -def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: List[List[NVS_Entry]], nvs_log: NVS_Logger) -> None: - for duplicate_entries in duplicate_entries_list: +def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} + for key, duplicate_entries in duplicate_entries_dict.items(): + seen_blob_index: List[NVS_Entry] = [] + seen_blob_data: List[NVS_Entry] = [] + seen_another_type_data: List[NVS_Entry] = [] + blob_index_chunk_index_collisions_list: set[NVS_Entry] = set() + blob_data_chunk_index_collisions_list: set[NVS_Entry] = set() + + # Search through the "duplicates" and see if there are real duplicates + # E.g. the key can be the same for blob_index and blob_data + # (and even for more blob_data entries if they have a different chunk_index) + for entry in duplicate_entries: + if entry.metadata['type'] == 'blob_index': + + blob_index_chunk_index_collisions = set() + for other_entry in seen_blob_index: + if entry.metadata['namespace'] == other_entry.metadata['namespace']: + blob_index_chunk_index_collisions.add(entry) + blob_index_chunk_index_collisions.add(other_entry) + + if len(blob_index_chunk_index_collisions) != 0: + blob_index_chunk_index_collisions_list.update(blob_index_chunk_index_collisions) + seen_blob_index.append(entry) + + elif entry.metadata['type'] == 'blob_data': + + blob_data_chunk_index_collisions = set() + for other_entry in seen_blob_data: + if (entry.metadata['namespace'] == other_entry.metadata['namespace'] + and entry.metadata['chunk_index'] == other_entry.metadata['chunk_index']): + blob_data_chunk_index_collisions.add(entry) + blob_data_chunk_index_collisions.add(other_entry) + + if len(blob_data_chunk_index_collisions) != 0: + blob_data_chunk_index_collisions_list.update(blob_data_chunk_index_collisions) + seen_blob_data.append(entry) + + else: + seen_another_type_data.append(entry) + + # Catch real duplicates + new_duplicate_entries: List[NVS_Entry] = [] + if len(seen_blob_index) > 1: + for entry in seen_blob_index: + if entry in blob_index_chunk_index_collisions_list: + new_duplicate_entries.append(entry) + + if len(seen_blob_data) > 1: + for entry in seen_blob_data: + if entry in blob_data_chunk_index_collisions_list: + new_duplicate_entries.append(entry) + + for entry in seen_another_type_data: # If there are any duplicates of other types + new_duplicate_entries.append(entry) + + if len(new_duplicate_entries) > 0: + new_duplicate_entries_dict[key] = new_duplicate_entries + + return new_duplicate_entries_dict + + +def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: + duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1} + duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list) + duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1) + return duplicate_entries_list_2 + + +def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None: + for _, duplicate_entries in duplicate_entries_list.items(): # duplicate_entries: list[NVS_Entry] nvs_log.info( nvs_log.red( @@ -228,8 +340,6 @@ def assemble_blobs(nvs_log: NVS_Logger) -> None: chunk_index = chunk.metadata['chunk_index'] - parent.data['chunk_start'] blobs[blob_key][chunk_index + 1] = chunk - # return blobs - def check_blob_data(nvs_log: NVS_Logger) -> None: for blob_key in blobs: @@ -292,6 +402,8 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Free/empty page check check_empty_page_present(nvs_partition, nvs_log) + seen_written_entires_all: Dict[str, list[NVS_Entry]] = {} + for page in nvs_partition.pages: # page: NVS_Page @@ -306,10 +418,17 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Check all entries seen_written_entires = check_page_entries(page, nvs_log) - # Duplicate entry check (2) - same key, different index - print duplicates - duplicates = filter_entry_duplicates(seen_written_entires) - # Print duplicate entries - print_entry_duplicates(page, duplicates, nvs_log) + # Collect all seen written entries + for key in seen_written_entires: + if key in seen_written_entires_all: + seen_written_entires_all[key].extend(seen_written_entires[key]) + else: + seen_written_entires_all[key] = seen_written_entires[key] + + # Duplicate entry check (2) - same key, different index + duplicates = filter_entry_duplicates(seen_written_entires_all) + # Print duplicate entries + print_entry_duplicates(page, duplicates, nvs_log) nvs_log.info() # Empty line From 05b356f87f8d537a79aa4116004423a831469388 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20M=C3=BAdry?= Date: Thu, 1 Aug 2024 04:25:35 +0200 Subject: [PATCH 3/3] feat(nvs): Test nvs_partition_gen.py and nvs_check.py with pytest Little fixes in nvs_check.py --- .gitlab/ci/host-test.yml | 14 + .../nvs_flash/nvs_partition_tool/nvs_check.py | 75 ++-- .../nvs_partition_tool/nvs_parser.py | 1 + .../nvs_flash/nvs_partition_tool/pytest.ini | 12 + .../nvs_partition_tool/test_nvs_gen_check.py | 326 ++++++++++++++++++ 5 files changed, 405 insertions(+), 23 deletions(-) create mode 100644 components/nvs_flash/nvs_partition_tool/pytest.ini create mode 100644 components/nvs_flash/nvs_partition_tool/test_nvs_gen_check.py diff --git a/.gitlab/ci/host-test.yml b/.gitlab/ci/host-test.yml index 109020a66c..1e2657ea6c 100644 --- a/.gitlab/ci/host-test.yml +++ b/.gitlab/ci/host-test.yml @@ -388,3 +388,17 @@ test_idf_build_apps_load_soc_caps: extends: .host_test_template script: - python tools/ci/check_soc_headers_load_in_idf_build_apps.py + +test_nvs_gen_check: + extends: .host_test_template + artifacts: + paths: + - XUNIT_RESULT.xml + - components/nvs_flash/nvs_partition_tool + reports: + junit: XUNIT_RESULT.xml + variables: + LC_ALL: C.UTF-8 + script: + - cd ${IDF_PATH}/components/nvs_flash/nvs_partition_tool + - pytest --noconftest test_nvs_gen_check.py --junitxml=XUNIT_RESULT.xml diff --git a/components/nvs_flash/nvs_partition_tool/nvs_check.py b/components/nvs_flash/nvs_partition_tool/nvs_check.py index 608d796512..b6a4da3aba 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_check.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_check.py @@ -3,6 +3,8 @@ # SPDX-License-Identifier: Apache-2.0 from typing import Dict from typing import List +from typing import Optional +from typing import Set from nvs_logger import NVS_Logger from nvs_parser import nvs_const @@ -10,12 +12,10 @@ from nvs_parser import NVS_Entry from nvs_parser import NVS_Page from nvs_parser import NVS_Partition -# from pprint import pprint - EMPTY_ENTRY = NVS_Entry(-1, bytearray(32), 'Erased') -used_namespaces: Dict[int, None] = {} +used_namespaces: Dict[int, Optional[str]] = {} found_namespaces: Dict[int, str] = {} blobs: Dict = {} blob_chunks: List[NVS_Entry] = [] @@ -83,7 +83,7 @@ def check_page_crc(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> bool: return False -def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, list[NVS_Entry]]: +def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: if entry.state == 'Written': if entry.key in seen_written_entires: seen_written_entires[entry.key].append(entry) @@ -92,11 +92,12 @@ def identify_entry_duplicates(entry: NVS_Entry, seen_written_entires: Dict[str, return seen_written_entires -def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, list[NVS_Entry]]: - seen_written_entires: Dict[str, list[NVS_Entry]] = {} +def check_page_entries(nvs_page: NVS_Page, nvs_log: NVS_Logger) -> Dict[str, List[NVS_Entry]]: + seen_written_entires: Dict[str, List[NVS_Entry]] = {} for entry in nvs_page.entries: # entry: NVS_Entry + entry.page = nvs_page # Entries stored in 'page.entries' are primitive data types, blob indexes or string/blob data @@ -205,7 +206,7 @@ def filter_namespaces_fake_duplicates(duplicate_entries_dict: Dict[str, List[NVS new_duplicate_entries_dict: Dict[str, List[NVS_Entry]] = {} for key, duplicate_entries in duplicate_entries_dict.items(): seen_entries: List[NVS_Entry] = [] - entry_same_namespace_collisions_list: set[NVS_Entry] = set() + entry_same_namespace_collisions_list: Set[NVS_Entry] = set() # Search through the "duplicates" and see if there are real duplicates # E.g. the key can be the same if the namespace is different @@ -241,8 +242,8 @@ def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_En seen_blob_index: List[NVS_Entry] = [] seen_blob_data: List[NVS_Entry] = [] seen_another_type_data: List[NVS_Entry] = [] - blob_index_chunk_index_collisions_list: set[NVS_Entry] = set() - blob_data_chunk_index_collisions_list: set[NVS_Entry] = set() + blob_index_chunk_index_collisions_list: Set[NVS_Entry] = set() + blob_data_chunk_index_collisions_list: Set[NVS_Entry] = set() # Search through the "duplicates" and see if there are real duplicates # E.g. the key can be the same for blob_index and blob_data @@ -297,26 +298,44 @@ def filter_blob_related_duplicates(duplicate_entries_dict: Dict[str, List[NVS_En return new_duplicate_entries_dict -def filter_entry_duplicates(seen_written_entires: Dict[str, list[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: +def filter_entry_duplicates(seen_written_entires: Dict[str, List[NVS_Entry]]) -> Dict[str, List[NVS_Entry]]: duplicate_entries_list = {key: v for key, v in seen_written_entires.items() if len(v) > 1} duplicate_entries_list_1 = filter_namespaces_fake_duplicates(duplicate_entries_list) duplicate_entries_list_2 = filter_blob_related_duplicates(duplicate_entries_list_1) return duplicate_entries_list_2 -def print_entry_duplicates(page: NVS_Page, duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None: +def print_entry_duplicates(duplicate_entries_list: Dict[str, List[NVS_Entry]], nvs_log: NVS_Logger) -> None: + if len(duplicate_entries_list) > 0: + nvs_log.info(nvs_log.red('Found duplicate entries:')) + nvs_log.info(nvs_log.red('Entry\tKey\t\t\tType\t\tNamespace idx\tPage\tPage status')) + for _, duplicate_entries in duplicate_entries_list.items(): - # duplicate_entries: list[NVS_Entry] - nvs_log.info( - nvs_log.red( - f'''Entry key {duplicate_entries[0].key} on page no. {page.header["page_index"]} -with status {page.header["status"]} is used by the following entries:''' - ) - ) + # duplicate_entries: List[NVS_Entry] for entry in duplicate_entries: + # entry: NVS_Entry + if entry.metadata['namespace'] == 0: + entry_type = f'namespace ({entry.data["value"]})' + else: + entry_type = entry.metadata['type'] + + if entry.page is not None: + page_num = entry.page.header['page_index'] + page_status = entry.page.header['status'] + else: + page_num = 'Unknown' + page_status = 'Unknown' + + entry_key_tab_cnt = len(entry.key) // 8 + entry_key_tab = '\t' * (3 - entry_key_tab_cnt) + + namespace_tab_cnt = len(entry_type) // 8 + namepace_tab = '\t' * (2 - namespace_tab_cnt) + namespace_str = f'{entry.metadata["namespace"]}' + nvs_log.info( nvs_log.red( - f'Entry #{entry.index:03d} {entry.key} is a duplicate!' + f'#{entry.index:03d}\t{entry.key}{entry_key_tab}{entry_type}{namepace_tab}{namespace_str}\t\t{page_num}\t{page_status}' ) ) @@ -378,8 +397,8 @@ def check_blobs(nvs_log: NVS_Logger) -> None: def check_namespaces(nvs_log: NVS_Logger) -> None: # Undefined namespace index check for used_ns in used_namespaces: - key = found_namespaces.pop(used_ns, '') - if key == '': + key = found_namespaces.pop(used_ns, None) + if key is None: nvs_log.info( nvs_log.red('Undefined namespace index!'), f'Namespace index: {used_ns:03d}', @@ -395,6 +414,14 @@ def check_namespaces(nvs_log: NVS_Logger) -> None: ) +def reset_global_variables() -> None: + global used_namespaces, found_namespaces, blobs, blob_chunks + used_namespaces = {} + found_namespaces = {} + blobs = {} + blob_chunks = [] + + def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Partition size check check_partition_size(nvs_partition, nvs_log) @@ -402,7 +429,7 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Free/empty page check check_empty_page_present(nvs_partition, nvs_log) - seen_written_entires_all: Dict[str, list[NVS_Entry]] = {} + seen_written_entires_all: Dict[str, List[NVS_Entry]] = {} for page in nvs_partition.pages: # page: NVS_Page @@ -428,7 +455,7 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Duplicate entry check (2) - same key, different index duplicates = filter_entry_duplicates(seen_written_entires_all) # Print duplicate entries - print_entry_duplicates(page, duplicates, nvs_log) + print_entry_duplicates(duplicates, nvs_log) nvs_log.info() # Empty line @@ -437,3 +464,5 @@ def integrity_check(nvs_partition: NVS_Partition, nvs_log: NVS_Logger) -> None: # Namespace checks check_namespaces(nvs_log) + + reset_global_variables() diff --git a/components/nvs_flash/nvs_partition_tool/nvs_parser.py b/components/nvs_flash/nvs_partition_tool/nvs_parser.py index 759717d69b..29b610b33b 100644 --- a/components/nvs_flash/nvs_partition_tool/nvs_parser.py +++ b/components/nvs_flash/nvs_partition_tool/nvs_parser.py @@ -219,6 +219,7 @@ class NVS_Entry: self.state = entry_state self.is_empty = self.raw == bytearray({0xFF}) * nvs_const.entry_size self.index = index + self.page = None namespace = self.raw[0] entry_type = self.raw[1] diff --git a/components/nvs_flash/nvs_partition_tool/pytest.ini b/components/nvs_flash/nvs_partition_tool/pytest.ini new file mode 100644 index 0000000000..d95e773e5c --- /dev/null +++ b/components/nvs_flash/nvs_partition_tool/pytest.ini @@ -0,0 +1,12 @@ +[pytest] +addopts = -s -p no:pytest_embedded + +# log related +log_cli = True +log_cli_level = INFO +log_cli_format = %(asctime)s %(levelname)s %(message)s +log_cli_date_format = %Y-%m-%d %H:%M:%S + +## log all to `system-out` when case fail +junit_logging = stdout +junit_log_passing_tests = False diff --git a/components/nvs_flash/nvs_partition_tool/test_nvs_gen_check.py b/components/nvs_flash/nvs_partition_tool/test_nvs_gen_check.py new file mode 100644 index 0000000000..380943b481 --- /dev/null +++ b/components/nvs_flash/nvs_partition_tool/test_nvs_gen_check.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: 2024 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 +from io import BufferedRandom +from io import BytesIO +from pathlib import Path +from typing import Any +from typing import Callable +from typing import Dict +from typing import List +from typing import Optional +from typing import Union +from zlib import crc32 + +import esp_idf_nvs_partition_gen.nvs_partition_gen as nvs_partition_gen +import nvs_check as nvs_check +import pytest +from esp_idf_nvs_partition_gen.nvs_partition_gen import NVS +from nvs_logger import nvs_log +from nvs_logger import NVS_Logger +from nvs_parser import nvs_const +from nvs_parser import NVS_Entry +from nvs_parser import NVS_Partition + + +class SilentLogger(NVS_Logger): + def __init__(self) -> None: + super().__init__() + self.color = False + + def info(self, *args, **kwargs) -> None: # type: ignore + pass + + +logger = nvs_log # SilentLogger() + +LOREM_STRING = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit. +Nullam eget orci fringilla, cursus nisi sit amet, hendrerit tortor. +Vivamus lectus dolor, rhoncus eget metus id, convallis placerat quam. +Nulla facilisi. +In at aliquam nunc, in dictum augue. +Nullam dapibus ligula nec enim commodo lobortis. +Praesent facilisis ante nec magna various lobortis. +Phasellus sodales sed nisi vitae pulvinar. +Aliquam tempor quis sem et tempor. +Etiam interdum nunc quis justo pharetra, sed finibus arcu lacinia. +Suspendisse potenti. +Praesent et turpis ut justo accumsan pellentesque sed at leo. +Aenean consequat ligula ac mattis porta. +Nullam id justo a arcu tincidunt sodales. +Nunc rhoncus pretium nibh ut convallis. +Maecenas orci enim, tincidunt eget vestibulum eu, placerat non ante. +Proin sit amet felis tempor, ullamcorper sem sed, scelerisque nibh. +Aliquam sit amet semper leo, in fringilla nulla. +Vestibulum sit amet tortor tincidunt, laoreet risus eget, ullamcorper sapien. +Fusce non finibus nisl. Cras vitae dui nibh. +Sed fermentum ullamcorper various. +Integer sit amet elit sed nunc fringilla molestie nec nec diam. +Etiam et ornare tellus. +Donec tristique auctor urna, ac aliquam tellus sodales id. +Duis nec magna eget mi consequat gravida. +Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; +Name imperdiet ante neque, nec viverra sem pellentesque vel. +Sed nec arcu non nisl tempor pretium. +Quisque facilisis auctor lobortis. +Pellentesque sed finibus sem, eu lacinia tellus. +Vivamus imperdiet non augue in tincidunt. +Sed aliquet tincidunt dignissim. +Name vehicula leo eu dolor pellentesque, ultrices tempus ex hendrerit. +''' + + +def get_entry_type_bin(entry_type_str: str) -> Optional[int]: + # Reverse `item_type` dict lookup + entry_type_bin: Optional[int] = next(key for key, value in nvs_const.item_type.items() if value == entry_type_str) + if entry_type_bin is None: + logger.info(logger.yellow(f'Unknown entry type {entry_type_str}')) + return entry_type_bin + + +def create_entry_data_bytearray(namespace_index: int, entry_type: int, span: int, chunk_index: int, key: str, data: Any) -> bytearray: + key_bytearray = bytearray(key, 'ascii') + key_encoded = (key_bytearray + bytearray({0x00}) * (16 - len(key_bytearray)))[:16] # Pad key with null bytes + key_encoded[15] = 0x00 # Null-terminate the key + is_signed = entry_type >= 0x11 and entry_type <= 0x18 + + entry_data: bytearray = bytearray({0xFF}) * nvs_const.entry_size # Empty entry + entry_data[0] = namespace_index + entry_data[1] = entry_type + entry_data[2] = span + entry_data[3] = chunk_index + # entry_data[4:8] # CRC32 + entry_data[8:24] = key_encoded + entry_data[24:32] = data.to_bytes(8, byteorder='little', signed=is_signed) + + raw_without_crc = entry_data[:4] + entry_data[8:32] + entry_data[4:8] = crc32(raw_without_crc, 0xFFFFFFFF).to_bytes(4, byteorder='little', signed=False) + + return entry_data + + +@pytest.fixture +def generate_nvs() -> Callable: + def _execute_nvs_setup(nvs_setup_func: Callable, size: int = 0x4000, output: Optional[Path] = None) -> NVS_Partition: + nvs_file: Optional[Union[BytesIO, BufferedRandom]] = None + if output is None: + nvs_file = BytesIO() + else: + try: + nvs_file = open(output, 'wb+') + except OSError as e: + raise RuntimeError(f'Cannot open file {output}, error: {e}') + size_fixed = nvs_partition_gen.check_size(str(size)) + nvs_obj = nvs_partition_gen.nvs_open( + result_obj=nvs_file, + input_size=size_fixed, + version=nvs_partition_gen.Page.VERSION2, + is_encrypt=False, + key=None + ) + nvs_setup_func(nvs_obj) + nvs_partition_gen.nvs_close(nvs_obj) + nvs_file.seek(0) + nvs_parsed = NVS_Partition('test', bytearray(nvs_file.read())) + nvs_file.close() + return nvs_parsed + return _execute_nvs_setup + + +# Setup functions +def setup_ok_primitive(nvs_obj: NVS) -> None: + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + nvs_partition_gen.write_entry(nvs_obj, 'int32_test', 'data', 'i32', str(42)) + nvs_partition_gen.write_entry(nvs_obj, 'uint32_test', 'data', 'u32', str(42)) + nvs_partition_gen.write_entry(nvs_obj, 'int8_test', 'data', 'i8', str(100)) + + +def setup_ok_variable_len(nvs_obj: NVS) -> None: + size_fixed = nvs_partition_gen.check_size(str('0x5000')) + nvs_obj.size = size_fixed + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + nvs_partition_gen.write_entry(nvs_obj, 'short_string_key', 'data', 'string', 'Hello world!') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_blob.bin') + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2) + nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!') + nvs_partition_gen.write_entry(nvs_obj, 'multi_blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') + + +def setup_ok_mixed(nvs_obj: NVS) -> None: + size_fixed = nvs_partition_gen.check_size(str('0x6000')) + nvs_obj.size = size_fixed + prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32'] + + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + for i in range(20): + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i)) + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_singlepage_blob.bin') + + nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '') + for i in range(20): + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i)) + + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2) + + nvs_partition_gen.write_entry(nvs_obj, 'abcd', 'namespace', '', '') + for i in range(20): + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i)) + + nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') + + +def setup_bad_mixed_same_key_different_page(nvs_obj: NVS) -> None: + size_fixed = nvs_partition_gen.check_size(str('0x6000')) + nvs_obj.size = size_fixed + prim_types = ['i8', 'u8', 'i16', 'u16', 'i32', 'u32'] + + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + for i in range(20): + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i)) + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_singlepage_blob.bin') + + nvs_partition_gen.write_entry(nvs_obj, 'etc', 'namespace', '', '') + for i in range(20): + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', prim_types[i % len(prim_types)], str(i)) + + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', LOREM_STRING * 2) + + nvs_partition_gen.write_entry(nvs_obj, 'uniq_string_key', 'data', 'string', 'I am unique!') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') + + # Should be on a different page already - start creating duplicates + + for i in range(6): + data_type = prim_types[i % len(prim_types)] + nvs_partition_gen.write_entry(nvs_obj, f'test_{i}', 'data', data_type, str(i)) # Conflicting keys under "abcd" namespace - 6 duplicates + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Conflicting key for string - 7th duplicate + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') # Conflicting namespace - 8th duplicate + nvs_partition_gen.write_entry(nvs_obj, 'storage2', 'namespace', '', '') # New namespace, ignored + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string_key', 'data', 'string', 'abc') # Should be ignored as is under different "storage2" namespace + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'abc') # 3 conflicting keys under "storage2" namespace - 9th duplicate + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', 'def') + nvs_partition_gen.write_entry(nvs_obj, 'lorem_string', 'data', 'string', '123') + + +def setup_bad_same_key_primitive(nvs_obj: NVS) -> None: + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + nvs_partition_gen.write_entry(nvs_obj, 'unique_key', 'data', 'i16', str(1234)) + nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i32', str(42)) + nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'u32', str(24)) + nvs_partition_gen.write_entry(nvs_obj, 'same_key', 'data', 'i8', str(-5)) + nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(321)) + nvs_partition_gen.write_entry(nvs_obj, 'another_same_key', 'data', 'u16', str(456)) + + +def setup_bad_same_key_variable_len(nvs_obj: NVS) -> None: + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'Hello') + nvs_partition_gen.write_entry(nvs_obj, 'same_string_key', 'data', 'string', 'world!') + nvs_partition_gen.write_entry(nvs_obj, 'unique_string_key', 'data', 'string', 'I am unique!') + + +def setup_bad_same_key_blob_index(nvs_obj: NVS) -> None: + size_fixed = nvs_partition_gen.check_size(str('0x6000')) + nvs_obj.size = size_fixed + nvs_partition_gen.write_entry(nvs_obj, 'storage', 'namespace', '', '') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key_2', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') + nvs_partition_gen.write_entry(nvs_obj, 'blob_key', 'file', 'binary', + '../nvs_partition_generator/testdata/sample_multipage_blob.bin') # Duplicate key + + +# Helper functions +def prepare_duplicate_list(nvs: NVS_Partition) -> Dict[str, List[NVS_Entry]]: + seen_written_entires_all: Dict[str, List[NVS_Entry]] = {} + for page in nvs.pages: + # page: NVS_Page + for entry in page.entries: + # entry: NVS_Entry + # Duplicate entry check (1) - same key, different index - find duplicates + seen_written_entires_all = nvs_check.identify_entry_duplicates(entry, seen_written_entires_all) + # Duplicate entry check (2) - same key, different index + duplicates: Dict[str, List[NVS_Entry]] = nvs_check.filter_entry_duplicates(seen_written_entires_all) + return duplicates + + +# Tests +@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed]) +def test_check_partition_size(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + assert nvs_check.check_partition_size(nvs, logger) + + +@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed]) +def test_check_empty_page_present(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + assert nvs_check.check_empty_page_present(nvs, logger) + + +@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed]) +def test_check_empty_page_content__check_page_crc(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + for page in nvs.pages: + if page.header['status'] == 'Empty': + assert page.is_empty + assert nvs_check.check_empty_page_content(page, logger) + else: + assert not page.is_empty + assert nvs_check.check_page_crc(page, logger) + + +@pytest.mark.parametrize('setup_func', [setup_ok_primitive, setup_ok_variable_len, setup_ok_mixed]) +def test_check_duplicates_ok(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + duplicates = prepare_duplicate_list(nvs) + assert len(duplicates) == 0 # No duplicates in any page + + +@pytest.mark.parametrize('setup_func', [setup_bad_same_key_primitive]) +def test_check_duplicates_bad_same_key_primitive_type(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + duplicates = prepare_duplicate_list(nvs) + assert len(duplicates) == 2 # 2 different lists of duplicate keys + assert len(list(duplicates.values())[0]) == 3 # 3 entries with the same_key + assert len(list(duplicates.values())[1]) == 2 # 2 entries with the another_same_key + nvs_check.integrity_check(nvs, logger) + + +@pytest.mark.parametrize('setup_func', [setup_bad_same_key_variable_len]) +def test_check_duplicates_bad_same_key_variable_len_type(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + duplicates = prepare_duplicate_list(nvs) + assert len(duplicates) == 1 # Only one duplicate key list + assert len(list(duplicates.values())[0]) == 2 # 2 entries with the same_string_key + nvs_check.integrity_check(nvs, logger) + + +@pytest.mark.parametrize('setup_func', [setup_bad_mixed_same_key_different_page]) +def test_check_duplicates_bad_same_key_different_pages(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + duplicates = prepare_duplicate_list(nvs) + assert len(duplicates) == 9 # 9 duplicate keys in total (8 pairs of 2 duplicates + 1 triplet) + for i, value in enumerate(list(duplicates.values())): + if i < 8: + assert len(value) == 2 # i in range 0-7 -- pairs of 2 entries with the same key + else: + assert len(value) == 3 # i == 8 -- 3 entries with the lorem_string key + nvs_check.integrity_check(nvs, logger) + + +@pytest.mark.parametrize('setup_func', [setup_bad_same_key_blob_index]) +def test_check_duplicates_bad_same_key_blob_index(generate_nvs: Callable, setup_func: Callable) -> None: + nvs = generate_nvs(setup_func) + duplicates = prepare_duplicate_list(nvs) + assert len(duplicates) == 1 # Only one duplicate key list - blob_index and blob_data share the same key (which is OK), + # however there are 2 duplicates of each blob_index and blob_data + assert len(list(duplicates.values())[0]) == 6 # 6 entries with the blob_key (2x blob_index, 4x blob_data) + nvs_check.integrity_check(nvs, logger)