diff --git a/codexctl/__init__.py b/codexctl/__init__.py index b68f8d0..b0b4a21 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -1,17 +1,16 @@ ### Importing required general modules import argparse -import os.path -import sys -import logging import importlib.util -import tempfile -import shutil import json +import logging +import os.path import re - -from typing import cast +import shutil +import sys +import tempfile from os import listdir +from typing import cast try: from loguru import logger @@ -67,6 +66,9 @@ def call_func(self, function: str, args: dict) -> None: ### Download functionalities if function == "list": + remarkable_ppure_versions = "\n".join( + self.updater.remarkableppure_versions.keys() + ) remarkable_pp_versions = "\n".join( self.updater.remarkablepp_versions.keys() ) @@ -77,14 +79,26 @@ def call_func(self, function: str, args: dict) -> None: remarkable_1_versions = "\n".join(self.updater.remarkable1_versions.keys()) version_blocks = [] + if remarkable_version is None or remarkable_version == HardwareType.RMPPURE: + version_blocks.append( + f"{HardwareType.RMPPURE.formatted_name}:\n{remarkable_ppure_versions}" + ) if remarkable_version is None or remarkable_version == HardwareType.RMPP: - version_blocks.append(f"ReMarkable Paper Pro:\n{remarkable_pp_versions}") + version_blocks.append( + f"{HardwareType.RMPP.formatted_name}:\n{remarkable_pp_versions}" + ) if remarkable_version is None or remarkable_version == HardwareType.RMPPM: - version_blocks.append(f"ReMarkable Paper Pro Move:\n{remarkable_ppm_versions}") + version_blocks.append( + f"{HardwareType.RMPPM.formatted_name}:\n{remarkable_ppm_versions}" + ) if remarkable_version is None or remarkable_version == HardwareType.RM2: - version_blocks.append(f"ReMarkable 2:\n{remarkable_2_versions}") + version_blocks.append( + f"{HardwareType.RM2.formatted_name}:\n{remarkable_2_versions}" + ) if remarkable_version is None or remarkable_version == HardwareType.RM1: - version_blocks.append(f"ReMarkable 1:\n{remarkable_1_versions}") + version_blocks.append( + f"{HardwareType.RM1.formatted_name}:\n{remarkable_1_versions}" + ) print("\n\n".join(version_blocks)) @@ -122,8 +136,9 @@ def call_func(self, function: str, args: dict) -> None: logger.info(f"Extracted image to {args['out']}") else: try: - from .analysis import get_update_image from remarkable_update_fuse import UpdateFS + + from .analysis import get_update_image except ImportError: raise ImportError( "remarkable_update_fuse is required for mounting. Please install it!" @@ -266,7 +281,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None: image = UpdateImage(update_file) if isinstance(image, CPIOUpdateImage): if image.version is None: - raise SystemError(f"Could not determine version from SWU file: {update_file}") + raise SystemError( + f"Could not determine version from SWU file: {update_file}" + ) version_number = image.version hw_map = { @@ -274,12 +291,17 @@ def version_lookup(version: str | None) -> re.Match[str] | None: "reMarkable2": HardwareType.RM2, "ferrari": HardwareType.RMPP, "chiappa": HardwareType.RMPPM, + "tatsu": HardwareType.RMPPURE, } if image.hardware_type not in hw_map: - raise SystemError(f"Unsupported hardware type in SWU file: {update_file}") + raise SystemError( + f"Unsupported hardware type in SWU file: {update_file}" + ) swu_hardware = hw_map[image.hardware_type] - logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}") + logger.info( + f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}" + ) if swu_hardware != remarkable.hardware: raise SystemError( @@ -289,7 +311,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None: f"Cannot install firmware for different hardware." ) except ValueError as e: - logger.warning(f"Could not extract metadata from update file: {e}") + logger.warning( + f"Could not extract metadata from update file: {e}" + ) if not version_number: version_match = version_lookup(version) @@ -337,15 +361,18 @@ def version_lookup(version: str | None) -> re.Match[str] | None: bootloader_files_for_install = None - if (device_version_uses_new_engine and - remarkable.hardware == HardwareType.RMPP): - + if ( + device_version_uses_new_engine + and remarkable.hardware == HardwareType.RMPP + ): current_version = remarkable.get_device_status()[2] - if UpdateManager.is_bootloader_boundary_downgrade(current_version, version_number): - print("\n" + "="*60) + if UpdateManager.is_bootloader_boundary_downgrade( + current_version, version_number + ): + print("\n" + "=" * 60) print("WARNING: Bootloader Update Required") - print("="*60) + print("=" * 60) print(f"Current version: {current_version}") print(f"Target version: {version_number}") print() @@ -359,21 +386,23 @@ def version_lookup(version: str | None) -> re.Match[str] | None: print() response = input("Do you want to continue? (y/N): ") - if response.lower() != 'y': + if response.lower() != "y": raise SystemExit("Installation cancelled by user") expected_swu_name = f"remarkable-production-memfault-image-{current_version}-{remarkable.hardware.new_download_hw}-public" expected_swu_path = f"./{expected_swu_name}" if os.path.isfile(expected_swu_path): - print(f"\nUsing existing {expected_swu_name} for bootloader extraction...") + print( + f"\nUsing existing {expected_swu_name} for bootloader extraction..." + ) current_swu_path = expected_swu_path else: - print("\nDownloading current version's SWU for bootloader extraction...") + print( + "\nDownloading current version's SWU for bootloader extraction..." + ) current_swu_path = self.updater.download_version( - remarkable.hardware, - current_version, - "./" + remarkable.hardware, current_version, "./" ) if not current_swu_path: @@ -384,17 +413,26 @@ def version_lookup(version: str | None) -> re.Match[str] | None: print("Extracting bootloader files...") from remarkable_update_image import UpdateImage + swu_image = UpdateImage(current_swu_path) bootloader_files_for_install = { - 'update-bootloader.sh': swu_image.archive[b'update-bootloader.sh'].read(), - 'imx-boot': swu_image.archive[b'imx-boot'].read(), + "update-bootloader.sh": swu_image.archive[ + b"update-bootloader.sh" + ].read(), + "imx-boot": swu_image.archive[b"imx-boot"].read(), } if not all(bootloader_files_for_install.values()): - raise SystemError("Failed to extract bootloader files from current version") + raise SystemError( + "Failed to extract bootloader files from current version" + ) - print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)") - print(f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)") + print( + f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)" + ) + print( + f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)" + ) print() if not update_file_requires_new_engine: @@ -437,7 +475,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None: ) if device_version_uses_new_engine: - remarkable.install_sw_update(update_file, bootloader_files=bootloader_files_for_install) + remarkable.install_sw_update( + update_file, bootloader_files=bootloader_files_for_install + ) else: remarkable.install_ohma_update(update_file) diff --git a/codexctl/__main__.py b/codexctl/__main__.py index 09326c0..868d99e 100644 --- a/codexctl/__main__.py +++ b/codexctl/__main__.py @@ -1,4 +1,4 @@ -from . import main - -if __name__ == "__main__": - main() +from . import main + +if __name__ == "__main__": + main() diff --git a/codexctl/analysis.py b/codexctl/analysis.py index f84c4d3..1770341 100644 --- a/codexctl/analysis.py +++ b/codexctl/analysis.py @@ -1,34 +1,34 @@ -import ext4 -import warnings -import errno - -from remarkable_update_image import UpdateImage -from remarkable_update_image import UpdateImageSignatureException -from .device import HardwareType - - -def get_update_image(file: str): - """Extracts files from an update image (<3.11 currently)""" - - image = UpdateImage(file) - volume = ext4.Volume(image, offset=0) - try: - inode = volume.inode_at("/usr/share/update_engine/update-payload-key.pub.pem") - if inode is None: - raise FileNotFoundError() - - inode.verify() - image.verify(inode.open().read()) - - except UpdateImageSignatureException: - warnings.warn("Signature doesn't match contents", RuntimeWarning) - - except FileNotFoundError: - warnings.warn("Public key missing", RuntimeWarning) - - except OSError as e: - if e.errno != errno.ENOTDIR: - raise - warnings.warn("Unable to open public key", RuntimeWarning) - - return image, volume +import ext4 +import warnings +import errno + +from remarkable_update_image import UpdateImage +from remarkable_update_image import UpdateImageSignatureException +from .device import HardwareType + + +def get_update_image(file: str): + """Extracts files from an update image (<3.11 currently)""" + + image = UpdateImage(file) + volume = ext4.Volume(image, offset=0) + try: + inode = volume.inode_at("/usr/share/update_engine/update-payload-key.pub.pem") + if inode is None: + raise FileNotFoundError() + + inode.verify() + image.verify(inode.open().read()) + + except UpdateImageSignatureException: + warnings.warn("Signature doesn't match contents", RuntimeWarning) + + except FileNotFoundError: + warnings.warn("Public key missing", RuntimeWarning) + + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + warnings.warn("Unable to open public key", RuntimeWarning) + + return image, volume diff --git a/codexctl/device.py b/codexctl/device.py index 226ac93..765fe0e 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -1,1024 +1,1124 @@ -import enum -import logging -import os -import re -import shlex -import socket -import subprocess -import tempfile -import threading -import time - -from .server import startUpdate - -try: - import paramiko - import psutil -except ImportError: - pass - - -class HardwareType(enum.Enum): - RM1 = enum.auto() - RM2 = enum.auto() - RMPP = enum.auto() - RMPPM = enum.auto() - - @classmethod - def parse(cls, device_type: str) -> "HardwareType": - match device_type.lower(): - case "ppm" | "rmppm" | "chiappa" | "remarkable chiappa": - return cls.RMPPM - - case "pp" | "pro" | "rmpp" | "ferrari" | "remarkable ferrari": - return cls.RMPP - - case "2" | "rm2" | "remarkable 2" | "remarkable 2.0": - return cls.RM2 - - case "1" | "rm1" | "remarkable 1" | "remarkable 1.0" | "remarkable prototype 1": - return cls.RM1 - - case _: - raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)") - - @property - def old_download_hw(self): - match self: - case HardwareType.RM1: - return "reMarkable" - case HardwareType.RM2: - return "reMarkable2" - case HardwareType.RMPP: - raise ValueError("reMarkable Paper Pro does not support the old update engine") - case HardwareType.RMPPM: - raise ValueError("reMarkable Paper Pro Move does not support the old update engine") - - @property - def new_download_hw(self): - match self: - case HardwareType.RM1: - return "rm1" - case HardwareType.RM2: - return "rm2" - case HardwareType.RMPP: - return "rmpp" - case HardwareType.RMPPM: - return "rmppm" - - @property - def swupdate_hw(self): - match self: - case HardwareType.RM1: - return "reMarkable1" - case HardwareType.RM2: - return "reMarkable2" - case HardwareType.RMPP: - return "ferrari" - case HardwareType.RMPPM: - return "chiappa" - - @property - def toltec_type(self): - match self: - case HardwareType.RM1: - return "rm1" - case HardwareType.RM2: - return "rm2" - case HardwareType.RMPP: - raise ValueError("reMarkable Paper Pro does not support toltec") - case HardwareType.RMPPM: - raise ValueError("reMarkable Paper Pro Move does not support toltec") - -class DeviceManager: - def __init__( - self, logger=None, remote=False, address=None, authentication=None - ) -> None: - """Initializes the DeviceManager for codexctl - - Args: - remote (bool, optional): Whether the device is remote. Defaults to False. - address (bool, optional): Known IP of remote device, if applicable. Defaults to None. - logger (logger, optional): Logger object for logging. Defaults to None. - Authentication (str, optional): Authentication method. Defaults to None. - """ - self.logger = logger - self.address = address - self.authentication = authentication - self.client = None - - if self.logger is None: - self.logger = logging - - if remote: - self.client = self.connect_to_device( - authentication=authentication, remote_address=address - ) - - self.client.authentication = authentication - self.client.address = address - - ftp = self.client.open_sftp() - with ftp.file("/sys/devices/soc0/machine") as file: - machine_contents = file.read().decode("utf-8").strip("\n") - else: - with open("/sys/devices/soc0/machine") as file: - machine_contents = file.read().strip("\n") - - self.hardware = HardwareType.parse(machine_contents) - - def get_host_address(self) -> list[str] | list | None: # Interaction required - """Gets the IP address of the host machine - - Returns: - str | None: IP address of the host machine, or None if not found - """ - - possible_ips = [] - try: - for interface, snics in psutil.net_if_addrs().items(): - self.logger.debug(f"New interface found: {interface}") - for snic in snics: - if snic.family == socket.AF_INET: - if snic.address.startswith("10.11.99"): - return snic.address - self.logger.debug(f"Adding new address: {snic.address}") - possible_ips.append(snic.address) - - except Exception as error: - self.logger.error(f"Error automatically getting interfaces: {error}") - - if possible_ips: - host_interfaces = "\n".join(possible_ips) - else: - host_interfaces = "Could not find any available interfaces." - - print(f"\n{host_interfaces}") - while True: - host_address = input( - "\nPlease enter your host IP for the network the device is connected to: " - ) - - if possible_ips and host_address not in host_interfaces.split("\n"): - print("Error: Invalid IP given") - continue - - if "n" in input("Are you sure? (Y/n): ").lower(): - continue - - break - - return host_address - - def get_remarkable_address(self) -> str: - """Gets the IP address of the remarkable device - - Returns: - str: IP address of the remarkable device - """ - - if self.check_is_address_reachable("10.11.99.1"): - return "10.11.99.1" - - while True: - remote_ip = input("Please enter the IP of the remarkable device: ") - - if self.check_is_address_reachable(remote_ip): - return remote_ip - - print(f"Error: Device {remote_ip} is not reachable. Please try again.") - - def check_is_address_reachable(self, remote_ip="10.11.99.1") -> bool: - """Checks if the given IP address is reachable over SSH - - Args: - remote_ip (str, optional): IP to check. Defaults to '10.11.99.1'. - - Returns: - bool: True if reachable, False otherwise - """ - self.logger.debug(f"Checking if {remote_ip} is reachable") - try: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(1) - - sock.connect((remote_ip, 22)) - sock.shutdown(2) - - return True - - except Exception: - self.logger.debug(f"Device {remote_ip} is not reachable") - return False - - def connect_to_device( - self, remote_address=None, authentication=None - ) -> paramiko.client.SSHClient: - """Connects to the device using the given IP address - - Args: - remote_address (str, optional): IP address of the device. - authentication (str, optional): Authentication credentials. Defaults to None. - - Returns: - paramiko.client.SSHClient: SSH client object for the device. - """ - - if remote_address is None: - remote_address = self.get_remarkable_address() - self.address = remote_address # For future reference - else: - if self.check_is_address_reachable(remote_address) is False: - raise SystemError(f"Error: Device {remote_address} is not reachable!") - - client = paramiko.client.SSHClient() - client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - - if authentication: - self.logger.debug(f"Using authentication: {authentication}") - try: - if os.path.isfile(authentication): - self.logger.debug( - f"Attempting to connect to {remote_address} with key file {authentication}" - ) - client.connect( - remote_address, username="root", key_filename=authentication - ) - else: - self.logger.debug( - f"Attempting to connect to {remote_address} with password {authentication}" - ) - client.connect( - remote_address, username="root", password=authentication - ) - - except paramiko.ssh_exception.AuthenticationException: - print("Incorrect password or ssh path given in arguments!") - - elif ( - "n" in input("Would you like to use a password to connect? (Y/n): ").lower() - ): - while True: - key_path = input("Enter path to SSH key: ") - - try: - self.logger.debug( - f"Attempting to connect to {remote_address} with key file {key_path}" - ) - client.connect( - remote_address, username="root", key_filename=key_path - ) - except Exception: - print("Error while connecting to device: {error}") - - continue - break - else: - while True: - password = input("Enter RM SSH password: ") - - try: - self.logger.debug( - f"Attempting to connect to {remote_address} with password {password}" - ) - client.connect(remote_address, username="root", password=password) - except paramiko.ssh_exception.AuthenticationException: - print("Incorrect password given") - - continue - break - - print("Success: Connected to device") - - return client - - def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, bool]: - """Reads version from a given path (current partition or mounted backup) - - Args: - ftp: SFTP client connection (None for local file access) - base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup) - - Returns: - tuple: (version_string, old_update_engine_boolean) - """ - update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" - os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" - - if ftp: - def file_exists(path: str) -> bool: - try: - ftp.stat(path) - return True - except FileNotFoundError: - return False - - def read_file(path: str) -> str: - with ftp.file(path) as file: - return file.read().decode("utf-8") - else: - file_exists = os.path.exists - - def read_file(path: str) -> str: - with open(path, encoding="utf-8") as file: - return file.read() - - if file_exists(update_conf_path): - contents = read_file(update_conf_path).strip("\n") - match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) - if match: - return match.group(), True - raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") - - if file_exists(os_release_path): - contents = read_file(os_release_path) - match = re.search("(?<=IMG_VERSION=).*", contents) - if match: - return match.group().strip('"'), False - raise SystemError(f"IMG_VERSION not found in {os_release_path}") - - raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found") - - def _get_active_device(self) -> str: - """Gets the active root device path. - - Returns: - str: Active device path (e.g., /dev/mmcblk2p2) - - Raises: - SystemError: If command fails or returns no output - """ - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - cmd = "swupdate -g" - else: - cmd = "rootdev" - - if self.client: - _stdin, stdout, stderr = self.client.exec_command(cmd) - output = stdout.read().decode("utf-8").strip() - exit_status = stdout.channel.recv_exit_status() - if exit_status != 0 or not output: - error = stderr.read().decode("utf-8", errors="ignore") - raise SystemError(f"Failed to get active device using '{cmd}': {error or 'no output'}") - return output - else: - result = subprocess.run(cmd.split(), capture_output=True, text=True) - if result.returncode != 0 or not result.stdout.strip(): - raise SystemError(f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}") - return result.stdout.strip() - - def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]: - """Parse partition numbers from device path. - - Args: - active_device: Device path (e.g., /dev/mmcblk2p2) - - Returns: - tuple: (active_part, inactive_part, device_base) - """ - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) - return active_part, inactive_part, device_base - - def _get_backup_partition_version(self) -> str: - """Gets the version installed on the backup (inactive) partition - - Returns: - str: Version string (empty string for RM1/RM2 on failure) - - Raises: - SystemError: If backup partition version cannot be determined (Paper Pro only) - """ - try: - active_device = self._get_active_device() - _, inactive_part, device_base = self._parse_partition_info(active_device) - mount_point = f"/tmp/mount_p{inactive_part}" - - if self.client: - ftp = self.client.open_sftp() - self.client.exec_command(f"mkdir -p {mount_point}") - _stdin, stdout, _stderr = self.client.exec_command( - f"mount -o ro {device_base}p{inactive_part} {mount_point}" - ) - exit_status = stdout.channel.recv_exit_status() - - if exit_status != 0: - error_msg = _stderr.read().decode('utf-8') - raise SystemError(f"Failed to mount backup partition: {error_msg}") - - try: - version, _ = self._read_version_from_path(ftp, mount_point) - return version - finally: - self.client.exec_command(f"umount {mount_point}") - self.client.exec_command(f"rm -rf {mount_point}") - else: - os.makedirs(mount_point, exist_ok=True) - result = subprocess.run( - ["mount", "-o", "ro", f"{device_base}p{inactive_part}", mount_point], - capture_output=True, text=True - ) - if result.returncode != 0: - raise SystemError(f"Failed to mount backup partition: {result.stderr}") - - try: - version, _ = self._read_version_from_path(base_path=mount_point) - return version - finally: - subprocess.run(["umount", mount_point]) - subprocess.run(["rm", "-rf", mount_point]) - except SystemError: - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - raise - return "" - - def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]: - """Gets partition information for Paper Pro devices - - Args: - current_version: Current OS version string for version-aware detection - - Returns: - tuple: (current_partition, inactive_partition, next_boot_partition) - """ - active_device = self._get_active_device() - current_part, inactive_part, _ = self._parse_partition_info(active_device) - - parts = current_version.split('.') - if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): - is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22] - else: - raise SystemError(f"Cannot detect partition scheme: unexpected version format '{current_version}'") - - next_boot_part = current_part - - if self.client: - ftp = self.client.open_sftp() - - def file_exists(path: str) -> bool: - try: - ftp.stat(path) - return True - except FileNotFoundError: - return False - - def read_file(path: str) -> str: - with ftp.file(path) as file: - return file.read().decode("utf-8") - else: - file_exists = os.path.exists - - def read_file(path: str) -> str: - with open(path, encoding="utf-8") as file: - return file.read() - - if is_new_version: - boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part" - try: - if file_exists(boot_part_path): - boot_part_value = read_file(boot_part_path).strip() - next_boot_part = 2 if boot_part_value == "1" else 3 - else: - is_new_version = False - except (IOError, OSError): - is_new_version = False - - if not is_new_version: - root_part_path = "/sys/devices/platform/lpgpr/root_part" - try: - if file_exists(root_part_path): - root_part_value = read_file(root_part_path).strip() - next_boot_part = 2 if root_part_value == "a" else 3 - except (IOError, OSError) as e: - self.logger.debug(f"Failed to read next boot partition: {e}") - - return current_part, inactive_part, next_boot_part - - def get_device_status(self) -> tuple[str | None, str, str, str, str]: - """Gets the status of the device - - Returns: - tuple: Beta status, old_update_engine, current version, version_id, backup version (in that order) - """ - old_update_engine = True - version_id = "" - beta_contents = "" - - if self.client: - self.logger.debug("Connecting to FTP") - ftp = self.client.open_sftp() - self.logger.debug("Connected") - - xochitl_version, old_update_engine = self._read_version_from_path(ftp) - def exists(path:str) -> bool: - try: - _ = ftp.stat(path) - return True - except (FileNotFoundError, IOError): - return False - - if exists("/etc/version"): - with ftp.file("/etc/version") as file: - version_id = file.read().decode("utf-8").strip("\n") - - if exists("/home/root/.config/remarkable/xochitl.conf"): - with ftp.file("/home/root/.config/remarkable/xochitl.conf") as file: - beta_contents = file.read().decode("utf-8") - - else: - xochitl_version, old_update_engine = self._read_version_from_path() - - if os.path.exists("/etc/version"): - with open("/etc/version", encoding="utf-8") as file: - version_id = file.read().rstrip() - - if os.path.exists("/home/root/.config/remarkable/xochitl.conf"): - with open("/home/root/.config/remarkable/xochitl.conf", encoding="utf-8") as file: - beta_contents = file.read().rstrip() - - beta_possible = re.search("(?<=GROUP=).*", beta_contents) - beta = "Release" - - if beta_possible is not None: - beta = re.search("(?<=GROUP=).*", beta_contents).group() - - backup_version = self._get_backup_partition_version() - - return beta, old_update_engine, xochitl_version, version_id, backup_version - - def set_server_config(self, contents: str, server_host_name: str) -> str: - """Converts the contents given to point to the given server IP and port - - Args: - contents (str): Contents of the update.conf file - server_host_name (str): Hostname of the server - - Returns: - str: Converted contents - """ - data_attributes = contents.split("\n") - line = 0 - - self.logger.debug(f"Contents are:\n{contents}") - - for i in range(0, len(data_attributes)): - if data_attributes[i].startswith("[General]"): - self.logger.debug("Found [General] line") - line = i + 1 - if not data_attributes[i].startswith("SERVER="): - continue - - data_attributes[i] = f"#{data_attributes[i]}" - self.logger.debug(f"Using {data_attributes[i]}") - - data_attributes.insert(line, f"SERVER={server_host_name}") - converted = "\n".join(data_attributes) - - self.logger.debug(f"Converted contents are:\n{converted}") - - return converted - - def edit_update_conf(self, server_ip: str, server_port: str) -> bool: - """Edits the update.conf file to point to the given server IP and port - - Args: - server_ip (str): IP of update server - server_port (str): Port of update service - - Returns: - bool: True if successful, False otherwise - """ - server_host_name = f"http://{server_ip}:{server_port}" - self.logger.debug(f"Hostname is: {server_host_name}") - try: - if not self.client: - self.logger.debug("Detected running on local device") - with open( - "/usr/share/remarkable/update.conf", encoding="utf-8" - ) as file: - modified_conf_version = self.set_server_config( - file.read(), server_host_name - ) - - with open("/usr/share/remarkable/update.conf", "w") as file: - file.write(modified_conf_version) - - return True - - self.logger.debug("Connecting to FTP") - ftp = self.client.open_sftp() # or ssh - self.logger.debug("Connected") - - with ftp.file("/usr/share/remarkable/update.conf") as update_conf_file: - modified_conf_version = self.set_server_config( - update_conf_file.read().decode("utf-8"), server_host_name - ) - - with ftp.file("/usr/share/remarkable/update.conf", "w") as update_conf_file: - update_conf_file.write(modified_conf_version) - - return True - except Exception as error: - self.logger.error(f"Error while editing update.conf: {error}") - return False - - def restore_previous_version(self) -> None: - """Restores the previous version of the device""" - - RESTORE_CODE = """/sbin/fw_setenv "upgrade_available" "1" -/sbin/fw_setenv "bootcount" "0" - -OLDPART=$(/sbin/fw_printenv -n active_partition) -if [ $OLDPART == "2" ]; then - NEWPART="3" -else - NEWPART="2" -fi -echo "new: ${NEWPART}" -echo "fallback: ${OLDPART}" - -/sbin/fw_setenv "fallback_partition" "${OLDPART}" -/sbin/fw_setenv "active_partition" "${NEWPART}\"""" - - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - _, _, current_version, _, backup_version = self.get_device_status() - current_part, inactive_part, _ = self._get_paper_pro_partition_info(current_version) - - new_part_label = "a" if inactive_part == 2 else "b" - - parts = current_version.split('.') - if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): - raise SystemError(f"Cannot restore: unexpected current version format '{current_version}'") - - current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] - - parts = backup_version.split('.') - if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): - raise SystemError(f"Cannot restore: unexpected backup version format '{backup_version}'") - - target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] - - code = [ - "#!/bin/bash", - f"echo 'Switching from partition {current_part} to partition {inactive_part}'", - f"echo 'Current version: {current_version}'", - f"echo 'Target version: {backup_version}'", - ] - - if not current_is_new: - code.extend([ - f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part", - "echo 'Set next boot via sysfs (legacy method)'", - ]) - - if target_is_new or current_is_new: - code.extend([ - f"mmc bootpart enable {inactive_part - 1} 0 /dev/mmcblk0boot{inactive_part - 2}", - "echo 'Set next boot via mmc bootpart (new method)'", - ]) - - code.extend([ - f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true", - "echo 'Partition switch complete'", - ]) - - RESTORE_CODE = "\n".join(code) - - if self.client: - self.logger.debug("Connecting to FTP") - ftp = self.client.open_sftp() - self.logger.debug("Connected") - - with ftp.file("/tmp/restore.sh", "w") as file: - file.write(RESTORE_CODE) - - self.logger.debug("Setting permissions and running restore.sh") - - self.client.exec_command("chmod +x /tmp/restore.sh") - self.client.exec_command("bash /tmp/restore.sh") - else: - with open("/tmp/restore.sh", "w") as file: - file.write(RESTORE_CODE) - - self.logger.debug("Setting permissions and running restore.sh") - - os.system("chmod +x /tmp/restore.sh") - os.system("/tmp/restore.sh") - - self.logger.debug("Restore script ran") - - def reboot_device(self) -> None: - REBOOT_CODE = """ -if systemctl is-active --quiet tarnish.service; then - rot system call reboot -else - systemctl reboot -fi -""" - if self.client: - self.logger.debug("Connecting to FTP") - ftp = self.client.open_sftp() - self.logger.debug("Connected") - with ftp.file("/tmp/reboot.sh", "w") as file: - file.write(REBOOT_CODE) - - self.logger.debug("Running reboot.sh") - self.client.exec_command("sh /tmp/reboot.sh") - - else: - with open("/tmp/reboot.sh", "w") as file: - file.write(REBOOT_CODE) - - self.logger.debug("Running reboot.sh") - os.system("sh /tmp/reboot.sh") - - self.logger.debug("Device rebooted") - - def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes] | None = None) -> None: - """ - Installs new version from version file path, utilising swupdate - - Args: - version_file (str): Path to img file - bootloader_files (dict[str, bytes] | None): Bootloader files for Paper Pro downgrade - - Raises: - SystemExit: If there was an error installing the update - - """ - if self.client: - ftp_client = self.client.open_sftp() - - print(f"Uploading {version_file} image") - - out_location = f"/tmp/{os.path.basename(version_file)}.swu" - ftp_client.put( - version_file, out_location, callback=self.output_put_progress - ) - - print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)") - - command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'" - self.logger.debug(command) - _stdin, stdout, _stderr = self.client.exec_command(command) - - exit_status = stdout.channel.recv_exit_status() - - if exit_status != 0: - print("".join(_stderr.readlines())) - raise SystemError("Update failed!") - - if bootloader_files: - print("\nApplying bootloader update...") - self._update_paper_pro_bootloader( - bootloader_files['update-bootloader.sh'], - bootloader_files['imx-boot'] - ) - print("✓ Bootloader update completed") - - print("Done! Now rebooting the device and disabling update service") - - #### Now disable automatic updates - - self.client.exec_command("sleep 1 && reboot") # Should be enough - self.client.close() - - time.sleep( - 2 - ) # Somehow the code runs faster than the time it takes for the device to reboot - - print("Trying to connect to device") - - while not self.check_is_address_reachable(self.address): - time.sleep(1) - - self.client = self.connect_to_device( - remote_address=self.address, authentication=self.authentication - ) - - self.client.exec_command("systemctl stop swupdate memfaultd") - - print( - "Update complete and update service disabled, restart device to enable it" - ) - - else: - print("Running swupdate") - command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}"] - self.logger.debug(command) - - try: - output = subprocess.check_output( - command, - stderr=subprocess.STDOUT, - text=True, - env={"PATH": "/bin:/usr/bin:/sbin:/usr/sbin"}, - ) - self.logger.debug(f"Stdout of swupdate: {output}") - except subprocess.CalledProcessError as e: - print(e.output) - raise SystemError("Update failed") - - print("Update complete and device rebooting") - os.system("reboot") - - def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes) -> None: - """ - Update bootloader on Paper Pro device for 3.22+ -> <3.22 downgrades. - - This method uploads the bootloader script and image to the device, - then runs the update script twice (preinst and postinst) to update - both boot partitions. - - Args: - bootloader_script: Contents of update-bootloader.sh - imx_boot: Contents of imx-boot image file - - Raises: - SystemError: If bootloader update fails - """ - self.logger.info("Starting bootloader update for Paper Pro") - - if not self.client: - raise SystemError("No SSH connection to device") - - ftp_client = None - try: - ftp_client = self.client.open_sftp() - except Exception: - raise SystemError("Failed to open SFTP connection for bootloader update") - - script_path = "/tmp/update-bootloader.sh" - boot_image_path = "/tmp/imx-boot" - - with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.sh') as tmp_script: - tmp_script.write(bootloader_script) - tmp_script_path = tmp_script.name - - with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.img') as tmp_boot: - tmp_boot.write(imx_boot) - tmp_boot_path = tmp_boot.name - - try: - self.logger.debug("Uploading bootloader script to device") - ftp_client.put(tmp_script_path, script_path) - - self.logger.debug("Uploading imx-boot image to device") - ftp_client.put(tmp_boot_path, boot_image_path) - - self.logger.debug("Making bootloader script executable") - _stdin, stdout, _stderr = self.client.exec_command(f"chmod +x {script_path}") - stdout.channel.recv_exit_status() - - self.logger.info("Running bootloader update script (preinst)") - _stdin, stdout, stderr = self.client.exec_command( - f"{script_path} preinst {boot_image_path}" - ) - exit_status = stdout.channel.recv_exit_status() - if exit_status != 0: - error_msg = "".join(stderr.readlines()) - raise SystemError(f"Bootloader preinst failed: {error_msg}") - - self.logger.info("Running bootloader update script (postinst)") - _stdin, stdout, stderr = self.client.exec_command( - f"{script_path} postinst {boot_image_path}" - ) - exit_status = stdout.channel.recv_exit_status() - if exit_status != 0: - error_msg = "".join(stderr.readlines()) - raise SystemError(f"Bootloader postinst failed: {error_msg}") - - self.logger.info("Bootloader update completed successfully") - - finally: - self.logger.debug("Cleaning up temporary bootloader files on device") - self.client.exec_command(f"rm -f {script_path} {boot_image_path}") - - self.logger.debug("Cleaning up local temporary files") - os.unlink(tmp_script_path) - os.unlink(tmp_boot_path) - if ftp_client: - ftp_client.close() - - def install_ohma_update(self, version_available: dict) -> None: - """Installs version from update folder on the device - - Args: - version_available (dict): Version available for installation from `get_available_version` - - Raises: - SystemExit: If there was an error installing the update - """ - - server_host = self.get_host_address() - - self.logger.debug("Editing config file") - - if ( - self.edit_update_conf(server_ip=server_host, server_port=8085) is False - ): # We want a port that probably isn't being used - self.logger.error("Error while editing update.conf") - - return - - thread = threading.Thread( - target=startUpdate, args=(version_available, server_host, 8085), daemon=True - ) - thread.start() - - self.logger.debug("Thread started") - - if self.client: - print("Checking if device can connect to this machine") - - _stdin, stdout, _stderr = self.client.exec_command( - f"sleep 2 && echo | nc {server_host} 8085" - ) - check = stdout.channel.recv_exit_status() - self.logger.debug(f"Stdout of nc checking: {stdout.readlines()}") - - if check != 0: - raise SystemError( - "Device cannot connect to this machine! Is the firewall blocking connections?" - ) - - print("Starting update service on device") - - self.client.exec_command("systemctl start update-engine") - - _stdin, stdout, _stderr = self.client.exec_command( - "/usr/bin/update_engine_client -update" - ) - exit_status = stdout.channel.recv_exit_status() - - if exit_status != 0: - print("".join(_stderr.readlines())) - raise SystemError("There was an error updating :(") - - self.logger.debug( - f"Stdout of update checking service is {''.join(_stderr.readlines())}" - ) - - #### Now disable automatic updates - - print("Done! Now rebooting the device and disabling update service") - - self.client.exec_command("sleep 1 && reboot") # Should be enough - self.client.close() - - time.sleep( - 2 - ) # Somehow the code runs faster than the time it takes for the device to reboot - - print("Trying to connect to device") - - while not self.check_is_address_reachable(self.address): - time.sleep(1) - - self.client = self.connect_to_device( - remote_address=self.address, authentication=self.authentication - ) - self.client.exec_command("systemctl stop update-engine") - - print( - "Update complete and update service disabled. Restart device to enable it" - ) - - else: - print("Enabling update service") - - subprocess.run( - ["/bin/systemctl", "start", "update-engine"], - text=True, - check=True, - env={"PATH": "/bin:/usr/bin:/sbin"}, - ) - - with subprocess.Popen( - ["/usr/bin/update_engine_client", "-update"], - text=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env={"PATH": "/bin:/usr/bin:/sbin"}, - ) as process: - if process.wait() != 0: - print("".join(process.stderr.readlines())) - - raise SystemError("There was an error updating :(") - - self.logger.debug( - f"Stdout of update checking service is {''.join(process.stderr.readlines())}" - ) - - print("Update complete and device rebooting") - os.system("reboot") - - @staticmethod - def output_put_progress(transferred: int, toBeTransferred: int) -> None: - """Used for displaying progress for paramiko ftp.put function""" - - print( - f"Transferring progress{int((transferred / toBeTransferred) * 100)}%", - end="\r", - ) +import enum +import logging +import os +import re +import shlex +import socket +import subprocess +import tempfile +import threading +import time + +from .server import startUpdate + +try: + import paramiko + import psutil +except ImportError: + pass + + +class HardwareType(enum.Enum): + RM1 = enum.auto() + RM2 = enum.auto() + RMPP = enum.auto() + RMPPM = enum.auto() + RMPPURE = enum.auto() + + @classmethod + def parse(cls, device_type: str) -> "HardwareType": + match device_type.lower(): + case "ppm" | "rmppm" | "chiappa" | "remarkable chiappa": + return cls.RMPPM + + case "pp" | "pro" | "rmpp" | "ferrari" | "remarkable ferrari": + return cls.RMPP + + case "2" | "rm2" | "remarkable 2" | "remarkable 2.0": + return cls.RM2 + + case ( + "1" + | "rm1" + | "remarkable 1" + | "remarkable 1.0" + | "remarkable prototype 1" + ): + return cls.RM1 + + case "ppure" | "rmppure" | "tatsu" | "remarkable tatsu": + return cls.RMPPURE + + case _: + raise ValueError( + f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)" + ) + + @property + def old_download_hw(self): + match self: + case HardwareType.RM1: + return "reMarkable" + case HardwareType.RM2: + return "reMarkable2" + case HardwareType.RMPP | HardwareType.RMPPM | HardwareType.RMPPURE: + raise ValueError( + f"{self.formatted_name} does not support the old update engine" + ) + + @property + def new_download_hw(self): + match self: + case HardwareType.RM1: + return "rm1" + case HardwareType.RM2: + return "rm2" + case HardwareType.RMPP: + return "rmpp" + case HardwareType.RMPPM: + return "rmppm" + case HardwareType.RMPPURE: + return "rmppure" + + @property + def swupdate_hw(self): + match self: + case HardwareType.RM1: + return "reMarkable1" + case HardwareType.RM2: + return "reMarkable2" + case HardwareType.RMPP: + return "ferrari" + case HardwareType.RMPPM: + return "chiappa" + case HardwareType.RMPPURE: + return "tatsu" + + @property + def formatted_name(self): + match self: + case HardwareType.RM1: + return "reMarkable 1" + case HardwareType.RM2: + return "reMarkable 2" + case HardwareType.RMPP: + return "reMarkable Paper Pro" + case HardwareType.RMPPM: + return "reMarkable Paper Pro Move" + case HardwareType.RMPPURE: + return "reMarkable Paper Pure" + + @property + def toltec_type(self): + match self: + case HardwareType.RM1: + return "rm1" + case HardwareType.RM2: + return "rm2" + case HardwareType.RMPP | HardwareType.RMPPM | HardwareType.RMPPURE: + raise ValueError(f"{self.formatted_name} does not support toltec") + + +class DeviceManager: + def __init__( + self, logger=None, remote=False, address=None, authentication=None + ) -> None: + """Initializes the DeviceManager for codexctl + + Args: + remote (bool, optional): Whether the device is remote. Defaults to False. + address (bool, optional): Known IP of remote device, if applicable. Defaults to None. + logger (logger, optional): Logger object for logging. Defaults to None. + Authentication (str, optional): Authentication method. Defaults to None. + """ + self.logger = logger + self.address = address + self.authentication = authentication + self.client = None + + if self.logger is None: + self.logger = logging + + if remote: + self.client = self.connect_to_device( + authentication=authentication, remote_address=address + ) + + self.client.authentication = authentication + self.client.address = address + + ftp = self.client.open_sftp() + with ftp.file("/sys/devices/soc0/machine") as file: + machine_contents = file.read().decode("utf-8").strip("\n") + else: + with open("/sys/devices/soc0/machine") as file: + machine_contents = file.read().strip("\n") + + self.hardware = HardwareType.parse(machine_contents) + + def get_host_address(self) -> list[str] | list | None: # Interaction required + """Gets the IP address of the host machine + + Returns: + str | None: IP address of the host machine, or None if not found + """ + + possible_ips = [] + try: + for interface, snics in psutil.net_if_addrs().items(): + self.logger.debug(f"New interface found: {interface}") + for snic in snics: + if snic.family == socket.AF_INET: + if snic.address.startswith("10.11.99"): + return snic.address + self.logger.debug(f"Adding new address: {snic.address}") + possible_ips.append(snic.address) + + except Exception as error: + self.logger.error(f"Error automatically getting interfaces: {error}") + + if possible_ips: + host_interfaces = "\n".join(possible_ips) + else: + host_interfaces = "Could not find any available interfaces." + + print(f"\n{host_interfaces}") + while True: + host_address = input( + "\nPlease enter your host IP for the network the device is connected to: " + ) + + if possible_ips and host_address not in host_interfaces.split("\n"): + print("Error: Invalid IP given") + continue + + if "n" in input("Are you sure? (Y/n): ").lower(): + continue + + break + + return host_address + + def get_remarkable_address(self) -> str: + """Gets the IP address of the remarkable device + + Returns: + str: IP address of the remarkable device + """ + + if self.check_is_address_reachable("10.11.99.1"): + return "10.11.99.1" + + while True: + remote_ip = input("Please enter the IP of the remarkable device: ") + + if self.check_is_address_reachable(remote_ip): + return remote_ip + + print(f"Error: Device {remote_ip} is not reachable. Please try again.") + + def check_is_address_reachable(self, remote_ip="10.11.99.1") -> bool: + """Checks if the given IP address is reachable over SSH + + Args: + remote_ip (str, optional): IP to check. Defaults to '10.11.99.1'. + + Returns: + bool: True if reachable, False otherwise + """ + self.logger.debug(f"Checking if {remote_ip} is reachable") + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1) + + sock.connect((remote_ip, 22)) + sock.shutdown(2) + + return True + + except Exception: + self.logger.debug(f"Device {remote_ip} is not reachable") + return False + + def connect_to_device( + self, remote_address=None, authentication=None + ) -> paramiko.client.SSHClient: + """Connects to the device using the given IP address + + Args: + remote_address (str, optional): IP address of the device. + authentication (str, optional): Authentication credentials. Defaults to None. + + Returns: + paramiko.client.SSHClient: SSH client object for the device. + """ + + if remote_address is None: + remote_address = self.get_remarkable_address() + self.address = remote_address # For future reference + else: + if self.check_is_address_reachable(remote_address) is False: + raise SystemError(f"Error: Device {remote_address} is not reachable!") + + client = paramiko.client.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + if authentication: + self.logger.debug(f"Using authentication: {authentication}") + try: + if os.path.isfile(authentication): + self.logger.debug( + f"Attempting to connect to {remote_address} with key file {authentication}" + ) + client.connect( + remote_address, username="root", key_filename=authentication + ) + else: + self.logger.debug( + f"Attempting to connect to {remote_address} with password {authentication}" + ) + client.connect( + remote_address, username="root", password=authentication + ) + + except paramiko.ssh_exception.AuthenticationException: + print("Incorrect password or ssh path given in arguments!") + + elif ( + "n" in input("Would you like to use a password to connect? (Y/n): ").lower() + ): + while True: + key_path = input("Enter path to SSH key: ") + + try: + self.logger.debug( + f"Attempting to connect to {remote_address} with key file {key_path}" + ) + client.connect( + remote_address, username="root", key_filename=key_path + ) + except Exception: + print("Error while connecting to device: {error}") + + continue + break + else: + while True: + password = input("Enter RM SSH password: ") + + try: + self.logger.debug( + f"Attempting to connect to {remote_address} with password {password}" + ) + client.connect(remote_address, username="root", password=password) + except paramiko.ssh_exception.AuthenticationException: + print("Incorrect password given") + + continue + break + + print("Success: Connected to device") + + return client + + def _read_version_from_path( + self, ftp=None, base_path: str = "" + ) -> tuple[str, bool]: + """Reads version from a given path (current partition or mounted backup) + + Args: + ftp: SFTP client connection (None for local file access) + base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup) + + Returns: + tuple: (version_string, old_update_engine_boolean) + """ + update_conf_path = ( + f"{base_path}/usr/share/remarkable/update.conf" + if base_path + else "/usr/share/remarkable/update.conf" + ) + os_release_path = ( + f"{base_path}/etc/os-release" if base_path else "/etc/os-release" + ) + + if ftp: + + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + def read_file(path: str) -> str: + with ftp.file(path) as file: + return file.read().decode("utf-8") + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() + + if file_exists(update_conf_path): + contents = read_file(update_conf_path).strip("\n") + match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) + if match: + return match.group(), True + raise SystemError( + f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}" + ) + + if file_exists(os_release_path): + contents = read_file(os_release_path) + match = re.search("(?<=IMG_VERSION=).*", contents) + if match: + return match.group().strip('"'), False + raise SystemError(f"IMG_VERSION not found in {os_release_path}") + + raise SystemError( + f"Cannot read version from {base_path or 'current partition'}: no version file found" + ) + + def _get_active_device(self) -> str: + """Gets the active root device path. + + Returns: + str: Active device path (e.g., /dev/mmcblk2p2) + + Raises: + SystemError: If command fails or returns no output + """ + if self.hardware in ( + HardwareType.RMPP, + HardwareType.RMPPM, + HardwareType.RMPPURE, + ): + cmd = "swupdate -g" + else: + cmd = "rootdev" + + if self.client: + _stdin, stdout, stderr = self.client.exec_command(cmd) + output = stdout.read().decode("utf-8").strip() + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0 or not output: + error = stderr.read().decode("utf-8", errors="ignore") + raise SystemError( + f"Failed to get active device using '{cmd}': {error or 'no output'}" + ) + return output + else: + result = subprocess.run(cmd.split(), capture_output=True, text=True) + if result.returncode != 0 or not result.stdout.strip(): + raise SystemError( + f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}" + ) + return result.stdout.strip() + + def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]: + """Parse partition numbers from device path. + + Args: + active_device: Device path (e.g., /dev/mmcblk2p2) + + Returns: + tuple: (active_part, inactive_part, device_base) + """ + active_part = int(active_device.split("p")[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r"p\d+$", "", active_device) + return active_part, inactive_part, device_base + + def _get_backup_partition_version(self) -> str: + """Gets the version installed on the backup (inactive) partition + + Returns: + str: Version string (empty string for RM1/RM2 on failure) + + Raises: + SystemError: If backup partition version cannot be determined (Paper Pro only) + """ + try: + active_device = self._get_active_device() + _, inactive_part, device_base = self._parse_partition_info(active_device) + mount_point = f"/tmp/mount_p{inactive_part}" + + if self.client: + ftp = self.client.open_sftp() + self.client.exec_command(f"mkdir -p {mount_point}") + _stdin, stdout, _stderr = self.client.exec_command( + f"mount -o ro {device_base}p{inactive_part} {mount_point}" + ) + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + error_msg = _stderr.read().decode("utf-8") + raise SystemError(f"Failed to mount backup partition: {error_msg}") + + try: + version, _ = self._read_version_from_path(ftp, mount_point) + return version + finally: + self.client.exec_command(f"umount {mount_point}") + self.client.exec_command(f"rm -rf {mount_point}") + else: + os.makedirs(mount_point, exist_ok=True) + result = subprocess.run( + [ + "mount", + "-o", + "ro", + f"{device_base}p{inactive_part}", + mount_point, + ], + capture_output=True, + text=True, + ) + if result.returncode != 0: + raise SystemError( + f"Failed to mount backup partition: {result.stderr}" + ) + + try: + version, _ = self._read_version_from_path(base_path=mount_point) + return version + finally: + subprocess.run(["umount", mount_point]) + subprocess.run(["rm", "-rf", mount_point]) + except SystemError: + if self.hardware in ( + HardwareType.RMPP, + HardwareType.RMPPM, + HardwareType.RMPPURE, + ): + raise + return "" + + def _get_paper_pro_partition_info( + self, current_version: str + ) -> tuple[int, int, int]: + """Gets partition information for Paper Pro devices + + Args: + current_version: Current OS version string for version-aware detection + + Returns: + tuple: (current_partition, inactive_partition, next_boot_partition) + """ + active_device = self._get_active_device() + current_part, inactive_part, _ = self._parse_partition_info(active_device) + + parts = current_version.split(".") + if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): + is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22] + else: + raise SystemError( + f"Cannot detect partition scheme: unexpected version format '{current_version}'" + ) + + next_boot_part = current_part + + if self.client: + ftp = self.client.open_sftp() + + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + def read_file(path: str) -> str: + with ftp.file(path) as file: + return file.read().decode("utf-8") + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() + + if is_new_version: + boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part" + try: + if file_exists(boot_part_path): + boot_part_value = read_file(boot_part_path).strip() + next_boot_part = 2 if boot_part_value == "1" else 3 + else: + is_new_version = False + except (IOError, OSError): + is_new_version = False + + if not is_new_version: + root_part_path = "/sys/devices/platform/lpgpr/root_part" + try: + if file_exists(root_part_path): + root_part_value = read_file(root_part_path).strip() + next_boot_part = 2 if root_part_value == "a" else 3 + except (IOError, OSError) as e: + self.logger.debug(f"Failed to read next boot partition: {e}") + + return current_part, inactive_part, next_boot_part + + def get_device_status(self) -> tuple[str | None, str, str, str, str]: + """Gets the status of the device + + Returns: + tuple: Beta status, old_update_engine, current version, version_id, backup version (in that order) + """ + old_update_engine = True + version_id = "" + beta_contents = "" + + if self.client: + self.logger.debug("Connecting to FTP") + ftp = self.client.open_sftp() + self.logger.debug("Connected") + + xochitl_version, old_update_engine = self._read_version_from_path(ftp) + + def exists(path: str) -> bool: + try: + _ = ftp.stat(path) + return True + except (FileNotFoundError, IOError): + return False + + if exists("/etc/version"): + with ftp.file("/etc/version") as file: + version_id = file.read().decode("utf-8").strip("\n") + + if exists("/home/root/.config/remarkable/xochitl.conf"): + with ftp.file("/home/root/.config/remarkable/xochitl.conf") as file: + beta_contents = file.read().decode("utf-8") + + else: + xochitl_version, old_update_engine = self._read_version_from_path() + + if os.path.exists("/etc/version"): + with open("/etc/version", encoding="utf-8") as file: + version_id = file.read().rstrip() + + if os.path.exists("/home/root/.config/remarkable/xochitl.conf"): + with open( + "/home/root/.config/remarkable/xochitl.conf", encoding="utf-8" + ) as file: + beta_contents = file.read().rstrip() + + beta_possible = re.search("(?<=GROUP=).*", beta_contents) + beta = "Release" + + if beta_possible is not None: + beta = re.search("(?<=GROUP=).*", beta_contents).group() + + backup_version = self._get_backup_partition_version() + + return beta, old_update_engine, xochitl_version, version_id, backup_version + + def set_server_config(self, contents: str, server_host_name: str) -> str: + """Converts the contents given to point to the given server IP and port + + Args: + contents (str): Contents of the update.conf file + server_host_name (str): Hostname of the server + + Returns: + str: Converted contents + """ + data_attributes = contents.split("\n") + line = 0 + + self.logger.debug(f"Contents are:\n{contents}") + + for i in range(0, len(data_attributes)): + if data_attributes[i].startswith("[General]"): + self.logger.debug("Found [General] line") + line = i + 1 + if not data_attributes[i].startswith("SERVER="): + continue + + data_attributes[i] = f"#{data_attributes[i]}" + self.logger.debug(f"Using {data_attributes[i]}") + + data_attributes.insert(line, f"SERVER={server_host_name}") + converted = "\n".join(data_attributes) + + self.logger.debug(f"Converted contents are:\n{converted}") + + return converted + + def edit_update_conf(self, server_ip: str, server_port: str) -> bool: + """Edits the update.conf file to point to the given server IP and port + + Args: + server_ip (str): IP of update server + server_port (str): Port of update service + + Returns: + bool: True if successful, False otherwise + """ + server_host_name = f"http://{server_ip}:{server_port}" + self.logger.debug(f"Hostname is: {server_host_name}") + try: + if not self.client: + self.logger.debug("Detected running on local device") + with open( + "/usr/share/remarkable/update.conf", encoding="utf-8" + ) as file: + modified_conf_version = self.set_server_config( + file.read(), server_host_name + ) + + with open("/usr/share/remarkable/update.conf", "w") as file: + file.write(modified_conf_version) + + return True + + self.logger.debug("Connecting to FTP") + ftp = self.client.open_sftp() # or ssh + self.logger.debug("Connected") + + with ftp.file("/usr/share/remarkable/update.conf") as update_conf_file: + modified_conf_version = self.set_server_config( + update_conf_file.read().decode("utf-8"), server_host_name + ) + + with ftp.file("/usr/share/remarkable/update.conf", "w") as update_conf_file: + update_conf_file.write(modified_conf_version) + + return True + except Exception as error: + self.logger.error(f"Error while editing update.conf: {error}") + return False + + def restore_previous_version(self) -> None: + """Restores the previous version of the device""" + + RESTORE_CODE = """/sbin/fw_setenv "upgrade_available" "1" +/sbin/fw_setenv "bootcount" "0" + +OLDPART=$(/sbin/fw_printenv -n active_partition) +if [ $OLDPART == "2" ]; then + NEWPART="3" +else + NEWPART="2" +fi +echo "new: ${NEWPART}" +echo "fallback: ${OLDPART}" + +/sbin/fw_setenv "fallback_partition" "${OLDPART}" +/sbin/fw_setenv "active_partition" "${NEWPART}\"""" + + if self.hardware in ( + HardwareType.RMPP, + HardwareType.RMPPM, + HardwareType.RMPPURE, + ): + _, _, current_version, _, backup_version = self.get_device_status() + current_part, inactive_part, _ = self._get_paper_pro_partition_info( + current_version + ) + + new_part_label = "a" if inactive_part == 2 else "b" + + parts = current_version.split(".") + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): + raise SystemError( + f"Cannot restore: unexpected current version format '{current_version}'" + ) + + current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + + parts = backup_version.split(".") + if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit(): + raise SystemError( + f"Cannot restore: unexpected backup version format '{backup_version}'" + ) + + target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22] + + code = [ + "#!/bin/bash", + f"echo 'Switching from partition {current_part} to partition {inactive_part}'", + f"echo 'Current version: {current_version}'", + f"echo 'Target version: {backup_version}'", + ] + + if not current_is_new: + code.extend( + [ + f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part", + "echo 'Set next boot via sysfs (legacy method)'", + ] + ) + + if target_is_new or current_is_new: + code.extend( + [ + f"mmc bootpart enable {inactive_part - 1} 0 /dev/mmcblk0boot{inactive_part - 2}", + "echo 'Set next boot via mmc bootpart (new method)'", + ] + ) + + code.extend( + [ + f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true", + "echo 'Partition switch complete'", + ] + ) + + RESTORE_CODE = "\n".join(code) + + if self.client: + self.logger.debug("Connecting to FTP") + ftp = self.client.open_sftp() + self.logger.debug("Connected") + + with ftp.file("/tmp/restore.sh", "w") as file: + file.write(RESTORE_CODE) + + self.logger.debug("Setting permissions and running restore.sh") + + self.client.exec_command("chmod +x /tmp/restore.sh") + self.client.exec_command("bash /tmp/restore.sh") + else: + with open("/tmp/restore.sh", "w") as file: + file.write(RESTORE_CODE) + + self.logger.debug("Setting permissions and running restore.sh") + + os.system("chmod +x /tmp/restore.sh") + os.system("/tmp/restore.sh") + + self.logger.debug("Restore script ran") + + def reboot_device(self) -> None: + REBOOT_CODE = """ +if systemctl is-active --quiet tarnish.service; then + rot system call reboot +else + systemctl reboot +fi +""" + if self.client: + self.logger.debug("Connecting to FTP") + ftp = self.client.open_sftp() + self.logger.debug("Connected") + with ftp.file("/tmp/reboot.sh", "w") as file: + file.write(REBOOT_CODE) + + self.logger.debug("Running reboot.sh") + self.client.exec_command("sh /tmp/reboot.sh") + + else: + with open("/tmp/reboot.sh", "w") as file: + file.write(REBOOT_CODE) + + self.logger.debug("Running reboot.sh") + os.system("sh /tmp/reboot.sh") + + self.logger.debug("Device rebooted") + + def install_sw_update( + self, version_file: str, bootloader_files: dict[str, bytes] | None = None + ) -> None: + """ + Installs new version from version file path, utilising swupdate + + Args: + version_file (str): Path to img file + bootloader_files (dict[str, bytes] | None): Bootloader files for Paper Pro downgrade + + Raises: + SystemExit: If there was an error installing the update + + """ + if self.client: + ftp_client = self.client.open_sftp() + + print(f"Uploading {version_file} image") + + out_location = f"/tmp/{os.path.basename(version_file)}.swu" + ftp_client.put( + version_file, out_location, callback=self.output_put_progress + ) + + print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)") + + command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'" + self.logger.debug(command) + _stdin, stdout, _stderr = self.client.exec_command(command) + + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + print("".join(_stderr.readlines())) + raise SystemError("Update failed!") + + if bootloader_files: + print("\nApplying bootloader update...") + self._update_paper_pro_bootloader( + bootloader_files["update-bootloader.sh"], + bootloader_files["imx-boot"], + ) + print("✓ Bootloader update completed") + + print("Done! Now rebooting the device and disabling update service") + + #### Now disable automatic updates + + self.client.exec_command("sleep 1 && reboot") # Should be enough + self.client.close() + + time.sleep( + 2 + ) # Somehow the code runs faster than the time it takes for the device to reboot + + print("Trying to connect to device") + + while not self.check_is_address_reachable(self.address): + time.sleep(1) + + self.client = self.connect_to_device( + remote_address=self.address, authentication=self.authentication + ) + + self.client.exec_command("systemctl stop swupdate memfaultd") + + print( + "Update complete and update service disabled, restart device to enable it" + ) + + else: + print("Running swupdate") + command = [ + "bash", + "-c", + f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}", + ] + self.logger.debug(command) + + try: + output = subprocess.check_output( + command, + stderr=subprocess.STDOUT, + text=True, + env={"PATH": "/bin:/usr/bin:/sbin:/usr/sbin"}, + ) + self.logger.debug(f"Stdout of swupdate: {output}") + except subprocess.CalledProcessError as e: + print(e.output) + raise SystemError("Update failed") + + print("Update complete and device rebooting") + os.system("reboot") + + def _update_paper_pro_bootloader( + self, bootloader_script: bytes, imx_boot: bytes + ) -> None: + """ + Update bootloader on Paper Pro device for 3.22+ -> <3.22 downgrades. + + This method uploads the bootloader script and image to the device, + then runs the update script twice (preinst and postinst) to update + both boot partitions. + + Args: + bootloader_script: Contents of update-bootloader.sh + imx_boot: Contents of imx-boot image file + + Raises: + SystemError: If bootloader update fails + """ + self.logger.info("Starting bootloader update for Paper Pro") + + if not self.client: + raise SystemError("No SSH connection to device") + + ftp_client = None + try: + ftp_client = self.client.open_sftp() + except Exception: + raise SystemError("Failed to open SFTP connection for bootloader update") + + script_path = "/tmp/update-bootloader.sh" + boot_image_path = "/tmp/imx-boot" + + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, suffix=".sh" + ) as tmp_script: + tmp_script.write(bootloader_script) + tmp_script_path = tmp_script.name + + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, suffix=".img" + ) as tmp_boot: + tmp_boot.write(imx_boot) + tmp_boot_path = tmp_boot.name + + try: + self.logger.debug("Uploading bootloader script to device") + ftp_client.put(tmp_script_path, script_path) + + self.logger.debug("Uploading imx-boot image to device") + ftp_client.put(tmp_boot_path, boot_image_path) + + self.logger.debug("Making bootloader script executable") + _stdin, stdout, _stderr = self.client.exec_command( + f"chmod +x {script_path}" + ) + stdout.channel.recv_exit_status() + + self.logger.info("Running bootloader update script (preinst)") + _stdin, stdout, stderr = self.client.exec_command( + f"{script_path} preinst {boot_image_path}" + ) + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + error_msg = "".join(stderr.readlines()) + raise SystemError(f"Bootloader preinst failed: {error_msg}") + + self.logger.info("Running bootloader update script (postinst)") + _stdin, stdout, stderr = self.client.exec_command( + f"{script_path} postinst {boot_image_path}" + ) + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0: + error_msg = "".join(stderr.readlines()) + raise SystemError(f"Bootloader postinst failed: {error_msg}") + + self.logger.info("Bootloader update completed successfully") + + finally: + self.logger.debug("Cleaning up temporary bootloader files on device") + self.client.exec_command(f"rm -f {script_path} {boot_image_path}") + + self.logger.debug("Cleaning up local temporary files") + os.unlink(tmp_script_path) + os.unlink(tmp_boot_path) + if ftp_client: + ftp_client.close() + + def install_ohma_update(self, version_available: dict) -> None: + """Installs version from update folder on the device + + Args: + version_available (dict): Version available for installation from `get_available_version` + + Raises: + SystemExit: If there was an error installing the update + """ + + server_host = self.get_host_address() + + self.logger.debug("Editing config file") + + if ( + self.edit_update_conf(server_ip=server_host, server_port=8085) is False + ): # We want a port that probably isn't being used + self.logger.error("Error while editing update.conf") + + return + + thread = threading.Thread( + target=startUpdate, args=(version_available, server_host, 8085), daemon=True + ) + thread.start() + + self.logger.debug("Thread started") + + if self.client: + print("Checking if device can connect to this machine") + + _stdin, stdout, _stderr = self.client.exec_command( + f"sleep 2 && echo | nc {server_host} 8085" + ) + check = stdout.channel.recv_exit_status() + self.logger.debug(f"Stdout of nc checking: {stdout.readlines()}") + + if check != 0: + raise SystemError( + "Device cannot connect to this machine! Is the firewall blocking connections?" + ) + + print("Starting update service on device") + + self.client.exec_command("systemctl start update-engine") + + _stdin, stdout, _stderr = self.client.exec_command( + "/usr/bin/update_engine_client -update" + ) + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + print("".join(_stderr.readlines())) + raise SystemError("There was an error updating :(") + + self.logger.debug( + f"Stdout of update checking service is {''.join(_stderr.readlines())}" + ) + + #### Now disable automatic updates + + print("Done! Now rebooting the device and disabling update service") + + self.client.exec_command("sleep 1 && reboot") # Should be enough + self.client.close() + + time.sleep( + 2 + ) # Somehow the code runs faster than the time it takes for the device to reboot + + print("Trying to connect to device") + + while not self.check_is_address_reachable(self.address): + time.sleep(1) + + self.client = self.connect_to_device( + remote_address=self.address, authentication=self.authentication + ) + self.client.exec_command("systemctl stop update-engine") + + print( + "Update complete and update service disabled. Restart device to enable it" + ) + + else: + print("Enabling update service") + + subprocess.run( + ["/bin/systemctl", "start", "update-engine"], + text=True, + check=True, + env={"PATH": "/bin:/usr/bin:/sbin"}, + ) + + with subprocess.Popen( + ["/usr/bin/update_engine_client", "-update"], + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env={"PATH": "/bin:/usr/bin:/sbin"}, + ) as process: + if process.wait() != 0: + print("".join(process.stderr.readlines())) + + raise SystemError("There was an error updating :(") + + self.logger.debug( + f"Stdout of update checking service is {''.join(process.stderr.readlines())}" + ) + + print("Update complete and device rebooting") + os.system("reboot") + + @staticmethod + def output_put_progress(transferred: int, toBeTransferred: int) -> None: + """Used for displaying progress for paramiko ftp.put function""" + + print( + f"Transferring progress{int((transferred / toBeTransferred) * 100)}%", + end="\r", + ) diff --git a/codexctl/server.py b/codexctl/server.py index 766d8f7..46072a4 100644 --- a/codexctl/server.py +++ b/codexctl/server.py @@ -1,172 +1,172 @@ -from http.server import HTTPServer, SimpleHTTPRequestHandler -import xml.etree.ElementTree as ET -import os -import hashlib -import binascii - -response_ok = """ - - - - - - - -""" - -response_template = """ - - - - - - - - - - - - - - - - - - - - -""" - - -def getupdateinfo(platform, version, update_name): - full_path = os.path.join("updates", update_name) - - update_size = str(os.path.getsize(full_path)) - - BUF_SIZE = 8192 - - sha1 = hashlib.sha1() - sha256 = hashlib.sha256() - with open(full_path, "rb") as f: - while True: - data = f.read(BUF_SIZE) - if not data: - break - sha1.update(data) - sha256.update(data) - update_sha1 = binascii.b2a_base64(sha1.digest(), newline=False).decode() - update_sha256 = binascii.b2a_base64(sha256.digest(), newline=False).decode() - return (update_sha1, update_sha256, update_size) - - -def get_available_version(version): - available_versions = scanUpdates() - - for device, ids in available_versions.items(): - if version in ids: - available_version = {device: ids} - - return available_version - - -def scanUpdates(): - files = os.listdir("updates") - versions = {} - - for f in files: - p = f.split("_") - if len(p) != 2: - continue - t = p[1].split(".") - if len(t) != 2: - continue - - z = t[0].split("-") - - version = p[0] - # print(version) - product = z[0] - - if product not in versions or versions[product][0] < version: - versions[product] = (version, f) - - return versions - - -class MySimpleHTTPRequestHandler(SimpleHTTPRequestHandler): - def do_POST(self): - length = int(self.headers.get("Content-Length")) - body = self.rfile.read(length).decode("utf-8") - # print(body) - print("Updating...") - xml = ET.fromstring(body) - updatecheck_node = xml.find("app/updatecheck") - - # check for update - if updatecheck_node is not None: - version = xml.attrib["version"] - platform = xml.find("os").attrib["platform"] - print("requested: ", version) - print("platform: ", platform) - - version, update_name = available_versions[platform] - - update_sha1, update_sha256, update_size = getupdateinfo( - platform, version, update_name - ) - params = { - "version": version, - "update_name": f"updates/{update_name}", - "update_sha1": update_sha1, - "update_sha256": update_sha256, - "update_size": update_size, - "codebase_url": host_url, - } - - response = response_template.format(**params) - # print("Response:") - # print(response) - self.send_response(200) - self.end_headers() - self.wfile.write(response.encode()) - return - - event_node = xml.find("app/event") - event_type = int(event_node.attrib["eventtype"]) - event_result = int(event_node.attrib["eventresult"]) - - # post install status - if event_result != 0: - print("Update downloaded, please wait for device to install...") - if "errorcode" in event_node.attrib: - print("With errorcode:", event_node.attrib["errorcode"]) - return - - # update done - if event_type == 14: - print("OK Response:") - print(response_ok) - self.send_response(200) - self.end_headers() - self.wfile.write(response_ok.encode()) - return - - -def startUpdate(versionsGiven, host, port=8080): - global available_versions - global host_url # I am aware globals are generally bad practice, but this is a quick and dirty solution - - host_url = f"http://{host}:{port}/" - available_versions = versionsGiven - - if not available_versions: - raise FileNotFoundError("Could not find any update files") - - handler = MySimpleHTTPRequestHandler - print(f"Starting fake updater at {host}:{port}") - try: - httpd = HTTPServer((host, port), handler) - except OSError: - print("Error: Could not start fake updater. Is the port already in use?") - return - httpd.serve_forever() +from http.server import HTTPServer, SimpleHTTPRequestHandler +import xml.etree.ElementTree as ET +import os +import hashlib +import binascii + +response_ok = """ + + + + + + + +""" + +response_template = """ + + + + + + + + + + + + + + + + + + + + +""" + + +def getupdateinfo(platform, version, update_name): + full_path = os.path.join("updates", update_name) + + update_size = str(os.path.getsize(full_path)) + + BUF_SIZE = 8192 + + sha1 = hashlib.sha1() + sha256 = hashlib.sha256() + with open(full_path, "rb") as f: + while True: + data = f.read(BUF_SIZE) + if not data: + break + sha1.update(data) + sha256.update(data) + update_sha1 = binascii.b2a_base64(sha1.digest(), newline=False).decode() + update_sha256 = binascii.b2a_base64(sha256.digest(), newline=False).decode() + return (update_sha1, update_sha256, update_size) + + +def get_available_version(version): + available_versions = scanUpdates() + + for device, ids in available_versions.items(): + if version in ids: + available_version = {device: ids} + + return available_version + + +def scanUpdates(): + files = os.listdir("updates") + versions = {} + + for f in files: + p = f.split("_") + if len(p) != 2: + continue + t = p[1].split(".") + if len(t) != 2: + continue + + z = t[0].split("-") + + version = p[0] + # print(version) + product = z[0] + + if product not in versions or versions[product][0] < version: + versions[product] = (version, f) + + return versions + + +class MySimpleHTTPRequestHandler(SimpleHTTPRequestHandler): + def do_POST(self): + length = int(self.headers.get("Content-Length")) + body = self.rfile.read(length).decode("utf-8") + # print(body) + print("Updating...") + xml = ET.fromstring(body) + updatecheck_node = xml.find("app/updatecheck") + + # check for update + if updatecheck_node is not None: + version = xml.attrib["version"] + platform = xml.find("os").attrib["platform"] + print("requested: ", version) + print("platform: ", platform) + + version, update_name = available_versions[platform] + + update_sha1, update_sha256, update_size = getupdateinfo( + platform, version, update_name + ) + params = { + "version": version, + "update_name": f"updates/{update_name}", + "update_sha1": update_sha1, + "update_sha256": update_sha256, + "update_size": update_size, + "codebase_url": host_url, + } + + response = response_template.format(**params) + # print("Response:") + # print(response) + self.send_response(200) + self.end_headers() + self.wfile.write(response.encode()) + return + + event_node = xml.find("app/event") + event_type = int(event_node.attrib["eventtype"]) + event_result = int(event_node.attrib["eventresult"]) + + # post install status + if event_result != 0: + print("Update downloaded, please wait for device to install...") + if "errorcode" in event_node.attrib: + print("With errorcode:", event_node.attrib["errorcode"]) + return + + # update done + if event_type == 14: + print("OK Response:") + print(response_ok) + self.send_response(200) + self.end_headers() + self.wfile.write(response_ok.encode()) + return + + +def startUpdate(versionsGiven, host, port=8080): + global available_versions + global host_url # I am aware globals are generally bad practice, but this is a quick and dirty solution + + host_url = f"http://{host}:{port}/" + available_versions = versionsGiven + + if not available_versions: + raise FileNotFoundError("Could not find any update files") + + handler = MySimpleHTTPRequestHandler + print(f"Starting fake updater at {host}:{port}") + try: + httpd = HTTPServer((host, port), handler) + except OSError: + print("Error: Could not start fake updater. Is the port already in use?") + return + httpd.serve_forever() diff --git a/codexctl/sync.py b/codexctl/sync.py index 5d80d37..95067c5 100644 --- a/codexctl/sync.py +++ b/codexctl/sync.py @@ -1,243 +1,246 @@ -import glob -import logging -import os -import time - -import requests - - -class RmWebInterfaceAPI: # TODO: Add docstrings - def __init__(self, BASE="http://10.11.99.1/", logger=None): - self.logger = logger - - if self.logger is None: - self.logger = logging - - self.BASE = BASE - self.ID_ATTRIBUTE = "ID" - self.NAME_ATTRIBUTE = "VissibleName" - self.MTIME_ATTRIBUTE = "ModifiedClient" - - self.logger.debug(f"Base is: {BASE}") - - def __POST(self, endpoint, data={}, fileUpload=False): - try: - logging.debug( - f"Sending POST request to {self.BASE + endpoint} with data {data}" - ) - - if fileUpload: - result = requests.post(self.BASE + endpoint, files=data) - else: - result = requests.post(self.BASE + endpoint, data=data) - - if result.status_code == 408: - self.logger.error("Request timed out!") - - logging.debug(f"Result headers: {result.headers}") - if "application/json" in result.headers["Content-Type"]: - return result.json() - return result.content - - except Exception: - return None - - def __get_documents_recursive( - self, folderId="", currentLocation="", currentDocuments=[] - ): - data = self.__POST(f"documents/{folderId}") - - for item in data: - self.logger.debug(f"Checking item: {item}") - - if "fileType" in item: - item["location"] = currentLocation - currentDocuments.append(item) - else: - self.logger.debug( - f"Getting documents over {item[self.ID_ATTRIBUTE]}, current location is {currentLocation}/{item[self.NAME_ATTRIBUTE]}" - ) - self.__get_documents_recursive( - item[self.ID_ATTRIBUTE], - f"{currentLocation}/{item[self.NAME_ATTRIBUTE]}", - currentDocuments, - ) - - return currentDocuments - - def __get_folder_id(self, folderName, _from=""): - results = self.__POST(f"documents/{_from}") - - if results is None: - return None - - results.reverse() # We only want folders - - for data in results: - self.logger.debug(f"Folder: {data}") - - if "fileType" in data: - return None - - if data[self.NAME_ATTRIBUTE].strip() == folderName.strip(): - return data[self.ID_ATTRIBUTE] - - self.logger.debug( - f"Getting folders over {folderName}, {data[self.ID_ATTRIBUTE]}" - ) - - recursiveResults = self.__get_folder_id(folderName, data[self.ID_ATTRIBUTE]) - if recursiveResults is None: - continue - else: - return recursiveResults - - def __get_docs(self, folderName="", recursive=True): - folderId = "" - - if folderName: - folderId = self.__get_folder_id(folderName) - - if folderId is None: - return {} - - if recursive: - self.logger.debug(f"Calling recursive function on {folderName}") - return self.__get_documents_recursive( - folderId=folderId, currentLocation=folderName - ) - - data = self.__POST(f"documents/{folderId}") - - for item in data: - item["location"] = "" - - return [item for item in data if "fileType" in item] - - def download(self, document, location="", overwrite=False, incremental=False): - filename = document[self.NAME_ATTRIBUTE] - if "/" in filename: - filename = filename.replace("/", "_") - - self.logger.debug(f"Downloading {filename}, location {location}") - - if not os.path.exists(location): - self.logger.debug("Download folder does not exist, creating it") - os.makedirs(location) - - try: - fileLocation = f"{location}/{filename}.pdf" - isFile = os.path.isfile(fileLocation) - - if isFile and not overwrite: - self.logger.debug("Not overwriting file") - return True - - if isFile and incremental and not self.__is_newer(document, fileLocation): - self.logger.debug("Local file already exists and is newer, skipping") - return True - - binaryData = self.__POST( - f"download/{document[self.ID_ATTRIBUTE]}/placeholder" - ) - - if isinstance(binaryData, dict): - print(f"Error trying to download {filename}: {binaryData}") - return False - - with open(fileLocation, "wb") as outFile: - outFile.write(binaryData) - - return True - - except Exception as error: - print(f"Error trying to download {filename}: {error}") - return False - - def __is_newer(self, document, fileLocation): - remote_ts = document[self.MTIME_ATTRIBUTE] - - local_mtime = os.path.getmtime(fileLocation) - local_ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(local_mtime)) - - return remote_ts > local_ts - - def upload(self, input_paths, remoteFolder): - folderId = "" - if remoteFolder: - folderId = self.__get_folder_id(remoteFolder) - - if folderId is None: - raise SystemError(f"Error: Folder {remoteFolder} does not exist!") - - self.__POST(f"documents/{folderId}") # Setting up for upload... - - errors, documents = [], [] - - for document in input_paths: # This needs improvement... - if os.path.isdir(document): - for file in glob.glob(f"{document}/*"): - if not file.endswith(".pdf"): - self.logger.error(f"Error: {document} is not a pdf!") - else: - documents.append(file) - elif os.path.isfile(document): - if not document.endswith(".pdf"): - errors.append(document) - self.logger.error(f"Error: {document} is not a pdf!") - else: - documents.append(document) - else: - errors.append(document) - self.logger.error(f"Error: {document} is not a file or directory!") - - for document in documents: - self.logger.debug( - f"Uploading {document} to {remoteFolder if remoteFolder else 'root'}" - ) - with open(document, "rb") as inFile: - response = self.__POST("upload", data={"file": inFile}, fileUpload=True) - - if response is None: - self.logger.error( - f"Error: Unknown error while uploading {document}!" - ) - errors.append(document) - elif response == {"status": "Upload successful"}: - self.logger.debug(f"Uploaded {document} successfully!") - - if len(errors) > 0: - print("The following files failed to upload: " + ",".join(errors)) - - print(f"Done! {len(documents)-len(errors)} files were uploaded.") - - def sync( - self, - localFolder, - remoteFolder="", - overwrite=False, - incremental=False, - recursive=True, - ): - count = 0 - - if not os.path.exists(localFolder): - self.logger.debug("Local folder does not exist, creating it") - os.mkdir(localFolder) - - documents = self.__get_docs(remoteFolder, recursive) - - if documents == {}: - print("No documents were found!") - - else: - for doc in documents: - self.logger.debug(f"Processing {doc}") - count += 1 - self.download( - document=doc, - location=f"{localFolder}/{doc['location']}", - overwrite=overwrite, - incremental=incremental, - ) - print(f"Done! {count} files were exported.") +import glob +import logging +import os +import time + +import requests + + +class RmWebInterfaceAPI: # TODO: Add docstrings + def __init__(self, BASE="http://10.11.99.1/", logger=None): + self.logger = logger + + if self.logger is None: + self.logger = logging + + self.BASE = BASE + self.ID_ATTRIBUTE = "ID" + self.NAME_ATTRIBUTE = "VissibleName" + self.MTIME_ATTRIBUTE = "ModifiedClient" + + self.logger.debug(f"Base is: {BASE}") + + def __POST(self, endpoint, data={}, fileUpload=False): + try: + logging.debug( + f"Sending POST request to {self.BASE + endpoint} with data {data}" + ) + + if fileUpload: + result = requests.post(self.BASE + endpoint, files=data) + else: + result = requests.post(self.BASE + endpoint, data=data) + + if result.status_code == 408: + self.logger.error("Request timed out!") + + logging.debug(f"Result headers: {result.headers}") + if "application/json" in result.headers["Content-Type"]: + return result.json() + return result.content + + except Exception: + return None + + def __get_documents_recursive( + self, folderId="", currentLocation="", currentDocuments=None + ): + if currentDocuments is None: + currentDocuments = [] + + data = self.__POST(f"documents/{folderId}") + + for item in data: + self.logger.debug(f"Checking item: {item}") + + if "fileType" in item: + item["location"] = currentLocation + currentDocuments.append(item) + else: + self.logger.debug( + f"Getting documents over {item[self.ID_ATTRIBUTE]}, current location is {currentLocation}/{item[self.NAME_ATTRIBUTE]}" + ) + self.__get_documents_recursive( + item[self.ID_ATTRIBUTE], + f"{currentLocation}/{item[self.NAME_ATTRIBUTE]}", + currentDocuments, + ) + + return currentDocuments + + def __get_folder_id(self, folderName, _from=""): + results = self.__POST(f"documents/{_from}") + + if results is None: + return None + + results.reverse() # We only want folders + + for data in results: + self.logger.debug(f"Folder: {data}") + + if "fileType" in data: + return None + + if data[self.NAME_ATTRIBUTE].strip() == folderName.strip(): + return data[self.ID_ATTRIBUTE] + + self.logger.debug( + f"Getting folders over {folderName}, {data[self.ID_ATTRIBUTE]}" + ) + + recursiveResults = self.__get_folder_id(folderName, data[self.ID_ATTRIBUTE]) + if recursiveResults is None: + continue + else: + return recursiveResults + + def __get_docs(self, folderName="", recursive=True): + folderId = "" + + if folderName: + folderId = self.__get_folder_id(folderName) + + if folderId is None: + return {} + + if recursive: + self.logger.debug(f"Calling recursive function on {folderName}") + return self.__get_documents_recursive( + folderId=folderId, currentLocation=folderName + ) + + data = self.__POST(f"documents/{folderId}") + + for item in data: + item["location"] = "" + + return [item for item in data if "fileType" in item] + + def download(self, document, location="", overwrite=False, incremental=False): + filename = document[self.NAME_ATTRIBUTE] + if "/" in filename: + filename = filename.replace("/", "_") + + self.logger.debug(f"Downloading {filename}, location {location}") + + if not os.path.exists(location): + self.logger.debug("Download folder does not exist, creating it") + os.makedirs(location) + + try: + fileLocation = f"{location}/{filename}.pdf" + isFile = os.path.isfile(fileLocation) + + if isFile and not overwrite: + self.logger.debug("Not overwriting file") + return True + + if isFile and incremental and not self.__is_newer(document, fileLocation): + self.logger.debug("Local file already exists and is newer, skipping") + return True + + binaryData = self.__POST( + f"download/{document[self.ID_ATTRIBUTE]}/placeholder" + ) + + if isinstance(binaryData, dict): + print(f"Error trying to download {filename}: {binaryData}") + return False + + with open(fileLocation, "wb") as outFile: + outFile.write(binaryData) + + return True + + except Exception as error: + print(f"Error trying to download {filename}: {error}") + return False + + def __is_newer(self, document, fileLocation): + remote_ts = document[self.MTIME_ATTRIBUTE] + + local_mtime = os.path.getmtime(fileLocation) + local_ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(local_mtime)) + + return remote_ts > local_ts + + def upload(self, input_paths, remoteFolder): + folderId = "" + if remoteFolder: + folderId = self.__get_folder_id(remoteFolder) + + if folderId is None: + raise SystemError(f"Error: Folder {remoteFolder} does not exist!") + + self.__POST(f"documents/{folderId}") # Setting up for upload... + + errors, documents = [], [] + + for document in input_paths: # This needs improvement... + if os.path.isdir(document): + for file in glob.glob(f"{document}/*"): + if not file.endswith(".pdf"): + self.logger.error(f"Error: {document} is not a pdf!") + else: + documents.append(file) + elif os.path.isfile(document): + if not document.endswith(".pdf"): + errors.append(document) + self.logger.error(f"Error: {document} is not a pdf!") + else: + documents.append(document) + else: + errors.append(document) + self.logger.error(f"Error: {document} is not a file or directory!") + + for document in documents: + self.logger.debug( + f"Uploading {document} to {remoteFolder if remoteFolder else 'root'}" + ) + with open(document, "rb") as inFile: + response = self.__POST("upload", data={"file": inFile}, fileUpload=True) + + if response is None: + self.logger.error( + f"Error: Unknown error while uploading {document}!" + ) + errors.append(document) + elif response == {"status": "Upload successful"}: + self.logger.debug(f"Uploaded {document} successfully!") + + if len(errors) > 0: + print("The following files failed to upload: " + ",".join(errors)) + + print(f"Done! {len(documents) - len(errors)} files were uploaded.") + + def sync( + self, + localFolder, + remoteFolder="", + overwrite=False, + incremental=False, + recursive=True, + ): + count = 0 + + if not os.path.exists(localFolder): + self.logger.debug("Local folder does not exist, creating it") + os.mkdir(localFolder) + + documents = self.__get_docs(remoteFolder, recursive) + + if documents == {}: + print("No documents were found!") + + else: + for doc in documents: + self.logger.debug(f"Processing {doc}") + count += 1 + self.download( + document=doc, + location=f"{localFolder}/{doc['location']}", + overwrite=overwrite, + incremental=incremental, + ) + print(f"Done! {count} files were exported.") diff --git a/codexctl/updates.py b/codexctl/updates.py index 80debc7..21500c3 100644 --- a/codexctl/updates.py +++ b/codexctl/updates.py @@ -1,436 +1,459 @@ -import os -import requests -import uuid -import sys -import json -import hashlib -import logging - -from pathlib import Path -from datetime import datetime - -import xml.etree.ElementTree as ET - -from .device import HardwareType - - -class UpdateManager: - def __init__(self, logger=None) -> None: - """Manager for downloading update versions - - Args: - logger (logger, optional): Logger object for logging. Defaults to None. - """ - - self.logger = logger - - if self.logger is None: - self.logger = logging - - ( - self.remarkablepp_versions, - self.remarkableppm_versions, - self.remarkable2_versions, - self.remarkable1_versions, - self.external_provider_urls, - ) = self.get_remarkable_versions() - - def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: - """Gets the avaliable versions for the device, by checking the local version-ids.json file and then updating it if necessary - - Returns: - tuple: A tuple containing the version ids for the remarkablepp, remarkableppm, remarkable2, remarkable1, and external provider urls (in that order) - """ - - if os.path.exists("data/version-ids.json"): - file_location = "data/version-ids.json" - - self.logger.debug("Found version-ids at data/version-ids.json") - - else: - if os.name == "nt": # Windows - folder_location = os.getenv("APPDATA") + "/codexctl" - elif os.name in ("posix", "darwin"): # Linux or MacOS - folder_location = os.path.expanduser("~/.config/codexctl") - else: - raise SystemError("Unsupported OS") - - self.logger.debug(f"Version config folder location is {folder_location}") - if not os.path.exists(folder_location): - os.makedirs(folder_location, exist_ok=True) - - file_location = folder_location + "/version-ids.json" - - if not os.path.exists(file_location): - self.update_version_ids(file_location) - - try: - with open(file_location) as f: - contents = json.load(f) - except ValueError: - raise SystemError( - f"Version-ids.json @ {file_location} is corrupted! Please delete it and try again. Also, PLEASE open an issue on the repo showing the contents of the file." - ) - - if ( - int(datetime.now().timestamp()) - contents["last-updated"] - > 5256000 # 2 months - ): - self.update_version_ids(file_location) - - with open(file_location) as f: - contents = json.load(f) - - self.logger.debug(f"Version ids contents are {contents}") - - provider_urls = contents.get("external-provider-urls", contents.get("external-provider-url")) - if provider_urls is None: - raise SystemError( - f"version-ids.json at {file_location} is missing external provider URLs. " - "Please delete the file and try again, or open an issue on the repo." - ) - - if isinstance(provider_urls, str): - provider_urls = [provider_urls] - - return ( - contents.get("remarkablepp", {}), - contents.get("remarkableppm", {}), - contents.get("remarkable2", {}), - contents.get("remarkable1", {}), - provider_urls, - ) - - def update_version_ids(self, location: str) -> None: - """Updates the version-ids.json file - - Args: - location (str): Location to save the file - - Raises: - SystemExit: If the file cannot be updated - """ - with open(location, "w", newline="\n") as f: - try: - self.logger.debug("Downloading version-ids.json") - contents = requests.get( - "https://raw.githubusercontent.com/Jayy001/codexctl/main/data/version-ids.json" - ).json() - json.dump(contents, f, indent=4) - f.write("\n") - except requests.exceptions.Timeout: - raise SystemExit( - "Connection timed out while downloading version-ids.json! Do you have an internet connection?" - ) - except Exception as error: - raise SystemExit( - f"Unknown error while downloading version-ids.json! {error}" - ) - - def get_latest_version(self, hardware_type: HardwareType) -> str: - """Gets the latest version available for the device - - Args: - hardware_type (HardwareType enum): Type of the device - - Returns: - str: Latest version available for the device - """ - match hardware_type: - case HardwareType.RM1: - versions = self.remarkable1_versions - case HardwareType.RM2: - versions = self.remarkable2_versions - case HardwareType.RMPP: - versions = self.remarkablepp_versions - case HardwareType.RMPPM: - versions = self.remarkableppm_versions - - return self.__max_version(versions.keys()) - - def get_toltec_version(self, hardware_type: HardwareType) -> str: - """Gets the latest version available toltec for the device - - Args: - hardware_type (HardwareType enum): Type of the device - - Returns: - str: Latest version available for the device - """ - - try: - toltec_type = hardware_type.toltec_type - except ValueError as ex: - raise SystemExit(*ex.args) - - response = requests.get("https://toltec-dev.org/stable/Compatibility") - if response.status_code != 200: - raise SystemExit( - f"Error: Failed to get toltec compatibility table: {response.status_code}" - ) - - return self.__max_version( - [ - x.split("=")[1] - for x in response.text.splitlines() - if x.startswith(f"{toltec_type}=") - ] - ) - - def download_version( - self, hardware_type: HardwareType, update_version: str, download_folder: str|None = None - ) -> str | None: - """Downloads the specified version of the update - - Args: - hardware_type (HardwareType enum): Type of the device - update_version (str): Id of version to download. - download_folder (str, optional): Location of download folder. Defaults to download folder for OS. - - Returns: - str | None: Location of the file if the download was successful, None otherwise - """ - - if download_folder is None: - download_folder = str(Path( - os.environ["XDG_DOWNLOAD_DIR"] - if ( - "XDG_DOWNLOAD_DIR" in os.environ - and os.path.exists(os.environ["XDG_DOWNLOAD_DIR"]) - ) - else Path.home() / "Downloads" - )) - - if not os.path.exists(download_folder): - self.logger.error( - f"Download folder {download_folder} does not exist! Creating it now." - ) - os.makedirs(download_folder) - - BASE_URL = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device%20Beta/RM110" # Default URL for v2 versions - BASE_URL_V3 = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device/reMarkable" - - match hardware_type: - case HardwareType.RMPP: - version_lookup = self.remarkablepp_versions - - case HardwareType.RMPPM: - version_lookup = self.remarkableppm_versions - - case HardwareType.RM2: - version_lookup = self.remarkable2_versions - BASE_URL_V3 += "2" - - case HardwareType.RM1: - version_lookup = self.remarkable1_versions - - if update_version not in version_lookup: - self.logger.error( - f"Version {update_version} not found in version-ids.json! Please update your version-ids.json file." - ) - return - - version_id, version_checksum = version_lookup[update_version] - version = tuple([int(x) for x in update_version.split(".")]) - if version >= (3,): - BASE_URL = BASE_URL_V3 - - if version <= (3, 11, 2, 5): - file_name = f"{update_version}_{hardware_type.old_download_hw}-{version_id}.signed" - file_url = f"{BASE_URL}/{update_version}/{file_name}" - self.logger.debug(f"File URL is {file_url}, File name is {file_name}") - return self.__download_version_file( - file_url, file_name, download_folder, version_checksum - ) - - else: - file_name = f"remarkable-production-memfault-image-{update_version}-{hardware_type.new_download_hw}-public" - - for provider_url in self.external_provider_urls: - file_url = provider_url.replace("REPLACE_ID", version_id) - self.logger.debug(f"Trying to download from {file_url}") - - result = self.__download_version_file( - file_url, file_name, download_folder, version_checksum - ) - - if result is not None: - self.logger.debug(f"Successfully downloaded from {provider_url}") - return result - - self.logger.debug(f"Failed to download from {provider_url}, trying next source...") - - self.logger.error(f"Failed to download {file_name} from all sources") - return None - - def __generate_xml_data(self) -> str: - """Generates and returns XML data for the update request""" - params = { - "installsource": "scheduler", - "requestid": str(uuid.uuid4()), - "sessionid": str(uuid.uuid4()), - "machineid": "00".zfill(32), - "oem": "RM100-753-12345", - "appid": "98DA7DF2-4E3E-4744-9DE6-EC931886ABAB", - "bootid": str(uuid.uuid4()), - "current": "3.2.3.1595", - "group": "Prod", - "platform": "reMarkable2", - } - - return """ - - - - - -""".format(**params) - - def __parse_response(self, resp: str) -> tuple[str, str, str] | None: - """Parses the response from the update server and returns the file name, uri, and version if an update is available - - Args: - resp (str): Response from the server - - Returns: - tuple[str, str, str] | None: File name, uri, and version if an update is available, None otherwise - """ - xml_data = ET.fromstring(resp) - - if "noupdate" in resp or xml_data is None: - return None - - file_name = xml_data.find("app/updatecheck/manifest/packages/package").attrib[ - "name" - ] - file_uri = ( - f"{xml_data.find('app/updatecheck/urls/url').attrib['codebase']}{file_name}" - ) - file_version = xml_data.find("app/updatecheck/manifest").attrib["version"] - - self.logger.debug( - f"File version is {file_version}, file uri is {file_uri}, file name is {file_name}" - ) - return file_version, file_uri, file_name - - def __download_version_file( - self, uri: str, name: str, download_folder: str, checksum: str - ) -> str | None: - """Downloads the version file from the server and checks the checksum - - Args: - uri (str): Location to the file - name (str): Name of the file - download_folder (str): Location of download folder - checksum (str): Sha256 Checksum of the file - - Returns: - str | None: Location of the file if the checksum matches, None otherwise - """ - response = requests.get(uri, stream=True) - if response.status_code != 200: - self.logger.debug(f"Unable to download update file: {response.status_code}") - return None - - file_length = response.headers.get("content-length") - - self.logger.debug(f"Downloading {name} from {uri} to {download_folder}") - try: - file_length = int(file_length) - - if int(file_length) < 10000000: # 10MB, invalid version file - self.logger.error( - f"File {name} is too small to be a valid version file" - ) - return None - except TypeError: - self.logger.error( - f"Could not get content length for {name}. Do you have an internet connection?" - ) - return None - - self.logger.debug(f"{name} is {file_length} bytes") - - filename = f"{download_folder}/{name}" - with open(filename, "wb") as out_file: - dl = 0 - - for data in response.iter_content(chunk_size=4096): - dl += len(data) - out_file.write(data) - if sys.stdout.isatty(): - done = int(50 * dl / file_length) - sys.stdout.write("\r[%s%s]" % ("=" * done, " " * (50 - done))) - sys.stdout.flush() - - if sys.stdout.isatty(): - print(end="\r\n") - - self.logger.debug(f"Downloaded {name}") - - with open(filename, "rb") as f: - file_checksum = hashlib.sha256(f.read()).hexdigest() - - if file_checksum != checksum: - os.remove(filename) - self.logger.error( - f"File checksum mismatch! Expected {checksum}, got {file_checksum}" - ) - return None - - return filename - - @staticmethod - def __max_version(versions: list) -> str: - """Returns the highest avaliable version from a list with semantic versioning""" - return sorted(versions, key=lambda v: tuple(map(int, v.split("."))))[-1] - - @staticmethod - def uses_new_update_engine(version: str) -> bool: - """ - Checks if the version given is above 3.11 and so requires the newer update engine - - Args: - version (str): version to check against - - Returns: - bool: If it uses the new update engine or not - """ - return tuple([int(x) for x in version.split(".")]) >= (3, 11, 2, 5) - - @staticmethod - def is_bootloader_boundary_downgrade(current_version: str, target_version: str) -> bool: - """ - Checks if downgrade crosses the 3.22 bootloader boundary (3.22+ -> <3.22). - - Paper Pro devices require bootloader updates when downgrading from - version 3.22 or higher to any version below 3.22. - - Args: - current_version (str): Currently installed version - target_version (str): Target version to install - - Returns: - bool: True if crossing boundary downward (3.22+ -> <3.22) - - Raises: - ValueError: If either version is empty or has invalid format - """ - if not current_version or not current_version.strip(): - raise ValueError("current_version cannot be empty") - if not target_version or not target_version.strip(): - raise ValueError("target_version cannot be empty") - - try: - current_parts = [int(x) for x in current_version.split('.')] - target_parts = [int(x) for x in target_version.split('.')] - except ValueError as e: - raise ValueError(f"Invalid version format: {e}") from e - - if len(current_parts) < 2 or len(target_parts) < 2: - raise ValueError("Version must have at least 2 components (e.g., '3.22')") - - current_is_322_or_higher = current_parts >= [3, 22] - target_is_below_322 = target_parts < [3, 22] - - return current_is_322_or_higher and target_is_below_322 +import hashlib +import json +import logging +import os +import sys +import uuid +import xml.etree.ElementTree as ET +from datetime import datetime +from pathlib import Path + +import requests + +from .device import HardwareType + + +class UpdateManager: + def __init__(self, logger=None) -> None: + """Manager for downloading update versions + + Args: + logger (logger, optional): Logger object for logging. Defaults to None. + """ + + self.logger = logger + + if self.logger is None: + self.logger = logging + + ( + self.remarkableppure_versions, + self.remarkablepp_versions, + self.remarkableppm_versions, + self.remarkable2_versions, + self.remarkable1_versions, + self.external_provider_urls, + ) = self.get_remarkable_versions() + + def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]: + """Gets the avaliable versions for the device, by checking the local version-ids.json file and then updating it if necessary + + Returns: + tuple: A tuple containing the version ids for the remarkablepp, remarkableppm, remarkable2, remarkable1, and external provider urls (in that order) + """ + + if os.path.exists("data/version-ids.json"): + file_location = "data/version-ids.json" + + self.logger.debug("Found version-ids at data/version-ids.json") + + else: + if os.name == "nt": # Windows + folder_location = os.getenv("APPDATA") + "/codexctl" + elif os.name in ("posix", "darwin"): # Linux or MacOS + folder_location = os.path.expanduser("~/.config/codexctl") + else: + raise SystemError("Unsupported OS") + + self.logger.debug(f"Version config folder location is {folder_location}") + if not os.path.exists(folder_location): + os.makedirs(folder_location, exist_ok=True) + + file_location = folder_location + "/version-ids.json" + + if not os.path.exists(file_location): + self.update_version_ids(file_location) + + try: + with open(file_location) as f: + contents = json.load(f) + except ValueError: + raise SystemError( + f"Version-ids.json @ {file_location} is corrupted! Please delete it and try again. Also, PLEASE open an issue on the repo showing the contents of the file." + ) + + if ( + int(datetime.now().timestamp()) - contents["last-updated"] + > 5256000 # 2 months + ): + self.update_version_ids(file_location) + + with open(file_location) as f: + contents = json.load(f) + + self.logger.debug(f"Version ids contents are {contents}") + + provider_urls = contents.get( + "external-provider-urls", contents.get("external-provider-url") + ) + if provider_urls is None: + raise SystemError( + f"version-ids.json at {file_location} is missing external provider URLs. " + "Please delete the file and try again, or open an issue on the repo." + ) + + if isinstance(provider_urls, str): + provider_urls = [provider_urls] + + return ( + contents.get("remarkableppure", {}), + contents.get("remarkablepp", {}), + contents.get("remarkableppm", {}), + contents.get("remarkable2", {}), + contents.get("remarkable1", {}), + provider_urls, + ) + + def update_version_ids(self, location: str) -> None: + """Updates the version-ids.json file + + Args: + location (str): Location to save the file + + Raises: + SystemExit: If the file cannot be updated + """ + with open(location, "w", newline="\n") as f: + try: + self.logger.debug("Downloading version-ids.json") + contents = requests.get( + "https://raw.githubusercontent.com/Jayy001/codexctl/main/data/version-ids.json" + ).json() + json.dump(contents, f, indent=4) + f.write("\n") + except requests.exceptions.Timeout: + raise SystemExit( + "Connection timed out while downloading version-ids.json! Do you have an internet connection?" + ) + except Exception as error: + raise SystemExit( + f"Unknown error while downloading version-ids.json! {error}" + ) + + def get_latest_version(self, hardware_type: HardwareType) -> str: + """Gets the latest version available for the device + + Args: + hardware_type (HardwareType enum): Type of the device + + Returns: + str: Latest version available for the device + """ + match hardware_type: + case HardwareType.RM1: + versions = self.remarkable1_versions + + case HardwareType.RM2: + versions = self.remarkable2_versions + + case HardwareType.RMPP: + versions = self.remarkablepp_versions + + case HardwareType.RMPPM: + versions = self.remarkableppm_versions + + case HardwareType.RMPPURE: + versions = self.remarkableppure_versions + + return self.__max_version(versions.keys()) + + def get_toltec_version(self, hardware_type: HardwareType) -> str: + """Gets the latest version available toltec for the device + + Args: + hardware_type (HardwareType enum): Type of the device + + Returns: + str: Latest version available for the device + """ + + try: + toltec_type = hardware_type.toltec_type + except ValueError as ex: + raise SystemExit(*ex.args) + + response = requests.get("https://toltec-dev.org/stable/Compatibility") + if response.status_code != 200: + raise SystemExit( + f"Error: Failed to get toltec compatibility table: {response.status_code}" + ) + + return self.__max_version( + [ + x.split("=")[1] + for x in response.text.splitlines() + if x.startswith(f"{toltec_type}=") + ] + ) + + def download_version( + self, + hardware_type: HardwareType, + update_version: str, + download_folder: str | None = None, + ) -> str | None: + """Downloads the specified version of the update + + Args: + hardware_type (HardwareType enum): Type of the device + update_version (str): Id of version to download. + download_folder (str, optional): Location of download folder. Defaults to download folder for OS. + + Returns: + str | None: Location of the file if the download was successful, None otherwise + """ + + if download_folder is None: + download_folder = str( + Path( + os.environ["XDG_DOWNLOAD_DIR"] + if ( + "XDG_DOWNLOAD_DIR" in os.environ + and os.path.exists(os.environ["XDG_DOWNLOAD_DIR"]) + ) + else Path.home() / "Downloads" + ) + ) + + if not os.path.exists(download_folder): + self.logger.error( + f"Download folder {download_folder} does not exist! Creating it now." + ) + os.makedirs(download_folder) + + BASE_URL = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device%20Beta/RM110" # Default URL for v2 versions + BASE_URL_V3 = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device/reMarkable" + + match hardware_type: + case HardwareType.RMPPURE: + version_lookup = self.remarkableppure_versions + + case HardwareType.RMPP: + version_lookup = self.remarkablepp_versions + + case HardwareType.RMPPM: + version_lookup = self.remarkableppm_versions + + case HardwareType.RM2: + version_lookup = self.remarkable2_versions + BASE_URL_V3 += "2" + + case HardwareType.RM1: + version_lookup = self.remarkable1_versions + + if update_version not in version_lookup: + self.logger.error( + f"Version {update_version} not found in version-ids.json! Please update your version-ids.json file." + ) + return + + version_id, version_checksum = version_lookup[update_version] + version = tuple([int(x) for x in update_version.split(".")]) + if version >= (3,): + BASE_URL = BASE_URL_V3 + + if version <= (3, 11, 2, 5): + file_name = ( + f"{update_version}_{hardware_type.old_download_hw}-{version_id}.signed" + ) + file_url = f"{BASE_URL}/{update_version}/{file_name}" + self.logger.debug(f"File URL is {file_url}, File name is {file_name}") + return self.__download_version_file( + file_url, file_name, download_folder, version_checksum + ) + + else: + file_name = f"remarkable-production-memfault-image-{update_version}-{hardware_type.new_download_hw}-public" + + for provider_url in self.external_provider_urls: + file_url = provider_url.replace("REPLACE_ID", version_id) + self.logger.debug(f"Trying to download from {file_url}") + + result = self.__download_version_file( + file_url, file_name, download_folder, version_checksum + ) + + if result is not None: + self.logger.debug(f"Successfully downloaded from {provider_url}") + return result + + self.logger.debug( + f"Failed to download from {provider_url}, trying next source..." + ) + + self.logger.error(f"Failed to download {file_name} from all sources") + return None + + def __generate_xml_data(self) -> str: + """Generates and returns XML data for the update request""" + params = { + "installsource": "scheduler", + "requestid": str(uuid.uuid4()), + "sessionid": str(uuid.uuid4()), + "machineid": "00".zfill(32), + "oem": "RM100-753-12345", + "appid": "98DA7DF2-4E3E-4744-9DE6-EC931886ABAB", + "bootid": str(uuid.uuid4()), + "current": "3.2.3.1595", + "group": "Prod", + "platform": "reMarkable2", + } + + return """ + + + + + +""".format(**params) + + def __parse_response(self, resp: str) -> tuple[str, str, str] | None: + """Parses the response from the update server and returns the file name, uri, and version if an update is available + + Args: + resp (str): Response from the server + + Returns: + tuple[str, str, str] | None: File name, uri, and version if an update is available, None otherwise + """ + xml_data = ET.fromstring(resp) + + if "noupdate" in resp or xml_data is None: + return None + + file_name = xml_data.find("app/updatecheck/manifest/packages/package").attrib[ + "name" + ] + file_uri = ( + f"{xml_data.find('app/updatecheck/urls/url').attrib['codebase']}{file_name}" + ) + file_version = xml_data.find("app/updatecheck/manifest").attrib["version"] + + self.logger.debug( + f"File version is {file_version}, file uri is {file_uri}, file name is {file_name}" + ) + return file_version, file_uri, file_name + + def __download_version_file( + self, uri: str, name: str, download_folder: str, checksum: str + ) -> str | None: + """Downloads the version file from the server and checks the checksum + + Args: + uri (str): Location to the file + name (str): Name of the file + download_folder (str): Location of download folder + checksum (str): Sha256 Checksum of the file + + Returns: + str | None: Location of the file if the checksum matches, None otherwise + """ + response = requests.get(uri, stream=True) + if response.status_code != 200: + self.logger.debug(f"Unable to download update file: {response.status_code}") + return None + + file_length = response.headers.get("content-length") + + self.logger.debug(f"Downloading {name} from {uri} to {download_folder}") + try: + file_length = int(file_length) + + if int(file_length) < 10000000: # 10MB, invalid version file + self.logger.error( + f"File {name} is too small to be a valid version file" + ) + return None + except TypeError: + self.logger.error( + f"Could not get content length for {name}. Do you have an internet connection?" + ) + return None + + self.logger.debug(f"{name} is {file_length} bytes") + + filename = f"{download_folder}/{name}" + with open(filename, "wb") as out_file: + dl = 0 + + for data in response.iter_content(chunk_size=4096): + dl += len(data) + out_file.write(data) + if sys.stdout.isatty(): + done = int(50 * dl / file_length) + sys.stdout.write("\r[%s%s]" % ("=" * done, " " * (50 - done))) + sys.stdout.flush() + + if sys.stdout.isatty(): + print(end="\r\n") + + self.logger.debug(f"Downloaded {name}") + + with open(filename, "rb") as f: + file_checksum = hashlib.sha256(f.read()).hexdigest() + + if file_checksum != checksum: + os.remove(filename) + self.logger.error( + f"File checksum mismatch! Expected {checksum}, got {file_checksum}" + ) + return None + + return filename + + @staticmethod + def __max_version(versions: list) -> str: + """Returns the highest avaliable version from a list with semantic versioning""" + return sorted(versions, key=lambda v: tuple(map(int, v.split("."))))[-1] + + @staticmethod + def uses_new_update_engine(version: str) -> bool: + """ + Checks if the version given is above 3.11 and so requires the newer update engine + + Args: + version (str): version to check against + + Returns: + bool: If it uses the new update engine or not + """ + return tuple([int(x) for x in version.split(".")]) >= (3, 11, 2, 5) + + @staticmethod + def is_bootloader_boundary_downgrade( + current_version: str, target_version: str + ) -> bool: + """ + Checks if downgrade crosses the 3.22 bootloader boundary (3.22+ -> <3.22). + + Paper Pro devices require bootloader updates when downgrading from + version 3.22 or higher to any version below 3.22. + + Args: + current_version (str): Currently installed version + target_version (str): Target version to install + + Returns: + bool: True if crossing boundary downward (3.22+ -> <3.22) + + Raises: + ValueError: If either version is empty or has invalid format + """ + if not current_version or not current_version.strip(): + raise ValueError("current_version cannot be empty") + if not target_version or not target_version.strip(): + raise ValueError("target_version cannot be empty") + + try: + current_parts = [int(x) for x in current_version.split(".")] + target_parts = [int(x) for x in target_version.split(".")] + except ValueError as e: + raise ValueError(f"Invalid version format: {e}") from e + + if len(current_parts) < 2 or len(target_parts) < 2: + raise ValueError("Version must have at least 2 components (e.g., '3.22')") + + current_is_322_or_higher = current_parts >= [3, 22] + target_is_below_322 = target_parts < [3, 22] + + return current_is_322_or_higher and target_is_below_322 diff --git a/data/version-ids.json b/data/version-ids.json index c969ea5..a58d52d 100644 --- a/data/version-ids.json +++ b/data/version-ids.json @@ -555,6 +555,12 @@ "b494b36e6c1749bbe205ef26f2cea31728f6fe82aa5abf5057e37a09ec326ffa" ] }, + "remarkableppure": { + "3.27.1.0": [ + "remarkable-production-image-3.27.1.0-tatsu-public.swu", + "7008c8376e49a65b766bbd672f83454d55290af73b099bba396f3d9b2276ce3f" + ] + }, "last-updated": 1778173895, "external-provider-urls": [ "https://storage.googleapis.com/remarkable-versions/REPLACE_ID", diff --git a/pyproject.toml b/pyproject.toml index 30651b1..a4022a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,8 +12,8 @@ paramiko = "3.4.1" psutil = "6.0.0" requests = "2.32.4" loguru = "0.7.3" -remarkable-update-image = { version = "1.3", markers = "sys_platform != 'linux'" } -remarkable-update-fuse = { version = "1.3", markers = "sys_platform == 'linux'" } +remarkable-update-image = { version = "1.4.1", markers = "sys_platform != 'linux'" } +remarkable-update-fuse = { version = "1.4.2", markers = "sys_platform == 'linux'" } [build-system] requires = ["poetry-core"] diff --git a/requirements.txt b/requirements.txt index 3b66c54..efabadb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ requests==2.32.5 loguru==0.7.3 -remarkable-update-image==1.4; sys_platform != 'linux' -remarkable-update-fuse==1.4.1; sys_platform == 'linux' +remarkable-update-image==1.4.1; sys_platform != 'linux' +remarkable-update-fuse==1.4.2; sys_platform == 'linux' diff --git a/tests/test.py b/tests/test.py index 3a5d0ad..8170884 100644 --- a/tests/test.py +++ b/tests/test.py @@ -1,252 +1,252 @@ -import os -import sys -import difflib -import contextlib -import logging -from unittest.mock import NonCallableMock - -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) - -from codexctl.device import HardwareType, DeviceManager -from codexctl.updates import UpdateManager -from codexctl import Manager - -# Mock device manager object, only the `logger` field is accessed by `set_server_config` -device_manager = NonCallableMock(["logger"]) - -set_server_config = DeviceManager.set_server_config -codexctl = Manager(device="reMarkable2", logger=logging.getLogger(__name__)) -updater = UpdateManager(logger=logging.getLogger(__name__)) - -from io import StringIO -from io import BytesIO - -FAILED = False -UPDATE_FILE_PATH = ".venv/2.15.1.1189_reMarkable2-wVbHkgKisg-.signed" - -assert os.path.exists(UPDATE_FILE_PATH), "Update image missing" - -class BufferWriter: - def __init__(self, buffer): - self._buffer = buffer - - def write(self, data): - self._buffer.write(data) - - -class BufferBytesIO(BytesIO): - @property - def buffer(self): - return BufferWriter(self) - - -def assert_value(msg, value, expected): - global FAILED - print(f"Testing {msg}: ", end="") - if value == expected: - print("pass") - return - - FAILED = True - print("fail") - print(f" {value} != {expected}") - - -def assert_gt(msg, value, expected): - global FAILED - print(f"Testing {msg}: ", end="") - if value >= expected: - print("pass") - return - - FAILED = True - print("fail") - print(f" {value} != {expected}") - -@contextlib.contextmanager -def assert_raises(msg, expected): - global FAILED - print(f"Testing {msg}: ", end="") - try: - yield - got = "no exception" - except expected: - print("pass") - return - except Exception as e: - got = e.__class__.__name__ - - FAILED = True - print("fail") - print(f" {got} != {expected.__name__}") - -def test_set_server_config(original, expected): - global FAILED - print("Testing set_server_config: ", end="") - result = set_server_config(device_manager, original, "test") - if result == expected: - print("pass") - return - - FAILED = True - print("fail") - for diff in difflib.ndiff( - expected.splitlines(keepends=True), result.splitlines(keepends=True) - ): - print(f" {diff}") - - -def test_ls(path, expected): - global FAILED - global UPDATE_FILE_PATH - print(f"Testing ls {path}: ", end="") - with contextlib.redirect_stdout(StringIO()) as f: - try: - codexctl.call_func("ls", {'file': UPDATE_FILE_PATH, 'target_path': path}) - - except SystemExit: - pass - - result = f.getvalue() - if result == expected: - print("pass") - return - - FAILED = True - print("fail") - for diff in difflib.ndiff( - expected.splitlines(keepends=True), result.splitlines(keepends=True) - ): - print(f" {diff}") - - -def test_cat(path, expected): - global FAILED - global UPDATE_FILE_PATH - print(f"Testing cat {path}: ", end="") - with contextlib.redirect_stdout(BufferBytesIO()) as f: - try: - codexctl.call_func("cat", {'file': UPDATE_FILE_PATH, 'target_path': path}) - except SystemExit: - pass - - result = f.getvalue() - if result == expected: - print("pass") - return - - FAILED = True - print("fail") - for diff in difflib.ndiff( - expected.decode("utf-8").splitlines(keepends=True), - result.decode("utf-8").splitlines(keepends=True), - ): - print(f" {diff}") - - -test_set_server_config( - "", - "SERVER=test\n", -) - -test_set_server_config( - """[General] -#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} -#SERVER=https://updates.cloud.remarkable.engineering/service/update2 -#GROUP=Prod -#PLATFORM=reMarkable2 -REMARKABLE_RELEASE_VERSION=3.9.5.2026 -""", - """[General] -SERVER=test -#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} -#SERVER=https://updates.cloud.remarkable.engineering/service/update2 -#GROUP=Prod -#PLATFORM=reMarkable2 -REMARKABLE_RELEASE_VERSION=3.9.5.2026 -""", -) - -test_set_server_config( - """[General] -SERVER=testing -#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} -#SERVER=https://updates.cloud.remarkable.engineering/service/update2 -#GROUP=Prod -#PLATFORM=reMarkable2 -REMARKABLE_RELEASE_VERSION=3.9.5.2026 -""", - """[General] -SERVER=test -#SERVER=testing -#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} -#SERVER=https://updates.cloud.remarkable.engineering/service/update2 -#GROUP=Prod -#PLATFORM=reMarkable2 -REMARKABLE_RELEASE_VERSION=3.9.5.2026 -""", -) - -test_ls( - "/", - ". .. lost+found bin boot dev etc home lib media mnt postinst proc run sbin sys tmp uboot-postinst uboot-version usr var\n", -) -test_ls( - "/mnt", - ". ..\n", -) - -test_cat("/etc/version", b"20221026104022\n") - -assert_gt( - "toltec rm1 version", - updater.get_toltec_version(HardwareType.RM1), - "3.3.2.1666" -) -assert_gt( - "toltec rm2 version", - updater.get_toltec_version(HardwareType.RM2), - "3.3.2.1666" -) -with assert_raises("toltec rmpp version", SystemExit): - updater.get_toltec_version(HardwareType.RMPP) -with assert_raises("toltec rmppm version", SystemExit): - updater.get_toltec_version(HardwareType.RMPPM) - -assert_value( - "boundary cross 3.23->3.20", - UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"), - True -) -assert_value( - "boundary cross 3.22->3.20", - UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"), - True -) -assert_value( - "no boundary 3.23->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"), - False -) -assert_value( - "no boundary 3.20->3.19", - UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"), - False -) -assert_value( - "upgrade 3.20->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"), - False -) -assert_value( - "same version 3.22->3.22", - UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), - False -) -with assert_raises("empty string current", ValueError): - UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92") -with assert_raises("non-numeric version", ValueError): - UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92") - -if FAILED: - sys.exit(1) +import contextlib +import difflib +import logging +import os +import sys +from unittest.mock import NonCallableMock + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from codexctl import Manager +from codexctl.device import DeviceManager, HardwareType +from codexctl.updates import UpdateManager + +# Mock device manager object, only the `logger` field is accessed by `set_server_config` +device_manager = NonCallableMock(["logger"]) + +set_server_config = DeviceManager.set_server_config +codexctl = Manager(device="reMarkable2", logger=logging.getLogger(__name__)) +updater = UpdateManager(logger=logging.getLogger(__name__)) + +from io import BytesIO, StringIO + +FAILED = False +UPDATE_FILE_PATH = ".venv/2.15.1.1189_reMarkable2-wVbHkgKisg-.signed" + +assert os.path.exists(UPDATE_FILE_PATH), "Update image missing" + + +class BufferWriter: + def __init__(self, buffer): + self._buffer = buffer + + def write(self, data): + self._buffer.write(data) + + +class BufferBytesIO(BytesIO): + @property + def buffer(self): + return BufferWriter(self) + + +def assert_value(msg, value, expected): + global FAILED + print(f"Testing {msg}: ", end="") + if value == expected: + print("pass") + return + + FAILED = True + print("fail") + print(f" {value} != {expected}") + + +def assert_gt(msg, value, expected): + global FAILED + print(f"Testing {msg}: ", end="") + if value >= expected: + print("pass") + return + + FAILED = True + print("fail") + print(f" {value} != {expected}") + + +@contextlib.contextmanager +def assert_raises(msg, expected): + global FAILED + print(f"Testing {msg}: ", end="") + try: + yield + got = "no exception" + except expected: + print("pass") + return + except Exception as e: + got = e.__class__.__name__ + + FAILED = True + print("fail") + print(f" {got} != {expected.__name__}") + + +def test_set_server_config(original, expected): + global FAILED + print("Testing set_server_config: ", end="") + result = set_server_config(device_manager, original, "test") + if result == expected: + print("pass") + return + + FAILED = True + print("fail") + for diff in difflib.ndiff( + expected.splitlines(keepends=True), result.splitlines(keepends=True) + ): + print(f" {diff}") + + +def test_ls(path, expected): + global FAILED + global UPDATE_FILE_PATH + print(f"Testing ls {path}: ", end="") + with contextlib.redirect_stdout(StringIO()) as f: + try: + codexctl.call_func("ls", {"file": UPDATE_FILE_PATH, "target_path": path}) + + except SystemExit: + pass + + result = f.getvalue() + if result == expected: + print("pass") + return + + FAILED = True + print("fail") + for diff in difflib.ndiff( + expected.splitlines(keepends=True), result.splitlines(keepends=True) + ): + print(f" {diff}") + + +def test_cat(path, expected): + global FAILED + global UPDATE_FILE_PATH + print(f"Testing cat {path}: ", end="") + with contextlib.redirect_stdout(BufferBytesIO()) as f: + try: + codexctl.call_func("cat", {"file": UPDATE_FILE_PATH, "target_path": path}) + except SystemExit: + pass + + result = f.getvalue() + if result == expected: + print("pass") + return + + FAILED = True + print("fail") + for diff in difflib.ndiff( + expected.decode("utf-8").splitlines(keepends=True), + result.decode("utf-8").splitlines(keepends=True), + ): + print(f" {diff}") + + +test_set_server_config( + "", + "SERVER=test\n", +) + +test_set_server_config( + """[General] +#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} +#SERVER=https://updates.cloud.remarkable.engineering/service/update2 +#GROUP=Prod +#PLATFORM=reMarkable2 +REMARKABLE_RELEASE_VERSION=3.9.5.2026 +""", + """[General] +SERVER=test +#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} +#SERVER=https://updates.cloud.remarkable.engineering/service/update2 +#GROUP=Prod +#PLATFORM=reMarkable2 +REMARKABLE_RELEASE_VERSION=3.9.5.2026 +""", +) + +test_set_server_config( + """[General] +SERVER=testing +#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} +#SERVER=https://updates.cloud.remarkable.engineering/service/update2 +#GROUP=Prod +#PLATFORM=reMarkable2 +REMARKABLE_RELEASE_VERSION=3.9.5.2026 +""", + """[General] +SERVER=test +#SERVER=testing +#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB} +#SERVER=https://updates.cloud.remarkable.engineering/service/update2 +#GROUP=Prod +#PLATFORM=reMarkable2 +REMARKABLE_RELEASE_VERSION=3.9.5.2026 +""", +) + +test_ls( + "/", + ". .. lost+found bin boot dev etc home lib media mnt postinst proc run sbin sys tmp uboot-postinst uboot-version usr var\n", +) +test_ls( + "/mnt", + ". ..\n", +) + +test_cat("/etc/version", b"20221026104022\n") + +assert_gt( + "toltec rm1 version", updater.get_toltec_version(HardwareType.RM1), "3.3.2.1666" +) +assert_gt( + "toltec rm2 version", updater.get_toltec_version(HardwareType.RM2), "3.3.2.1666" +) +with assert_raises("toltec rmpp version", SystemExit): + updater.get_toltec_version(HardwareType.RMPP) +with assert_raises("toltec rmppm version", SystemExit): + updater.get_toltec_version(HardwareType.RMPPM) +with assert_raises("toltec rmppure version", SystemExit): + updater.get_toltec_version(HardwareType.RMPPURE) + +assert_value( + "boundary cross 3.23->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"), + True, +) +assert_value( + "boundary cross 3.22->3.20", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"), + True, +) +assert_value( + "no boundary 3.23->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"), + False, +) +assert_value( + "no boundary 3.20->3.19", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"), + False, +) +assert_value( + "upgrade 3.20->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"), + False, +) +assert_value( + "same version 3.22->3.22", + UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"), + False, +) +with assert_raises("empty string current", ValueError): + UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92") +with assert_raises("non-numeric version", ValueError): + UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92") + +if FAILED: + sys.exit(1)