From 0f160c8f11ddf6e246ecb7f6c3d83a6d51a38996 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Ga=C5=88o?= Date: Mon, 23 May 2022 15:10:46 +0200 Subject: [PATCH] fatfs: enable long file names for fatfsparse.py --- components/fatfs/fatfs_utils/cluster.py | 3 - components/fatfs/fatfs_utils/entry.py | 30 ++++- components/fatfs/fatfs_utils/fat.py | 17 +++ components/fatfs/fatfs_utils/fs_object.py | 8 +- .../fatfs/fatfs_utils/long_filename_utils.py | 6 +- components/fatfs/fatfs_utils/utils.py | 2 +- components/fatfs/fatfsgen.py | 1 - components/fatfs/fatfsparse.py | 125 ++++++++++-------- .../fatfs/test_fatfsgen/test_fatfsparse.py | 38 +++++- docs/en/api-reference/storage/fatfs.rst | 6 +- 10 files changed, 154 insertions(+), 82 deletions(-) diff --git a/components/fatfs/fatfs_utils/cluster.py b/components/fatfs/fatfs_utils/cluster.py index 1b15dd9c63..4ea6c315e6 100644 --- a/components/fatfs/fatfs_utils/cluster.py +++ b/components/fatfs/fatfs_utils/cluster.py @@ -30,7 +30,6 @@ class Cluster: cluster_id: int, boot_sector_state: BootSectorState, init_: bool) -> None: - self.id: int = cluster_id self.boot_sector_state: BootSectorState = boot_sector_state @@ -40,7 +39,6 @@ class Cluster: if self.id == Cluster.RESERVED_BLOCK_ID and init_: self.set_in_fat(self.INITIAL_BLOCK_SWITCH[self.boot_sector_state.fatfs_type]) return - self.cluster_data_address: int = self._compute_cluster_data_address() assert self.cluster_data_address @@ -143,7 +141,6 @@ class Cluster: 2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte. Order of half bytes is 1, 3, 2. """ - # value must fit into number of bits of the fat (12, 16 or 32) assert value <= (1 << self.boot_sector_state.fatfs_type) - 1 half_bytes = split_by_half_byte_12_bit_little_endian(value) diff --git a/components/fatfs/fatfs_utils/entry.py b/components/fatfs/fatfs_utils/entry.py index 84acb74553..8dcc99f5f7 100644 --- a/components/fatfs/fatfs_utils/entry.py +++ b/components/fatfs/fatfs_utils/entry.py @@ -19,8 +19,8 @@ class Entry: ATTR_HIDDEN: int = 0x02 ATTR_SYSTEM: int = 0x04 ATTR_VOLUME_ID: int = 0x08 - ATTR_DIRECTORY: int = 0x10 - ATTR_ARCHIVE: int = 0x20 + ATTR_DIRECTORY: int = 0x10 # directory + ATTR_ARCHIVE: int = 0x20 # file ATTR_LONG_NAME: int = ATTR_READ_ONLY | ATTR_HIDDEN | ATTR_SYSTEM | ATTR_VOLUME_ID # indexes in the entry structure and sizes in bytes, not in characters (encoded using 2 bytes for lfn) @@ -35,6 +35,8 @@ class Entry: CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE SHORT_ENTRY: int = -1 + + # this value is used for short-like entry but with accepted lower case SHORT_ENTRY_LN: int = 0 # The 1st January 1980 00:00:00 @@ -67,6 +69,11 @@ class Entry: self._is_alias: bool = False self._is_empty: bool = True + @staticmethod + def get_cluster_id(obj_: dict) -> int: + cluster_id_: int = obj_['DIR_FstClusLO'] + return cluster_id_ + @property def is_empty(self) -> bool: return self._is_empty @@ -82,7 +89,7 @@ class Entry: return entry_ @staticmethod - def _build_entry_long(names: List[bytes], checksum: int, order: int, is_last: bool, entity_type: int) -> bytes: + def _build_entry_long(names: List[bytes], checksum: int, order: int, is_last: bool) -> bytes: """ Long entry starts with 1 bytes of the order, if the entry is the last in the chain it is or-masked with 0x40, otherwise is without change (or masked with 0x00). The following example shows 3 entries: @@ -99,7 +106,7 @@ class Entry: order |= (0x40 if is_last else 0x00) long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40) names[0] + # first 5 characters (10 bytes) of the name part - Int8ul.build(entity_type) + # one byte entity type ATTR_LONG_NAME + Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME Int8ul.build(0) + # one byte of zeros Int8ul.build(checksum) + # lfn_checksum defined in utils.py names[1] + # next 6 characters (12 bytes) of the name part @@ -107,6 +114,18 @@ class Entry: names[2]) # last 2 characters (4 bytes) of the name part return long_entry + @staticmethod + def parse_entry_long(entry_bytes_: bytes, my_check: int) -> dict: + order_ = Int8ul.parse(entry_bytes_[0:1]) + names0 = entry_bytes_[1:11] + if Int8ul.parse(entry_bytes_[12:13]) != 0 or Int16ul.parse(entry_bytes_[26:28]) != 0 or Int8ul.parse(entry_bytes_[11:12]) != 15: + return {} + if Int8ul.parse(entry_bytes_[13:14]) != my_check: + return {} + names1 = entry_bytes_[14:26] + names2 = entry_bytes_[28:32] + return {'order': order_, 'name1': names0, 'name2': names1, 'name3': names2, 'is_last': bool(order_ & 0x40 == 0x40)} + @property def entry_bytes(self) -> bytes: """ @@ -207,8 +226,7 @@ class Entry: self.fatfs_state.binary_image[start_address: end_address] = self._build_entry_long(lfn_names, lfn_checksum_, lfn_order, - lfn_is_last, - self.ATTR_LONG_NAME) + lfn_is_last) def update_content_size(self, content_size: int) -> None: """ diff --git a/components/fatfs/fatfs_utils/fat.py b/components/fatfs/fatfs_utils/fat.py index c3d5e3fcac..d61d32f729 100644 --- a/components/fatfs/fatfs_utils/fat.py +++ b/components/fatfs/fatfs_utils/fat.py @@ -38,6 +38,23 @@ class FAT: is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1 return is_cluster_last_ + def chain_content(self, cluster_id_: int) -> bytearray: + bin_im: bytearray = self.boot_sector_state.binary_image + if self.is_cluster_last(cluster_id_): + data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) + content_: bytearray = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size] + return content_ + fat_value_: int = self.get_cluster_value(cluster_id_) + data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) + content_ = bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size] + + while not self.is_cluster_last(cluster_id_): + cluster_id_ = fat_value_ + fat_value_ = self.get_cluster_value(cluster_id_) + data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) + content_ += bin_im[data_address_: data_address_ + self.boot_sector_state.sector_size] + return content_ + def find_free_cluster(self) -> Cluster: # finds first empty cluster and allocates it for cluster in self.clusters: diff --git a/components/fatfs/fatfs_utils/fs_object.py b/components/fatfs/fatfs_utils/fs_object.py index 7e57f609e5..9107d14bfa 100644 --- a/components/fatfs/fatfs_utils/fs_object.py +++ b/components/fatfs/fatfs_utils/fs_object.py @@ -146,7 +146,7 @@ class Directory: return None @staticmethod - def _if_end_of_path(path_as_list: List[str]) -> bool: + def _is_end_of_path(path_as_list: List[str]) -> bool: """ :param path_as_list: path split into the list @@ -161,7 +161,7 @@ class Directory: next_obj = current_dir.lookup_entity(name, extension) if next_obj is None: raise FileNotFoundError('No such file or directory!') - if self._if_end_of_path(path_as_list) and next_obj.name_equals(name, extension): + if self._is_end_of_path(path_as_list) and next_obj.name_equals(name, extension): return next_obj return self.recursive_search(path_as_list[1:], next_obj) @@ -213,8 +213,8 @@ class Directory: split_names_reversed = reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count)))) for i, name_split_to_entry in split_names_reversed: order: int = i + 1 - lfn_names: List[bytes] = list( - map(lambda x: x.lower(), split_name_to_lfn_entry_blocks(name_split_to_entry))) # type: ignore + blocks_: List[bytes] = split_name_to_lfn_entry_blocks(name_split_to_entry) + lfn_names: List[bytes] = list(map(lambda x: x.lower(), blocks_)) free_entry.allocate_entry(first_cluster_id=free_cluster.id, entity_name=name, entity_extension=extension, diff --git a/components/fatfs/fatfs_utils/long_filename_utils.py b/components/fatfs/fatfs_utils/long_filename_utils.py index 0b2f782a4c..ccd63b5317 100644 --- a/components/fatfs/fatfs_utils/long_filename_utils.py +++ b/components/fatfs/fatfs_utils/long_filename_utils.py @@ -44,14 +44,15 @@ def split_name_to_lfn_entry_blocks(name: str) -> List[bytes]: characters are set to 0xFFFF. E.g.: 'GFILENAMA.TXT' -> [b'G\x00F\x00I\x00L\x00E\x00', b'N\x00A\x00M\x00A\x00.\x00T\x00', b'X\x00T\x00']; - 'T' -> [b'T\x00\x00\x00\xff\xff\xff\xff\xff\xff', b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', b'\xff\xff\xff\xff'] + 'T' -> [b'T\x00\x00\x00\xff\xff\xff\xff\xff\xff', b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', + b'\xff\xff\xff\xff'] Notice that since every character is coded using 2 bytes be must add 0x00 to ASCII symbols ('G' -> 'G\x00', etc.), since character 'T' ends in the first block, we must add '\x00\x00' after 'T\x00'. """ max_entry_size: int = Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE + Entry.LDIR_Name2_SIZE assert len(name) <= max_entry_size - return [ + blocks_: List[bytes] = [ convert_to_utf16_and_pad(content=name[:Entry.LDIR_Name1_SIZE], expected_size=Entry.LDIR_Name1_SIZE), convert_to_utf16_and_pad(content=name[Entry.LDIR_Name1_SIZE:Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE], @@ -59,6 +60,7 @@ def split_name_to_lfn_entry_blocks(name: str) -> List[bytes]: convert_to_utf16_and_pad(content=name[Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE:], expected_size=Entry.LDIR_Name3_SIZE) ] + return blocks_ def build_lfn_unique_entry_name_order(entities: list, lfn_entry_name: str) -> int: diff --git a/components/fatfs/fatfs_utils/utils.py b/components/fatfs/fatfs_utils/utils.py index 972f7f0070..8cf57bf4a3 100644 --- a/components/fatfs/fatfs_utils/utils.py +++ b/components/fatfs/fatfs_utils/utils.py @@ -261,7 +261,7 @@ class FATDefaults: SEC_PER_TRACK: int = 0x3f VOLUME_LABEL: str = 'Espressif' FILE_SYS_TYPE: str = 'FAT' - ROOT_ENTRIES_COUNT: int = 512 # number of entries in the root directory + ROOT_ENTRIES_COUNT: int = 512 # number of entries in the root directory, recommended 512 MEDIA_TYPE: int = 0xf8 SIGNATURE_WORD: bytes = b'\x55\xAA' diff --git a/components/fatfs/fatfsgen.py b/components/fatfs/fatfsgen.py index 4d31f9a1bd..c253b31bc9 100755 --- a/components/fatfs/fatfsgen.py +++ b/components/fatfs/fatfsgen.py @@ -39,7 +39,6 @@ class FATFS: root_entry_count: int = FATDefaults.ROOT_ENTRIES_COUNT, explicit_fat_type: int = None, media_type: int = FATDefaults.MEDIA_TYPE) -> None: - # root directory bytes should be aligned by sector size assert (root_entry_count * BYTES_PER_DIRECTORY_ENTRY) % sector_size == 0 # number of bytes in the root dir must be even multiple of BPB_BytsPerSec diff --git a/components/fatfs/fatfsparse.py b/components/fatfs/fatfsparse.py index 13040fda44..656f12daf9 100644 --- a/components/fatfs/fatfsparse.py +++ b/components/fatfs/fatfsparse.py @@ -1,75 +1,79 @@ # SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 +import argparse import os -import sys -from typing import Tuple +import construct from fatfs_utils.boot_sector import BootSector -from fatfs_utils.cluster import Cluster from fatfs_utils.entry import Entry from fatfs_utils.fat import FAT from fatfs_utils.fatfs_state import BootSectorState -from fatfs_utils.utils import PAD_CHAR, FATDefaults, read_filesystem +from fatfs_utils.utils import FULL_BYTE, LONG_NAMES_ENCODING, PAD_CHAR, FATDefaults, lfn_checksum, read_filesystem -def get_chained_full_content(cluster_id_: int, - fat_: FAT, - state_: BootSectorState, - binary_array_: bytearray) -> bytearray: - if fat_.is_cluster_last(cluster_id_): - data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_) - content_: bytearray = binary_array_[data_address_: data_address_ + state_.sector_size] - return content_ - fat_value_: int = fat_.get_cluster_value(cluster_id_) - data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_) - content_ = binary_array_[data_address_: data_address_ + state_.sector_size] - - while not fat_.is_cluster_last(cluster_id_): - cluster_id_ = fat_value_ - fat_value_ = fat_.get_cluster_value(cluster_id_) - data_address_ = Cluster.compute_cluster_data_address(state_, cluster_id_) - content_ += binary_array_[data_address_: data_address_ + state_.sector_size] - return content_ +def build_file_name(name1: bytes, name2: bytes, name3: bytes) -> str: + full_name_ = name1 + name2 + name3 + # need to strip empty bytes and null-terminating char ('\x00') + return full_name_.rstrip(FULL_BYTE).decode(LONG_NAMES_ENCODING).rstrip('\x00') -def get_name_and_id(obj_: dict) -> Tuple[str, int]: - cluster_id_ = obj_['DIR_FstClusLO'] +def get_obj_name(obj_: dict, directory_bytes_: bytes, entry_position_: int, lfn_checksum_: int) -> str: obj_ext_ = obj_['DIR_Name_ext'].rstrip(chr(PAD_CHAR)) ext_ = f'.{obj_ext_}' if len(obj_ext_) > 0 else '' - obj_name_ = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + ext_ - return obj_name_, cluster_id_ + obj_name_: str = obj_['DIR_Name'].rstrip(chr(PAD_CHAR)) + ext_ # short entry name + + if not args.long_name_support: + return obj_name_ + + full_name = {} + + for pos in range(entry_position_ - 1, -1, -1): # loop from the current entry back to the start + obj_address_: int = FATDefaults.ENTRY_SIZE * pos + entry_bytes_: bytes = directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE] + struct_ = Entry.parse_entry_long(entry_bytes_, lfn_checksum_) + if len(struct_.items()) > 0: + full_name[struct_['order']] = build_file_name(struct_['name1'], struct_['name2'], struct_['name3']) + if struct_['is_last']: + break + return ''.join(map(lambda x: x[1], sorted(full_name.items()))) or obj_name_ def traverse_folder_tree(directory_bytes_: bytes, name: str, - state_: BootSectorState, fat_: FAT, + state_: BootSectorState, + fat_: FAT, binary_array_: bytearray) -> None: - if name not in ('.', '..'): - os.makedirs(name) - for i in range(len(directory_bytes_) // FATDefaults.ENTRY_SIZE): - obj_address_ = FATDefaults.ENTRY_SIZE * i - obj_ = Entry.ENTRY_FORMAT_SHORT_NAME.parse( - directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE]) + os.makedirs(name) + + assert len(directory_bytes_) % FATDefaults.ENTRY_SIZE == 0 + entries_count_: int = len(directory_bytes_) // FATDefaults.ENTRY_SIZE + + for i in range(entries_count_): + obj_address_: int = FATDefaults.ENTRY_SIZE * i + try: + obj_: dict = Entry.ENTRY_FORMAT_SHORT_NAME.parse( + directory_bytes_[obj_address_: obj_address_ + FATDefaults.ENTRY_SIZE]) + except (construct.core.ConstError, UnicodeDecodeError) as e: + if not args.long_name_support: + raise e + continue + + if obj_['DIR_Attr'] == 0: # empty entry + continue + + obj_name_: str = get_obj_name(obj_, + directory_bytes_, + entry_position_=i, + lfn_checksum_=lfn_checksum(obj_['DIR_Name'] + obj_['DIR_Name_ext'])) if obj_['DIR_Attr'] == Entry.ATTR_ARCHIVE: - obj_name_, cluster_id_ = get_name_and_id(obj_) - content_ = get_chained_full_content( - cluster_id_=cluster_id_, - fat_=fat_, - state_=state_, - binary_array_=binary_array_ - ).rstrip(chr(0x00).encode()) + content_ = fat_.chain_content(cluster_id_=Entry.get_cluster_id(obj_)).rstrip(chr(0x00).encode()) with open(os.path.join(name, obj_name_), 'wb') as new_file: new_file.write(content_) elif obj_['DIR_Attr'] == Entry.ATTR_DIRECTORY: - obj_name_, cluster_id_ = get_name_and_id(obj_) + # avoid creating symlinks to itself and parent folder if obj_name_ in ('.', '..'): continue - child_directory_bytes_ = get_chained_full_content( - cluster_id_=obj_['DIR_FstClusLO'], - fat_=fat_, - state_=state_, - binary_array_=binary_array_ - ) + child_directory_bytes_ = fat_.chain_content(cluster_id_=obj_['DIR_FstClusLO']) traverse_folder_tree(directory_bytes_=child_directory_bytes_, name=os.path.join(name, obj_name_), state_=state_, @@ -78,14 +82,23 @@ def traverse_folder_tree(directory_bytes_: bytes, if __name__ == '__main__': - fs = read_filesystem(sys.argv[1]) - parser = BootSector() - parser.parse_boot_sector(fs) - fat = FAT(parser.boot_sector_state, init_=False) + desc = 'Tool for parsing fatfs image and extracting directory structure on host.' + argument_parser: argparse.ArgumentParser = argparse.ArgumentParser(description=desc) + argument_parser.add_argument('input_image', + help='Path to the image that will be parsed and extracted.') + argument_parser.add_argument('--long-name-support', + action='store_true', + help='Set flag to enable long names support.') + args = argument_parser.parse_args() - boot_dir_start_ = parser.boot_sector_state.root_directory_start - boot_dir_sectors = parser.boot_sector_state.root_dir_sectors_cnt - full_ = fs[boot_dir_start_: boot_dir_start_ + boot_dir_sectors * parser.boot_sector_state.sector_size] + fs = read_filesystem(args.input_image) + boot_sector_ = BootSector() + boot_sector_.parse_boot_sector(fs) + fat = FAT(boot_sector_.boot_sector_state, init_=False) + + boot_dir_start_ = boot_sector_.boot_sector_state.root_directory_start + boot_dir_sectors = boot_sector_.boot_sector_state.root_dir_sectors_cnt + full_ = fs[boot_dir_start_: boot_dir_start_ + boot_dir_sectors * boot_sector_.boot_sector_state.sector_size] traverse_folder_tree(full_, - parser.boot_sector_state.volume_label.rstrip(chr(PAD_CHAR)), - parser.boot_sector_state, fat, fs) + boot_sector_.boot_sector_state.volume_label.rstrip(chr(PAD_CHAR)), + boot_sector_.boot_sector_state, fat, fs) diff --git a/components/fatfs/test_fatfsgen/test_fatfsparse.py b/components/fatfs/test_fatfsgen/test_fatfsparse.py index da72509ec3..caa060579f 100755 --- a/components/fatfs/test_fatfsgen/test_fatfsparse.py +++ b/components/fatfs/test_fatfsgen/test_fatfsparse.py @@ -194,9 +194,7 @@ class FatFSGen(unittest.TestCase): folder3_ = { 'type': 'folder', 'name': 'XYZ2', - 'content': [ - self.file_(f'A{i}') for i in range(50) - ] + 'content': [self.file_(f'A{i}') for i in range(50)] } struct_: dict = { 'type': 'folder', @@ -244,9 +242,7 @@ class FatFSGen(unittest.TestCase): folder3_ = { 'type': 'folder', 'name': 'XYZ2', - 'content': [ - self.file_(f'A{i}') for i in range(50) - ] + [folder2_] + 'content': [self.file_(f'A{i}') for i in range(50)] + [folder2_] } struct_: dict = { @@ -268,6 +264,36 @@ class FatFSGen(unittest.TestCase): run(['python', '../fatfsparse.py', 'fatfs_image.img'], stderr=STDOUT) assert compare_folders('testf', 'Espressif') + def test_e2e_very_deep_long(self) -> None: + folder_ = { + 'type': 'folder', + 'name': 'veryveryverylong111', + 'content': [ + self.file_('myndewveryverylongfile1.txt', content_=4097 * 'a'), + self.file_('mynewveryverylongfile22.txt', content_=2 * 4097 * 'a'), + self.file_('mynewveryverylongfile333.txt' * 8), + self.file_('mynewveryverylongfile4444.txt' * 8), + self.file_('mynewveryverylongfile5555.txt'), + self.file_('SHORT.TXT'), + ] + } + struct_: dict = { + 'type': 'folder', + 'name': 'testf', + 'content': [ + self.file_('mynewveryverylongfile.txt' * 5), + folder_, + ] + } + generate_local_folder_structure(struct_, path_='.') + run([ + 'python', + f'{os.path.join(os.path.dirname(__file__), "..", "fatfsgen.py")}', + 'testf', '--long_name_support' + ], stderr=STDOUT) + run(['python', '../fatfsparse.py', 'fatfs_image.img', '--long-name-support'], stderr=STDOUT) + assert compare_folders('testf', 'Espressif') + if __name__ == '__main__': unittest.main() diff --git a/docs/en/api-reference/storage/fatfs.rst b/docs/en/api-reference/storage/fatfs.rst index 9010cea1f3..861e03b059 100644 --- a/docs/en/api-reference/storage/fatfs.rst +++ b/docs/en/api-reference/storage/fatfs.rst @@ -135,10 +135,10 @@ For an example, see :example:`storage/fatfsgen`. FatFs Partition Analyzer ------------------------ -We provide a partition analyzer for FatFs (:component_file:`fatfsparse.py`). The tool is still in active progress and provides only restricted functionality. +(:component_file:`fatfsparse.py`) is a partition analyzing tool for FatFs. -It is only guaranteed that the tool is able to analyze images generated by FatFs partition generator (:component_file:`fatfsgen.py`) (without support for wear levelling and long names) and generate the folder structure on host with the same name as a FatFs volume label. +It is a reverse tool of (:component_file:`fatfsgen.py`), i.e. it can generate the folder structure on the host based on the FatFs image. Usage:: - ./fatfsparse.py fatfs_image.img + ./fatfsparse.py [-h] [--long-name-support] fatfs_image.img