Source code for facedancer.devices.umass.disk_image

from mmap import mmap

import os


[docs] class DiskImage: """ Class representing an arbitrary disk image, which can be procedurally generated, or which can be rendered from e.g. a file. Currently limited to representing disk with 512-byte sectors. """
[docs] def close(self): """ Closes and cleans up any resources held by the disk image. """ pass
[docs] def get_sector_size(self): return 512
[docs] def get_sector_count(self): """ Returns the disk's sector count. """ raise NotImplementedError()
[docs] def get_data(self, address, length): data_to_read = length sector_size = self.get_sector_size() data = bytes() while data_to_read > 0: data.extend(self.get_sector_data(address)) data_to_read -= sector_size address += 1 return data
[docs] def get_sector_data(self, address): """ Returns the raw binary data for a given sector. """ raise NotImplementedError()
[docs] def put_data(self, address, data): sector_size = self.get_sector_size() while data: sector = data[:sector_size] data = data[sector_size:] self.put_sector_data(address, sector) address += 1 return data
[docs] def put_sector_data(self, address, data): """ Sets the raw binary data for a given disk sector. """ sys.stderr.write("WARNING: UMS write ignored; this type of image does not support writing.\n")
[docs] class FAT32DiskImage(DiskImage): """ Class for manufacturing synthetic FAT32 disk images. """ CLUSTER_SIZE = 512 MBR_SECTOR = 0 BPB_SECTOR = 2048 # set by our MBR partition entry FSINFO_SECTOR = 2049 # set by our BPB entry FAT_START = 2080 # specified by our BPB entry (partition start + reserved sectors) FAT_END = 6113 # specified by our BPB entry (fat start + fat size) DATA_SECTION_START = 10146 # specified by our BPB entry (fat start + num_fats * fat_size) ROOT_DIR_ENTRY = 10146 # specified by our BPB entry (we put the directory at the very start) def __init__(self, size = 1024 * 1024 * 256, verbose=0): self.verbose = verbose self.size = size # Initialize the commands we'll use to handle sector writes. self._initialize_sector_handlers() def _register_sector_handler(self, sector_or_lambda, name, handler=None): if handler is None: handler = self.handle_unhandled_sector descriptor = { "sector_or_lambda": sector_or_lambda, "name": name, "handler": handler } self.sector_handlers.append(descriptor) def _initialize_sector_handlers(self): self.sector_handlers = [] # Handlers for disk special sectors... self._register_sector_handler(self.MBR_SECTOR, "MBR/partition table", self.handle_mbr_read) self._register_sector_handler(self.BPB_SECTOR, "BIOS Parameter Block", self.handle_bpb_read) self._register_sector_handler(self.FSINFO_SECTOR, "FSINFO Block", self.handle_fsinfo_read) self._register_sector_handler(lambda x : x >= self.FAT_START and x < self.FAT_END, "File Allocation Table", self.handle_fat_read) self._register_sector_handler(self.ROOT_DIR_ENTRY, "Root Directory", self.handle_root_dir_read)
[docs] def handle_mbr_read(self, address): """ Returns a master boot record directing the target device to our emulated FAT32 partition. """ response = 440 * b'\0' # bootstrap code + timestamp response += b'\xDE\xAD\xBE\xEF' # disk signature (we're making one up) response += b'\x00\x00' # 0 = not copy protected response += self._generate_fat_partition_entry() # partition entry for our FAT32 partition response += (16 * 3) * b'\0' # three empty partition slots response += b'\x55\xAA' # end of sector signature return response
[docs] def handle_bpb_read(self, address): """ Returns a valid Boot Parameter Block, which tells the device how to interpret our FAT filesystem. """ response = b'\xEB\x00\x90' # jump to bootloader (oddly, checked on some non-x86 uCs) response += b'MSWIN4.1' # OEM name (this one seems broadly compatible) # Bytes per disk sector. response += self.get_sector_size().to_bytes(2, byteorder='little') # Sectors per cluster. response += self._sectors_per_cluster().to_bytes(1, byteorder='little') response += b'\x20\x00' # reserved sectors response += b'\x02' # number of FATs (must be 2) response += b'\x00\x00' # root entries (must be 0 for fat32) response += b'\x00\x00' # total 16-bit count of sectors (must be 0 for fat32) response += b'\xF8' # media type: hard drive (0xF8) response += b'\x00\x00' # sectors per FAT (must be 0 for fat32) response += b'\x00\x00' # sectors per track (most likely ignored) response += b'\x00\x00' # number of heads (most likely ignored) response += b'\x00\x00\x00\x00' # hidden sectors (most likely ignored) # The total number of sectors in the volume. response += self.get_partition_sectors().to_bytes(4, byteorder='little') response += b'\xC1\x0F\x00\x00' # sectors per FAT response += b'\x00\x00' # flags response += b'\x00\x00' # filesystem revision response += b'\x02\x00\x00\x00' # cluster for the root directory response += b'\x01\x00' # address of the fsinfo sector response += b'\x06\x00' # address of the backup of the boot sector response += 12 * b'\x00' # reserved space response += b'\x80' # drive number for PC-BIOS (0x80 = hard disk) response += b'\x00' # reserved space response += b'\x29' # boot signature (from mkfs.vfat) response += b'0000' # disk serial number (for volume tracking) response += b'Facedancer ' # volume label (must be 11 bytes; spaces for padding) response += b'FAT32 ' # should be "FAT32" for FAT32, padded to eight bytes response += 420 * b'\x00' # reserved space response += b'\x55\xAA' # end of sector marker return response
[docs] def handle_fsinfo_read(self, address): """ Returns a valid filesystem info block, which is used to cache information about free sectors on the filesystem. We don't actually sport writing, so we return a valid-but-useless block. """ response = b'\x52\x52\x61\x41' # fsinfo block signature (magic number) response += 480 * b'\x00' # reserved for future use response += b'\x72\x72\x41\x61' # second signature (magic number) response += b'\xFF\xFF\xFF\xFF' # free sector count (-1 = "don't know") response += b'\xFF\xFF\xFF\xFF' # next free sector (-1 = "don't know") response += 12 * b'\x00' # reserved for future use response += b'\x00\x00\x55\xAA' # final signature (magic number) return response
def _generate_directory_entry(self, filename, file_size, cluster_number, flags=b'\x00'): # TODO: automatically convert filename, filesize to bytes # TODO: support long form name entries? cluster_number_bytes = cluster_number.to_bytes(4, byteorder='little') cluster_number_low = cluster_number_bytes[:2] cluster_number_high = cluster_number_bytes[2:] entry = filename # short name (first 8 are name; last three are extension) entry += flags # file attributes entry += b'\x00' # reserved byte entry += 5 * b'\x00' # dir creation date/time entry += 2 * b'\x00' # last access date entry += cluster_number_high # high word of the entry's first cluster number entry += 4 * b'\x00' # last write date/time entry += cluster_number_low # low word of the entry's first cluster number entry += file_size.to_bytes(4, byteorder='little') return entry def _short_filename_checksum(self, short_filename): """ Generates a long-form name checksum for a given 8.3 short filename. """ sum = 0 for byte in short_filename: # I'm sorry. This is copied directly from the bloody spec. # Don't judge me. Judge them. sum = (((sum & 1) << 7) | ((sum & 0xfe) >> 1)) + byte return sum & 0xFF def _is_valid_83_char(self, c): """ Returns true iff the given character is a valid 8-3 filename character. """ if c in " !#$%&'()-@^_`{}~'": return True if c.isupper() or c.isdigit(): return True return False def _is_valid_83_name(self, long_filename): """ Returns true iff the given filename is a valid filename. """ if len(long_filename) != 11: return False return all([self._is_valid_83_char(c) for c in long_filename]) def _short_filename_from_long(self, long_filename): """ Generates short-form filenames from a long-form name. """ # TODO: Generalize this to behave like Windows if self._is_valid_83_name(long_filename): return long_filename.encode('utf-8') else: # FIXME: This breaks in lots of cases; it's just Good Enough (TM) # for now. prefix = re.sub(r'\W+', '', long_filename)[:6] extension = long_filename[-3:] short_name = '{}~1{}'.format(prefix, extension) return short_name.encode('utf-8') def _generate_long_directory_entries(self, long_filename, short_filename): """ Generate long-form directory entries for a long filename. Should be called immediately before calling the short_form directory entry functions. """ index = 1 entries = [] # Null terminate our long filename, as the filesystem expects. long_filename += "\0" while long_filename: entry_file = long_filename[:13] long_filename = long_filename[13:] # If this is the final entry, set the sixth bit of the index, # indicating that this is the final index present. if not long_filename: index |= 0x40 # Compute the checksum for the short filename. checksum = self._short_filename_checksum(short_filename) # Encode the filename in UTF-8, padded with FFs as necessary. entry_file = bytes(entry_file.encode('utf-16'))[2:] entry_file = entry_file.ljust(26, b'\xFF') # Generate the entry itself. entry = index.to_bytes(1, byteorder='little') # index of this entry entry += entry_file[:10] # first five characters entry += b'\x0F' # attribute indicating this is a long filename entry += b'\x00' # always zeroes for VFAT LFNs entry += checksum.to_bytes(1, byteorder='little') # checksum of the short name entry += entry_file[10:22] # next six characters of the filename entry += b'\x00\x00' # always zeroes entry += entry_file[22:] # the final two characters of the filename # Move to the next entry... index += 1 entries.append(entry) # Reverse the order of the entries, and convert them to a byte string. return b''.join(entries[::-1])
[docs] def handle_root_dir_read(self, address): """ Returns a valid entry describing the root directory of our FAT filesystem. """ # Generate the volume label entry. response = self._generate_directory_entry(b'Facedancer ', 0, 0, flags=b'\x08') return response
def _generate_fat_partition_entry(self): """ Returns a partition entry pointing to our synthetic FAT partition. """ response = b'\x00' # Status: 0x00 = not bootable, 0x80 = bootable response += b'\x00\x00\x00' # CHS address of the partition's first sector; typically ignored response += b'\x0B' # disk type: FAT32 with CHS/LBA addressing response += b'\x00\x00\x00' # CHS address of the partition's end; typically ignored # LBA of our first sector. response += self.BPB_SECTOR.to_bytes(4, byteorder='little') # Report the size of the partition, in sectors. We'll use up all "unallocated" # space on the drive with our FAT partition. response += self.get_partition_sectors().to_bytes(4, byteorder='little') return response def _sectors_per_cluster(self): """ Returns the number of sectors in a cluster. """ return int(self.CLUSTER_SIZE / self.get_sector_size())
[docs] def handle_fat_read(self, address): """ Handles an access to the device's file allocation table. """ # TODO: Create general method for reading from the FAT based on # virtual files, and methods to add those files! raise NotImplementedError()
[docs] def handle_unhandled_sector(self, address): """ Handles unsupported sector reads. """ if self.verbose > 3: print("<-- !!! unhandled sector {}, returning all zeroes".format(address)) return bytes(bytearray(self.get_sector_size()))
[docs] def get_sector_count(self): """ Returns the total number of sectors present on the disk. """ return int(self.size / self.get_sector_size()) - 1
[docs] def get_partition_sectors(self): """ Get the amount of sectors available for use by our main FAT partition. """ # Return everything but the MBR and reserved space. return (self.get_sector_count() - 4096)
def _find_sector_handler(self, address): """ Locates the function that should handle generation of the given sector. """ # Check each of our sector handlers to see if it is appropriate to handle # the given sector... for handler in self.sector_handlers: sector_or_lambda = handler['sector_or_lambda'] if(callable(sector_or_lambda)): matches = sector_or_lambda(address) else: matches = (sector_or_lambda == address) if matches: return handler return None
[docs] def get_sector_data(self, address): """ Fetches the data at the given sector of our emulated disk. """ handler = self._find_sector_handler(address) # If we have a handler for this sector, handle it. if handler: name = handler['name'] function = handler['handler'] if self.verbose > 0: print("<-- handling read of {} sector ({})".format(name, address)) # Call the main handler. response = function(address) # If our response is smaller than our sector size, pad it out with zeroes. if len(response) < self.get_sector_size(): needed_bytes = self.get_sector_size() - len(response) response += needed_bytes * b'\x00' if self.verbose > 4: print(" response: {} ({})".format(len(response), response)) return response # Otherwise, run the unknown command handler. else: return self.handle_unhandled_sector(address)
[docs] class RawDiskImage(DiskImage): """ Raw disk image backed by a file. """ def __init__(self, filename, block_size, verbose=0): self.filename = filename self.block_size = block_size self.verbose = verbose statinfo = os.stat(self.filename) self.size = statinfo.st_size self.file = open(self.filename, 'r+b') self.image = mmap(self.file.fileno(), 0)
[docs] def close(self): self.image.flush() self.image.close()
[docs] def get_sector_count(self): return int(self.size / self.block_size) - 1
[docs] def get_sector_data(self, address): if self.verbose == 2: print("<-- reading sector {}".format(address)) block_start = address * self.block_size block_end = (address + 1) * self.block_size # slices are NON-inclusive data = self.image[block_start:block_end] if self.verbose > 3: if not any(data): print("<-- reading sector {} [all zeroes]".format(address)) else: print("<-- reading sector {} [{}]".format(address, data)) return data
[docs] def put_data(self, address, data): if self.verbose > 1: blocks = int(len(data) / self.block_size) print("--> writing {} blocks at lba {}".format(blocks, address)) super().put_data(address, data)
[docs] def put_sector_data(self, address, data): if self.verbose == 2: print("--> writing sector {}".format(address)) if len(data) > self.block_size: print("WARNING: got {} bytes of sector data; expected a max of {}".format(len(data), self.block_size)) block_start = address * self.block_size block_end = (address + 1) * self.block_size # slices are NON-inclusive if self.verbose > 3: if not any(data): print("--> writing sector {} [all zeroes]".format(address)) else: print("--> writing sector {} [{}]".format(address, data)) self.image[block_start:block_end] = data[:self.block_size] self.image.flush()