diff --git a/codexctl/__init__.py b/codexctl/__init__.py
index b68f8d0..b0b4a21 100644
--- a/codexctl/__init__.py
+++ b/codexctl/__init__.py
@@ -1,17 +1,16 @@
### Importing required general modules
import argparse
-import os.path
-import sys
-import logging
import importlib.util
-import tempfile
-import shutil
import json
+import logging
+import os.path
import re
-
-from typing import cast
+import shutil
+import sys
+import tempfile
from os import listdir
+from typing import cast
try:
from loguru import logger
@@ -67,6 +66,9 @@ def call_func(self, function: str, args: dict) -> None:
### Download functionalities
if function == "list":
+ remarkable_ppure_versions = "\n".join(
+ self.updater.remarkableppure_versions.keys()
+ )
remarkable_pp_versions = "\n".join(
self.updater.remarkablepp_versions.keys()
)
@@ -77,14 +79,26 @@ def call_func(self, function: str, args: dict) -> None:
remarkable_1_versions = "\n".join(self.updater.remarkable1_versions.keys())
version_blocks = []
+ if remarkable_version is None or remarkable_version == HardwareType.RMPPURE:
+ version_blocks.append(
+ f"{HardwareType.RMPPURE.formatted_name}:\n{remarkable_ppure_versions}"
+ )
if remarkable_version is None or remarkable_version == HardwareType.RMPP:
- version_blocks.append(f"ReMarkable Paper Pro:\n{remarkable_pp_versions}")
+ version_blocks.append(
+ f"{HardwareType.RMPP.formatted_name}:\n{remarkable_pp_versions}"
+ )
if remarkable_version is None or remarkable_version == HardwareType.RMPPM:
- version_blocks.append(f"ReMarkable Paper Pro Move:\n{remarkable_ppm_versions}")
+ version_blocks.append(
+ f"{HardwareType.RMPPM.formatted_name}:\n{remarkable_ppm_versions}"
+ )
if remarkable_version is None or remarkable_version == HardwareType.RM2:
- version_blocks.append(f"ReMarkable 2:\n{remarkable_2_versions}")
+ version_blocks.append(
+ f"{HardwareType.RM2.formatted_name}:\n{remarkable_2_versions}"
+ )
if remarkable_version is None or remarkable_version == HardwareType.RM1:
- version_blocks.append(f"ReMarkable 1:\n{remarkable_1_versions}")
+ version_blocks.append(
+ f"{HardwareType.RM1.formatted_name}:\n{remarkable_1_versions}"
+ )
print("\n\n".join(version_blocks))
@@ -122,8 +136,9 @@ def call_func(self, function: str, args: dict) -> None:
logger.info(f"Extracted image to {args['out']}")
else:
try:
- from .analysis import get_update_image
from remarkable_update_fuse import UpdateFS
+
+ from .analysis import get_update_image
except ImportError:
raise ImportError(
"remarkable_update_fuse is required for mounting. Please install it!"
@@ -266,7 +281,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
image = UpdateImage(update_file)
if isinstance(image, CPIOUpdateImage):
if image.version is None:
- raise SystemError(f"Could not determine version from SWU file: {update_file}")
+ raise SystemError(
+ f"Could not determine version from SWU file: {update_file}"
+ )
version_number = image.version
hw_map = {
@@ -274,12 +291,17 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
"reMarkable2": HardwareType.RM2,
"ferrari": HardwareType.RMPP,
"chiappa": HardwareType.RMPPM,
+ "tatsu": HardwareType.RMPPURE,
}
if image.hardware_type not in hw_map:
- raise SystemError(f"Unsupported hardware type in SWU file: {update_file}")
+ raise SystemError(
+ f"Unsupported hardware type in SWU file: {update_file}"
+ )
swu_hardware = hw_map[image.hardware_type]
- logger.info(f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}")
+ logger.info(
+ f"Extracted from SWU: version={version_number}, hardware={swu_hardware.name}"
+ )
if swu_hardware != remarkable.hardware:
raise SystemError(
@@ -289,7 +311,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
f"Cannot install firmware for different hardware."
)
except ValueError as e:
- logger.warning(f"Could not extract metadata from update file: {e}")
+ logger.warning(
+ f"Could not extract metadata from update file: {e}"
+ )
if not version_number:
version_match = version_lookup(version)
@@ -337,15 +361,18 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
bootloader_files_for_install = None
- if (device_version_uses_new_engine and
- remarkable.hardware == HardwareType.RMPP):
-
+ if (
+ device_version_uses_new_engine
+ and remarkable.hardware == HardwareType.RMPP
+ ):
current_version = remarkable.get_device_status()[2]
- if UpdateManager.is_bootloader_boundary_downgrade(current_version, version_number):
- print("\n" + "="*60)
+ if UpdateManager.is_bootloader_boundary_downgrade(
+ current_version, version_number
+ ):
+ print("\n" + "=" * 60)
print("WARNING: Bootloader Update Required")
- print("="*60)
+ print("=" * 60)
print(f"Current version: {current_version}")
print(f"Target version: {version_number}")
print()
@@ -359,21 +386,23 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
print()
response = input("Do you want to continue? (y/N): ")
- if response.lower() != 'y':
+ if response.lower() != "y":
raise SystemExit("Installation cancelled by user")
expected_swu_name = f"remarkable-production-memfault-image-{current_version}-{remarkable.hardware.new_download_hw}-public"
expected_swu_path = f"./{expected_swu_name}"
if os.path.isfile(expected_swu_path):
- print(f"\nUsing existing {expected_swu_name} for bootloader extraction...")
+ print(
+ f"\nUsing existing {expected_swu_name} for bootloader extraction..."
+ )
current_swu_path = expected_swu_path
else:
- print("\nDownloading current version's SWU for bootloader extraction...")
+ print(
+ "\nDownloading current version's SWU for bootloader extraction..."
+ )
current_swu_path = self.updater.download_version(
- remarkable.hardware,
- current_version,
- "./"
+ remarkable.hardware, current_version, "./"
)
if not current_swu_path:
@@ -384,17 +413,26 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
print("Extracting bootloader files...")
from remarkable_update_image import UpdateImage
+
swu_image = UpdateImage(current_swu_path)
bootloader_files_for_install = {
- 'update-bootloader.sh': swu_image.archive[b'update-bootloader.sh'].read(),
- 'imx-boot': swu_image.archive[b'imx-boot'].read(),
+ "update-bootloader.sh": swu_image.archive[
+ b"update-bootloader.sh"
+ ].read(),
+ "imx-boot": swu_image.archive[b"imx-boot"].read(),
}
if not all(bootloader_files_for_install.values()):
- raise SystemError("Failed to extract bootloader files from current version")
+ raise SystemError(
+ "Failed to extract bootloader files from current version"
+ )
- print(f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)")
- print(f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)")
+ print(
+ f"✓ Extracted update-bootloader.sh ({len(bootloader_files_for_install['update-bootloader.sh'])} bytes)"
+ )
+ print(
+ f"✓ Extracted imx-boot ({len(bootloader_files_for_install['imx-boot'])} bytes)"
+ )
print()
if not update_file_requires_new_engine:
@@ -437,7 +475,9 @@ def version_lookup(version: str | None) -> re.Match[str] | None:
)
if device_version_uses_new_engine:
- remarkable.install_sw_update(update_file, bootloader_files=bootloader_files_for_install)
+ remarkable.install_sw_update(
+ update_file, bootloader_files=bootloader_files_for_install
+ )
else:
remarkable.install_ohma_update(update_file)
diff --git a/codexctl/__main__.py b/codexctl/__main__.py
index 09326c0..868d99e 100644
--- a/codexctl/__main__.py
+++ b/codexctl/__main__.py
@@ -1,4 +1,4 @@
-from . import main
-
-if __name__ == "__main__":
- main()
+from . import main
+
+if __name__ == "__main__":
+ main()
diff --git a/codexctl/analysis.py b/codexctl/analysis.py
index f84c4d3..1770341 100644
--- a/codexctl/analysis.py
+++ b/codexctl/analysis.py
@@ -1,34 +1,34 @@
-import ext4
-import warnings
-import errno
-
-from remarkable_update_image import UpdateImage
-from remarkable_update_image import UpdateImageSignatureException
-from .device import HardwareType
-
-
-def get_update_image(file: str):
- """Extracts files from an update image (<3.11 currently)"""
-
- image = UpdateImage(file)
- volume = ext4.Volume(image, offset=0)
- try:
- inode = volume.inode_at("/usr/share/update_engine/update-payload-key.pub.pem")
- if inode is None:
- raise FileNotFoundError()
-
- inode.verify()
- image.verify(inode.open().read())
-
- except UpdateImageSignatureException:
- warnings.warn("Signature doesn't match contents", RuntimeWarning)
-
- except FileNotFoundError:
- warnings.warn("Public key missing", RuntimeWarning)
-
- except OSError as e:
- if e.errno != errno.ENOTDIR:
- raise
- warnings.warn("Unable to open public key", RuntimeWarning)
-
- return image, volume
+import ext4
+import warnings
+import errno
+
+from remarkable_update_image import UpdateImage
+from remarkable_update_image import UpdateImageSignatureException
+from .device import HardwareType
+
+
+def get_update_image(file: str):
+ """Extracts files from an update image (<3.11 currently)"""
+
+ image = UpdateImage(file)
+ volume = ext4.Volume(image, offset=0)
+ try:
+ inode = volume.inode_at("/usr/share/update_engine/update-payload-key.pub.pem")
+ if inode is None:
+ raise FileNotFoundError()
+
+ inode.verify()
+ image.verify(inode.open().read())
+
+ except UpdateImageSignatureException:
+ warnings.warn("Signature doesn't match contents", RuntimeWarning)
+
+ except FileNotFoundError:
+ warnings.warn("Public key missing", RuntimeWarning)
+
+ except OSError as e:
+ if e.errno != errno.ENOTDIR:
+ raise
+ warnings.warn("Unable to open public key", RuntimeWarning)
+
+ return image, volume
diff --git a/codexctl/device.py b/codexctl/device.py
index 226ac93..765fe0e 100644
--- a/codexctl/device.py
+++ b/codexctl/device.py
@@ -1,1024 +1,1124 @@
-import enum
-import logging
-import os
-import re
-import shlex
-import socket
-import subprocess
-import tempfile
-import threading
-import time
-
-from .server import startUpdate
-
-try:
- import paramiko
- import psutil
-except ImportError:
- pass
-
-
-class HardwareType(enum.Enum):
- RM1 = enum.auto()
- RM2 = enum.auto()
- RMPP = enum.auto()
- RMPPM = enum.auto()
-
- @classmethod
- def parse(cls, device_type: str) -> "HardwareType":
- match device_type.lower():
- case "ppm" | "rmppm" | "chiappa" | "remarkable chiappa":
- return cls.RMPPM
-
- case "pp" | "pro" | "rmpp" | "ferrari" | "remarkable ferrari":
- return cls.RMPP
-
- case "2" | "rm2" | "remarkable 2" | "remarkable 2.0":
- return cls.RM2
-
- case "1" | "rm1" | "remarkable 1" | "remarkable 1.0" | "remarkable prototype 1":
- return cls.RM1
-
- case _:
- raise ValueError(f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)")
-
- @property
- def old_download_hw(self):
- match self:
- case HardwareType.RM1:
- return "reMarkable"
- case HardwareType.RM2:
- return "reMarkable2"
- case HardwareType.RMPP:
- raise ValueError("reMarkable Paper Pro does not support the old update engine")
- case HardwareType.RMPPM:
- raise ValueError("reMarkable Paper Pro Move does not support the old update engine")
-
- @property
- def new_download_hw(self):
- match self:
- case HardwareType.RM1:
- return "rm1"
- case HardwareType.RM2:
- return "rm2"
- case HardwareType.RMPP:
- return "rmpp"
- case HardwareType.RMPPM:
- return "rmppm"
-
- @property
- def swupdate_hw(self):
- match self:
- case HardwareType.RM1:
- return "reMarkable1"
- case HardwareType.RM2:
- return "reMarkable2"
- case HardwareType.RMPP:
- return "ferrari"
- case HardwareType.RMPPM:
- return "chiappa"
-
- @property
- def toltec_type(self):
- match self:
- case HardwareType.RM1:
- return "rm1"
- case HardwareType.RM2:
- return "rm2"
- case HardwareType.RMPP:
- raise ValueError("reMarkable Paper Pro does not support toltec")
- case HardwareType.RMPPM:
- raise ValueError("reMarkable Paper Pro Move does not support toltec")
-
-class DeviceManager:
- def __init__(
- self, logger=None, remote=False, address=None, authentication=None
- ) -> None:
- """Initializes the DeviceManager for codexctl
-
- Args:
- remote (bool, optional): Whether the device is remote. Defaults to False.
- address (bool, optional): Known IP of remote device, if applicable. Defaults to None.
- logger (logger, optional): Logger object for logging. Defaults to None.
- Authentication (str, optional): Authentication method. Defaults to None.
- """
- self.logger = logger
- self.address = address
- self.authentication = authentication
- self.client = None
-
- if self.logger is None:
- self.logger = logging
-
- if remote:
- self.client = self.connect_to_device(
- authentication=authentication, remote_address=address
- )
-
- self.client.authentication = authentication
- self.client.address = address
-
- ftp = self.client.open_sftp()
- with ftp.file("/sys/devices/soc0/machine") as file:
- machine_contents = file.read().decode("utf-8").strip("\n")
- else:
- with open("/sys/devices/soc0/machine") as file:
- machine_contents = file.read().strip("\n")
-
- self.hardware = HardwareType.parse(machine_contents)
-
- def get_host_address(self) -> list[str] | list | None: # Interaction required
- """Gets the IP address of the host machine
-
- Returns:
- str | None: IP address of the host machine, or None if not found
- """
-
- possible_ips = []
- try:
- for interface, snics in psutil.net_if_addrs().items():
- self.logger.debug(f"New interface found: {interface}")
- for snic in snics:
- if snic.family == socket.AF_INET:
- if snic.address.startswith("10.11.99"):
- return snic.address
- self.logger.debug(f"Adding new address: {snic.address}")
- possible_ips.append(snic.address)
-
- except Exception as error:
- self.logger.error(f"Error automatically getting interfaces: {error}")
-
- if possible_ips:
- host_interfaces = "\n".join(possible_ips)
- else:
- host_interfaces = "Could not find any available interfaces."
-
- print(f"\n{host_interfaces}")
- while True:
- host_address = input(
- "\nPlease enter your host IP for the network the device is connected to: "
- )
-
- if possible_ips and host_address not in host_interfaces.split("\n"):
- print("Error: Invalid IP given")
- continue
-
- if "n" in input("Are you sure? (Y/n): ").lower():
- continue
-
- break
-
- return host_address
-
- def get_remarkable_address(self) -> str:
- """Gets the IP address of the remarkable device
-
- Returns:
- str: IP address of the remarkable device
- """
-
- if self.check_is_address_reachable("10.11.99.1"):
- return "10.11.99.1"
-
- while True:
- remote_ip = input("Please enter the IP of the remarkable device: ")
-
- if self.check_is_address_reachable(remote_ip):
- return remote_ip
-
- print(f"Error: Device {remote_ip} is not reachable. Please try again.")
-
- def check_is_address_reachable(self, remote_ip="10.11.99.1") -> bool:
- """Checks if the given IP address is reachable over SSH
-
- Args:
- remote_ip (str, optional): IP to check. Defaults to '10.11.99.1'.
-
- Returns:
- bool: True if reachable, False otherwise
- """
- self.logger.debug(f"Checking if {remote_ip} is reachable")
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(1)
-
- sock.connect((remote_ip, 22))
- sock.shutdown(2)
-
- return True
-
- except Exception:
- self.logger.debug(f"Device {remote_ip} is not reachable")
- return False
-
- def connect_to_device(
- self, remote_address=None, authentication=None
- ) -> paramiko.client.SSHClient:
- """Connects to the device using the given IP address
-
- Args:
- remote_address (str, optional): IP address of the device.
- authentication (str, optional): Authentication credentials. Defaults to None.
-
- Returns:
- paramiko.client.SSHClient: SSH client object for the device.
- """
-
- if remote_address is None:
- remote_address = self.get_remarkable_address()
- self.address = remote_address # For future reference
- else:
- if self.check_is_address_reachable(remote_address) is False:
- raise SystemError(f"Error: Device {remote_address} is not reachable!")
-
- client = paramiko.client.SSHClient()
- client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-
- if authentication:
- self.logger.debug(f"Using authentication: {authentication}")
- try:
- if os.path.isfile(authentication):
- self.logger.debug(
- f"Attempting to connect to {remote_address} with key file {authentication}"
- )
- client.connect(
- remote_address, username="root", key_filename=authentication
- )
- else:
- self.logger.debug(
- f"Attempting to connect to {remote_address} with password {authentication}"
- )
- client.connect(
- remote_address, username="root", password=authentication
- )
-
- except paramiko.ssh_exception.AuthenticationException:
- print("Incorrect password or ssh path given in arguments!")
-
- elif (
- "n" in input("Would you like to use a password to connect? (Y/n): ").lower()
- ):
- while True:
- key_path = input("Enter path to SSH key: ")
-
- try:
- self.logger.debug(
- f"Attempting to connect to {remote_address} with key file {key_path}"
- )
- client.connect(
- remote_address, username="root", key_filename=key_path
- )
- except Exception:
- print("Error while connecting to device: {error}")
-
- continue
- break
- else:
- while True:
- password = input("Enter RM SSH password: ")
-
- try:
- self.logger.debug(
- f"Attempting to connect to {remote_address} with password {password}"
- )
- client.connect(remote_address, username="root", password=password)
- except paramiko.ssh_exception.AuthenticationException:
- print("Incorrect password given")
-
- continue
- break
-
- print("Success: Connected to device")
-
- return client
-
- def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, bool]:
- """Reads version from a given path (current partition or mounted backup)
-
- Args:
- ftp: SFTP client connection (None for local file access)
- base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup)
-
- Returns:
- tuple: (version_string, old_update_engine_boolean)
- """
- update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf"
- os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release"
-
- if ftp:
- def file_exists(path: str) -> bool:
- try:
- ftp.stat(path)
- return True
- except FileNotFoundError:
- return False
-
- def read_file(path: str) -> str:
- with ftp.file(path) as file:
- return file.read().decode("utf-8")
- else:
- file_exists = os.path.exists
-
- def read_file(path: str) -> str:
- with open(path, encoding="utf-8") as file:
- return file.read()
-
- if file_exists(update_conf_path):
- contents = read_file(update_conf_path).strip("\n")
- match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents)
- if match:
- return match.group(), True
- raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}")
-
- if file_exists(os_release_path):
- contents = read_file(os_release_path)
- match = re.search("(?<=IMG_VERSION=).*", contents)
- if match:
- return match.group().strip('"'), False
- raise SystemError(f"IMG_VERSION not found in {os_release_path}")
-
- raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found")
-
- def _get_active_device(self) -> str:
- """Gets the active root device path.
-
- Returns:
- str: Active device path (e.g., /dev/mmcblk2p2)
-
- Raises:
- SystemError: If command fails or returns no output
- """
- if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM):
- cmd = "swupdate -g"
- else:
- cmd = "rootdev"
-
- if self.client:
- _stdin, stdout, stderr = self.client.exec_command(cmd)
- output = stdout.read().decode("utf-8").strip()
- exit_status = stdout.channel.recv_exit_status()
- if exit_status != 0 or not output:
- error = stderr.read().decode("utf-8", errors="ignore")
- raise SystemError(f"Failed to get active device using '{cmd}': {error or 'no output'}")
- return output
- else:
- result = subprocess.run(cmd.split(), capture_output=True, text=True)
- if result.returncode != 0 or not result.stdout.strip():
- raise SystemError(f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}")
- return result.stdout.strip()
-
- def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]:
- """Parse partition numbers from device path.
-
- Args:
- active_device: Device path (e.g., /dev/mmcblk2p2)
-
- Returns:
- tuple: (active_part, inactive_part, device_base)
- """
- active_part = int(active_device.split('p')[-1])
- inactive_part = 3 if active_part == 2 else 2
- device_base = re.sub(r'p\d+$', '', active_device)
- return active_part, inactive_part, device_base
-
- def _get_backup_partition_version(self) -> str:
- """Gets the version installed on the backup (inactive) partition
-
- Returns:
- str: Version string (empty string for RM1/RM2 on failure)
-
- Raises:
- SystemError: If backup partition version cannot be determined (Paper Pro only)
- """
- try:
- active_device = self._get_active_device()
- _, inactive_part, device_base = self._parse_partition_info(active_device)
- mount_point = f"/tmp/mount_p{inactive_part}"
-
- if self.client:
- ftp = self.client.open_sftp()
- self.client.exec_command(f"mkdir -p {mount_point}")
- _stdin, stdout, _stderr = self.client.exec_command(
- f"mount -o ro {device_base}p{inactive_part} {mount_point}"
- )
- exit_status = stdout.channel.recv_exit_status()
-
- if exit_status != 0:
- error_msg = _stderr.read().decode('utf-8')
- raise SystemError(f"Failed to mount backup partition: {error_msg}")
-
- try:
- version, _ = self._read_version_from_path(ftp, mount_point)
- return version
- finally:
- self.client.exec_command(f"umount {mount_point}")
- self.client.exec_command(f"rm -rf {mount_point}")
- else:
- os.makedirs(mount_point, exist_ok=True)
- result = subprocess.run(
- ["mount", "-o", "ro", f"{device_base}p{inactive_part}", mount_point],
- capture_output=True, text=True
- )
- if result.returncode != 0:
- raise SystemError(f"Failed to mount backup partition: {result.stderr}")
-
- try:
- version, _ = self._read_version_from_path(base_path=mount_point)
- return version
- finally:
- subprocess.run(["umount", mount_point])
- subprocess.run(["rm", "-rf", mount_point])
- except SystemError:
- if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM):
- raise
- return ""
-
- def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]:
- """Gets partition information for Paper Pro devices
-
- Args:
- current_version: Current OS version string for version-aware detection
-
- Returns:
- tuple: (current_partition, inactive_partition, next_boot_partition)
- """
- active_device = self._get_active_device()
- current_part, inactive_part, _ = self._parse_partition_info(active_device)
-
- parts = current_version.split('.')
- if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
- is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22]
- else:
- raise SystemError(f"Cannot detect partition scheme: unexpected version format '{current_version}'")
-
- next_boot_part = current_part
-
- if self.client:
- ftp = self.client.open_sftp()
-
- def file_exists(path: str) -> bool:
- try:
- ftp.stat(path)
- return True
- except FileNotFoundError:
- return False
-
- def read_file(path: str) -> str:
- with ftp.file(path) as file:
- return file.read().decode("utf-8")
- else:
- file_exists = os.path.exists
-
- def read_file(path: str) -> str:
- with open(path, encoding="utf-8") as file:
- return file.read()
-
- if is_new_version:
- boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part"
- try:
- if file_exists(boot_part_path):
- boot_part_value = read_file(boot_part_path).strip()
- next_boot_part = 2 if boot_part_value == "1" else 3
- else:
- is_new_version = False
- except (IOError, OSError):
- is_new_version = False
-
- if not is_new_version:
- root_part_path = "/sys/devices/platform/lpgpr/root_part"
- try:
- if file_exists(root_part_path):
- root_part_value = read_file(root_part_path).strip()
- next_boot_part = 2 if root_part_value == "a" else 3
- except (IOError, OSError) as e:
- self.logger.debug(f"Failed to read next boot partition: {e}")
-
- return current_part, inactive_part, next_boot_part
-
- def get_device_status(self) -> tuple[str | None, str, str, str, str]:
- """Gets the status of the device
-
- Returns:
- tuple: Beta status, old_update_engine, current version, version_id, backup version (in that order)
- """
- old_update_engine = True
- version_id = ""
- beta_contents = ""
-
- if self.client:
- self.logger.debug("Connecting to FTP")
- ftp = self.client.open_sftp()
- self.logger.debug("Connected")
-
- xochitl_version, old_update_engine = self._read_version_from_path(ftp)
- def exists(path:str) -> bool:
- try:
- _ = ftp.stat(path)
- return True
- except (FileNotFoundError, IOError):
- return False
-
- if exists("/etc/version"):
- with ftp.file("/etc/version") as file:
- version_id = file.read().decode("utf-8").strip("\n")
-
- if exists("/home/root/.config/remarkable/xochitl.conf"):
- with ftp.file("/home/root/.config/remarkable/xochitl.conf") as file:
- beta_contents = file.read().decode("utf-8")
-
- else:
- xochitl_version, old_update_engine = self._read_version_from_path()
-
- if os.path.exists("/etc/version"):
- with open("/etc/version", encoding="utf-8") as file:
- version_id = file.read().rstrip()
-
- if os.path.exists("/home/root/.config/remarkable/xochitl.conf"):
- with open("/home/root/.config/remarkable/xochitl.conf", encoding="utf-8") as file:
- beta_contents = file.read().rstrip()
-
- beta_possible = re.search("(?<=GROUP=).*", beta_contents)
- beta = "Release"
-
- if beta_possible is not None:
- beta = re.search("(?<=GROUP=).*", beta_contents).group()
-
- backup_version = self._get_backup_partition_version()
-
- return beta, old_update_engine, xochitl_version, version_id, backup_version
-
- def set_server_config(self, contents: str, server_host_name: str) -> str:
- """Converts the contents given to point to the given server IP and port
-
- Args:
- contents (str): Contents of the update.conf file
- server_host_name (str): Hostname of the server
-
- Returns:
- str: Converted contents
- """
- data_attributes = contents.split("\n")
- line = 0
-
- self.logger.debug(f"Contents are:\n{contents}")
-
- for i in range(0, len(data_attributes)):
- if data_attributes[i].startswith("[General]"):
- self.logger.debug("Found [General] line")
- line = i + 1
- if not data_attributes[i].startswith("SERVER="):
- continue
-
- data_attributes[i] = f"#{data_attributes[i]}"
- self.logger.debug(f"Using {data_attributes[i]}")
-
- data_attributes.insert(line, f"SERVER={server_host_name}")
- converted = "\n".join(data_attributes)
-
- self.logger.debug(f"Converted contents are:\n{converted}")
-
- return converted
-
- def edit_update_conf(self, server_ip: str, server_port: str) -> bool:
- """Edits the update.conf file to point to the given server IP and port
-
- Args:
- server_ip (str): IP of update server
- server_port (str): Port of update service
-
- Returns:
- bool: True if successful, False otherwise
- """
- server_host_name = f"http://{server_ip}:{server_port}"
- self.logger.debug(f"Hostname is: {server_host_name}")
- try:
- if not self.client:
- self.logger.debug("Detected running on local device")
- with open(
- "/usr/share/remarkable/update.conf", encoding="utf-8"
- ) as file:
- modified_conf_version = self.set_server_config(
- file.read(), server_host_name
- )
-
- with open("/usr/share/remarkable/update.conf", "w") as file:
- file.write(modified_conf_version)
-
- return True
-
- self.logger.debug("Connecting to FTP")
- ftp = self.client.open_sftp() # or ssh
- self.logger.debug("Connected")
-
- with ftp.file("/usr/share/remarkable/update.conf") as update_conf_file:
- modified_conf_version = self.set_server_config(
- update_conf_file.read().decode("utf-8"), server_host_name
- )
-
- with ftp.file("/usr/share/remarkable/update.conf", "w") as update_conf_file:
- update_conf_file.write(modified_conf_version)
-
- return True
- except Exception as error:
- self.logger.error(f"Error while editing update.conf: {error}")
- return False
-
- def restore_previous_version(self) -> None:
- """Restores the previous version of the device"""
-
- RESTORE_CODE = """/sbin/fw_setenv "upgrade_available" "1"
-/sbin/fw_setenv "bootcount" "0"
-
-OLDPART=$(/sbin/fw_printenv -n active_partition)
-if [ $OLDPART == "2" ]; then
- NEWPART="3"
-else
- NEWPART="2"
-fi
-echo "new: ${NEWPART}"
-echo "fallback: ${OLDPART}"
-
-/sbin/fw_setenv "fallback_partition" "${OLDPART}"
-/sbin/fw_setenv "active_partition" "${NEWPART}\""""
-
- if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM):
- _, _, current_version, _, backup_version = self.get_device_status()
- current_part, inactive_part, _ = self._get_paper_pro_partition_info(current_version)
-
- new_part_label = "a" if inactive_part == 2 else "b"
-
- parts = current_version.split('.')
- if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit():
- raise SystemError(f"Cannot restore: unexpected current version format '{current_version}'")
-
- current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22]
-
- parts = backup_version.split('.')
- if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit():
- raise SystemError(f"Cannot restore: unexpected backup version format '{backup_version}'")
-
- target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22]
-
- code = [
- "#!/bin/bash",
- f"echo 'Switching from partition {current_part} to partition {inactive_part}'",
- f"echo 'Current version: {current_version}'",
- f"echo 'Target version: {backup_version}'",
- ]
-
- if not current_is_new:
- code.extend([
- f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part",
- "echo 'Set next boot via sysfs (legacy method)'",
- ])
-
- if target_is_new or current_is_new:
- code.extend([
- f"mmc bootpart enable {inactive_part - 1} 0 /dev/mmcblk0boot{inactive_part - 2}",
- "echo 'Set next boot via mmc bootpart (new method)'",
- ])
-
- code.extend([
- f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true",
- "echo 'Partition switch complete'",
- ])
-
- RESTORE_CODE = "\n".join(code)
-
- if self.client:
- self.logger.debug("Connecting to FTP")
- ftp = self.client.open_sftp()
- self.logger.debug("Connected")
-
- with ftp.file("/tmp/restore.sh", "w") as file:
- file.write(RESTORE_CODE)
-
- self.logger.debug("Setting permissions and running restore.sh")
-
- self.client.exec_command("chmod +x /tmp/restore.sh")
- self.client.exec_command("bash /tmp/restore.sh")
- else:
- with open("/tmp/restore.sh", "w") as file:
- file.write(RESTORE_CODE)
-
- self.logger.debug("Setting permissions and running restore.sh")
-
- os.system("chmod +x /tmp/restore.sh")
- os.system("/tmp/restore.sh")
-
- self.logger.debug("Restore script ran")
-
- def reboot_device(self) -> None:
- REBOOT_CODE = """
-if systemctl is-active --quiet tarnish.service; then
- rot system call reboot
-else
- systemctl reboot
-fi
-"""
- if self.client:
- self.logger.debug("Connecting to FTP")
- ftp = self.client.open_sftp()
- self.logger.debug("Connected")
- with ftp.file("/tmp/reboot.sh", "w") as file:
- file.write(REBOOT_CODE)
-
- self.logger.debug("Running reboot.sh")
- self.client.exec_command("sh /tmp/reboot.sh")
-
- else:
- with open("/tmp/reboot.sh", "w") as file:
- file.write(REBOOT_CODE)
-
- self.logger.debug("Running reboot.sh")
- os.system("sh /tmp/reboot.sh")
-
- self.logger.debug("Device rebooted")
-
- def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes] | None = None) -> None:
- """
- Installs new version from version file path, utilising swupdate
-
- Args:
- version_file (str): Path to img file
- bootloader_files (dict[str, bytes] | None): Bootloader files for Paper Pro downgrade
-
- Raises:
- SystemExit: If there was an error installing the update
-
- """
- if self.client:
- ftp_client = self.client.open_sftp()
-
- print(f"Uploading {version_file} image")
-
- out_location = f"/tmp/{os.path.basename(version_file)}.swu"
- ftp_client.put(
- version_file, out_location, callback=self.output_put_progress
- )
-
- print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)")
-
- command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'"
- self.logger.debug(command)
- _stdin, stdout, _stderr = self.client.exec_command(command)
-
- exit_status = stdout.channel.recv_exit_status()
-
- if exit_status != 0:
- print("".join(_stderr.readlines()))
- raise SystemError("Update failed!")
-
- if bootloader_files:
- print("\nApplying bootloader update...")
- self._update_paper_pro_bootloader(
- bootloader_files['update-bootloader.sh'],
- bootloader_files['imx-boot']
- )
- print("✓ Bootloader update completed")
-
- print("Done! Now rebooting the device and disabling update service")
-
- #### Now disable automatic updates
-
- self.client.exec_command("sleep 1 && reboot") # Should be enough
- self.client.close()
-
- time.sleep(
- 2
- ) # Somehow the code runs faster than the time it takes for the device to reboot
-
- print("Trying to connect to device")
-
- while not self.check_is_address_reachable(self.address):
- time.sleep(1)
-
- self.client = self.connect_to_device(
- remote_address=self.address, authentication=self.authentication
- )
-
- self.client.exec_command("systemctl stop swupdate memfaultd")
-
- print(
- "Update complete and update service disabled, restart device to enable it"
- )
-
- else:
- print("Running swupdate")
- command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}"]
- self.logger.debug(command)
-
- try:
- output = subprocess.check_output(
- command,
- stderr=subprocess.STDOUT,
- text=True,
- env={"PATH": "/bin:/usr/bin:/sbin:/usr/sbin"},
- )
- self.logger.debug(f"Stdout of swupdate: {output}")
- except subprocess.CalledProcessError as e:
- print(e.output)
- raise SystemError("Update failed")
-
- print("Update complete and device rebooting")
- os.system("reboot")
-
- def _update_paper_pro_bootloader(self, bootloader_script: bytes, imx_boot: bytes) -> None:
- """
- Update bootloader on Paper Pro device for 3.22+ -> <3.22 downgrades.
-
- This method uploads the bootloader script and image to the device,
- then runs the update script twice (preinst and postinst) to update
- both boot partitions.
-
- Args:
- bootloader_script: Contents of update-bootloader.sh
- imx_boot: Contents of imx-boot image file
-
- Raises:
- SystemError: If bootloader update fails
- """
- self.logger.info("Starting bootloader update for Paper Pro")
-
- if not self.client:
- raise SystemError("No SSH connection to device")
-
- ftp_client = None
- try:
- ftp_client = self.client.open_sftp()
- except Exception:
- raise SystemError("Failed to open SFTP connection for bootloader update")
-
- script_path = "/tmp/update-bootloader.sh"
- boot_image_path = "/tmp/imx-boot"
-
- with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.sh') as tmp_script:
- tmp_script.write(bootloader_script)
- tmp_script_path = tmp_script.name
-
- with tempfile.NamedTemporaryFile(mode='wb', delete=False, suffix='.img') as tmp_boot:
- tmp_boot.write(imx_boot)
- tmp_boot_path = tmp_boot.name
-
- try:
- self.logger.debug("Uploading bootloader script to device")
- ftp_client.put(tmp_script_path, script_path)
-
- self.logger.debug("Uploading imx-boot image to device")
- ftp_client.put(tmp_boot_path, boot_image_path)
-
- self.logger.debug("Making bootloader script executable")
- _stdin, stdout, _stderr = self.client.exec_command(f"chmod +x {script_path}")
- stdout.channel.recv_exit_status()
-
- self.logger.info("Running bootloader update script (preinst)")
- _stdin, stdout, stderr = self.client.exec_command(
- f"{script_path} preinst {boot_image_path}"
- )
- exit_status = stdout.channel.recv_exit_status()
- if exit_status != 0:
- error_msg = "".join(stderr.readlines())
- raise SystemError(f"Bootloader preinst failed: {error_msg}")
-
- self.logger.info("Running bootloader update script (postinst)")
- _stdin, stdout, stderr = self.client.exec_command(
- f"{script_path} postinst {boot_image_path}"
- )
- exit_status = stdout.channel.recv_exit_status()
- if exit_status != 0:
- error_msg = "".join(stderr.readlines())
- raise SystemError(f"Bootloader postinst failed: {error_msg}")
-
- self.logger.info("Bootloader update completed successfully")
-
- finally:
- self.logger.debug("Cleaning up temporary bootloader files on device")
- self.client.exec_command(f"rm -f {script_path} {boot_image_path}")
-
- self.logger.debug("Cleaning up local temporary files")
- os.unlink(tmp_script_path)
- os.unlink(tmp_boot_path)
- if ftp_client:
- ftp_client.close()
-
- def install_ohma_update(self, version_available: dict) -> None:
- """Installs version from update folder on the device
-
- Args:
- version_available (dict): Version available for installation from `get_available_version`
-
- Raises:
- SystemExit: If there was an error installing the update
- """
-
- server_host = self.get_host_address()
-
- self.logger.debug("Editing config file")
-
- if (
- self.edit_update_conf(server_ip=server_host, server_port=8085) is False
- ): # We want a port that probably isn't being used
- self.logger.error("Error while editing update.conf")
-
- return
-
- thread = threading.Thread(
- target=startUpdate, args=(version_available, server_host, 8085), daemon=True
- )
- thread.start()
-
- self.logger.debug("Thread started")
-
- if self.client:
- print("Checking if device can connect to this machine")
-
- _stdin, stdout, _stderr = self.client.exec_command(
- f"sleep 2 && echo | nc {server_host} 8085"
- )
- check = stdout.channel.recv_exit_status()
- self.logger.debug(f"Stdout of nc checking: {stdout.readlines()}")
-
- if check != 0:
- raise SystemError(
- "Device cannot connect to this machine! Is the firewall blocking connections?"
- )
-
- print("Starting update service on device")
-
- self.client.exec_command("systemctl start update-engine")
-
- _stdin, stdout, _stderr = self.client.exec_command(
- "/usr/bin/update_engine_client -update"
- )
- exit_status = stdout.channel.recv_exit_status()
-
- if exit_status != 0:
- print("".join(_stderr.readlines()))
- raise SystemError("There was an error updating :(")
-
- self.logger.debug(
- f"Stdout of update checking service is {''.join(_stderr.readlines())}"
- )
-
- #### Now disable automatic updates
-
- print("Done! Now rebooting the device and disabling update service")
-
- self.client.exec_command("sleep 1 && reboot") # Should be enough
- self.client.close()
-
- time.sleep(
- 2
- ) # Somehow the code runs faster than the time it takes for the device to reboot
-
- print("Trying to connect to device")
-
- while not self.check_is_address_reachable(self.address):
- time.sleep(1)
-
- self.client = self.connect_to_device(
- remote_address=self.address, authentication=self.authentication
- )
- self.client.exec_command("systemctl stop update-engine")
-
- print(
- "Update complete and update service disabled. Restart device to enable it"
- )
-
- else:
- print("Enabling update service")
-
- subprocess.run(
- ["/bin/systemctl", "start", "update-engine"],
- text=True,
- check=True,
- env={"PATH": "/bin:/usr/bin:/sbin"},
- )
-
- with subprocess.Popen(
- ["/usr/bin/update_engine_client", "-update"],
- text=True,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env={"PATH": "/bin:/usr/bin:/sbin"},
- ) as process:
- if process.wait() != 0:
- print("".join(process.stderr.readlines()))
-
- raise SystemError("There was an error updating :(")
-
- self.logger.debug(
- f"Stdout of update checking service is {''.join(process.stderr.readlines())}"
- )
-
- print("Update complete and device rebooting")
- os.system("reboot")
-
- @staticmethod
- def output_put_progress(transferred: int, toBeTransferred: int) -> None:
- """Used for displaying progress for paramiko ftp.put function"""
-
- print(
- f"Transferring progress{int((transferred / toBeTransferred) * 100)}%",
- end="\r",
- )
+import enum
+import logging
+import os
+import re
+import shlex
+import socket
+import subprocess
+import tempfile
+import threading
+import time
+
+from .server import startUpdate
+
+try:
+ import paramiko
+ import psutil
+except ImportError:
+ pass
+
+
+class HardwareType(enum.Enum):
+ RM1 = enum.auto()
+ RM2 = enum.auto()
+ RMPP = enum.auto()
+ RMPPM = enum.auto()
+ RMPPURE = enum.auto()
+
+ @classmethod
+ def parse(cls, device_type: str) -> "HardwareType":
+ match device_type.lower():
+ case "ppm" | "rmppm" | "chiappa" | "remarkable chiappa":
+ return cls.RMPPM
+
+ case "pp" | "pro" | "rmpp" | "ferrari" | "remarkable ferrari":
+ return cls.RMPP
+
+ case "2" | "rm2" | "remarkable 2" | "remarkable 2.0":
+ return cls.RM2
+
+ case (
+ "1"
+ | "rm1"
+ | "remarkable 1"
+ | "remarkable 1.0"
+ | "remarkable prototype 1"
+ ):
+ return cls.RM1
+
+ case "ppure" | "rmppure" | "tatsu" | "remarkable tatsu":
+ return cls.RMPPURE
+
+ case _:
+ raise ValueError(
+ f"Unknown hardware version: {device_type} (rm1, rm2, rmpp, rmppm)"
+ )
+
+ @property
+ def old_download_hw(self):
+ match self:
+ case HardwareType.RM1:
+ return "reMarkable"
+ case HardwareType.RM2:
+ return "reMarkable2"
+ case HardwareType.RMPP | HardwareType.RMPPM | HardwareType.RMPPURE:
+ raise ValueError(
+ f"{self.formatted_name} does not support the old update engine"
+ )
+
+ @property
+ def new_download_hw(self):
+ match self:
+ case HardwareType.RM1:
+ return "rm1"
+ case HardwareType.RM2:
+ return "rm2"
+ case HardwareType.RMPP:
+ return "rmpp"
+ case HardwareType.RMPPM:
+ return "rmppm"
+ case HardwareType.RMPPURE:
+ return "rmppure"
+
+ @property
+ def swupdate_hw(self):
+ match self:
+ case HardwareType.RM1:
+ return "reMarkable1"
+ case HardwareType.RM2:
+ return "reMarkable2"
+ case HardwareType.RMPP:
+ return "ferrari"
+ case HardwareType.RMPPM:
+ return "chiappa"
+ case HardwareType.RMPPURE:
+ return "tatsu"
+
+ @property
+ def formatted_name(self):
+ match self:
+ case HardwareType.RM1:
+ return "reMarkable 1"
+ case HardwareType.RM2:
+ return "reMarkable 2"
+ case HardwareType.RMPP:
+ return "reMarkable Paper Pro"
+ case HardwareType.RMPPM:
+ return "reMarkable Paper Pro Move"
+ case HardwareType.RMPPURE:
+ return "reMarkable Paper Pure"
+
+ @property
+ def toltec_type(self):
+ match self:
+ case HardwareType.RM1:
+ return "rm1"
+ case HardwareType.RM2:
+ return "rm2"
+ case HardwareType.RMPP | HardwareType.RMPPM | HardwareType.RMPPURE:
+ raise ValueError(f"{self.formatted_name} does not support toltec")
+
+
+class DeviceManager:
+ def __init__(
+ self, logger=None, remote=False, address=None, authentication=None
+ ) -> None:
+ """Initializes the DeviceManager for codexctl
+
+ Args:
+ remote (bool, optional): Whether the device is remote. Defaults to False.
+ address (bool, optional): Known IP of remote device, if applicable. Defaults to None.
+ logger (logger, optional): Logger object for logging. Defaults to None.
+ Authentication (str, optional): Authentication method. Defaults to None.
+ """
+ self.logger = logger
+ self.address = address
+ self.authentication = authentication
+ self.client = None
+
+ if self.logger is None:
+ self.logger = logging
+
+ if remote:
+ self.client = self.connect_to_device(
+ authentication=authentication, remote_address=address
+ )
+
+ self.client.authentication = authentication
+ self.client.address = address
+
+ ftp = self.client.open_sftp()
+ with ftp.file("/sys/devices/soc0/machine") as file:
+ machine_contents = file.read().decode("utf-8").strip("\n")
+ else:
+ with open("/sys/devices/soc0/machine") as file:
+ machine_contents = file.read().strip("\n")
+
+ self.hardware = HardwareType.parse(machine_contents)
+
+ def get_host_address(self) -> list[str] | list | None: # Interaction required
+ """Gets the IP address of the host machine
+
+ Returns:
+ str | None: IP address of the host machine, or None if not found
+ """
+
+ possible_ips = []
+ try:
+ for interface, snics in psutil.net_if_addrs().items():
+ self.logger.debug(f"New interface found: {interface}")
+ for snic in snics:
+ if snic.family == socket.AF_INET:
+ if snic.address.startswith("10.11.99"):
+ return snic.address
+ self.logger.debug(f"Adding new address: {snic.address}")
+ possible_ips.append(snic.address)
+
+ except Exception as error:
+ self.logger.error(f"Error automatically getting interfaces: {error}")
+
+ if possible_ips:
+ host_interfaces = "\n".join(possible_ips)
+ else:
+ host_interfaces = "Could not find any available interfaces."
+
+ print(f"\n{host_interfaces}")
+ while True:
+ host_address = input(
+ "\nPlease enter your host IP for the network the device is connected to: "
+ )
+
+ if possible_ips and host_address not in host_interfaces.split("\n"):
+ print("Error: Invalid IP given")
+ continue
+
+ if "n" in input("Are you sure? (Y/n): ").lower():
+ continue
+
+ break
+
+ return host_address
+
+ def get_remarkable_address(self) -> str:
+ """Gets the IP address of the remarkable device
+
+ Returns:
+ str: IP address of the remarkable device
+ """
+
+ if self.check_is_address_reachable("10.11.99.1"):
+ return "10.11.99.1"
+
+ while True:
+ remote_ip = input("Please enter the IP of the remarkable device: ")
+
+ if self.check_is_address_reachable(remote_ip):
+ return remote_ip
+
+ print(f"Error: Device {remote_ip} is not reachable. Please try again.")
+
+ def check_is_address_reachable(self, remote_ip="10.11.99.1") -> bool:
+ """Checks if the given IP address is reachable over SSH
+
+ Args:
+ remote_ip (str, optional): IP to check. Defaults to '10.11.99.1'.
+
+ Returns:
+ bool: True if reachable, False otherwise
+ """
+ self.logger.debug(f"Checking if {remote_ip} is reachable")
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(1)
+
+ sock.connect((remote_ip, 22))
+ sock.shutdown(2)
+
+ return True
+
+ except Exception:
+ self.logger.debug(f"Device {remote_ip} is not reachable")
+ return False
+
+ def connect_to_device(
+ self, remote_address=None, authentication=None
+ ) -> paramiko.client.SSHClient:
+ """Connects to the device using the given IP address
+
+ Args:
+ remote_address (str, optional): IP address of the device.
+ authentication (str, optional): Authentication credentials. Defaults to None.
+
+ Returns:
+ paramiko.client.SSHClient: SSH client object for the device.
+ """
+
+ if remote_address is None:
+ remote_address = self.get_remarkable_address()
+ self.address = remote_address # For future reference
+ else:
+ if self.check_is_address_reachable(remote_address) is False:
+ raise SystemError(f"Error: Device {remote_address} is not reachable!")
+
+ client = paramiko.client.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ if authentication:
+ self.logger.debug(f"Using authentication: {authentication}")
+ try:
+ if os.path.isfile(authentication):
+ self.logger.debug(
+ f"Attempting to connect to {remote_address} with key file {authentication}"
+ )
+ client.connect(
+ remote_address, username="root", key_filename=authentication
+ )
+ else:
+ self.logger.debug(
+ f"Attempting to connect to {remote_address} with password {authentication}"
+ )
+ client.connect(
+ remote_address, username="root", password=authentication
+ )
+
+ except paramiko.ssh_exception.AuthenticationException:
+ print("Incorrect password or ssh path given in arguments!")
+
+ elif (
+ "n" in input("Would you like to use a password to connect? (Y/n): ").lower()
+ ):
+ while True:
+ key_path = input("Enter path to SSH key: ")
+
+ try:
+ self.logger.debug(
+ f"Attempting to connect to {remote_address} with key file {key_path}"
+ )
+ client.connect(
+ remote_address, username="root", key_filename=key_path
+ )
+ except Exception:
+ print("Error while connecting to device: {error}")
+
+ continue
+ break
+ else:
+ while True:
+ password = input("Enter RM SSH password: ")
+
+ try:
+ self.logger.debug(
+ f"Attempting to connect to {remote_address} with password {password}"
+ )
+ client.connect(remote_address, username="root", password=password)
+ except paramiko.ssh_exception.AuthenticationException:
+ print("Incorrect password given")
+
+ continue
+ break
+
+ print("Success: Connected to device")
+
+ return client
+
+ def _read_version_from_path(
+ self, ftp=None, base_path: str = ""
+ ) -> tuple[str, bool]:
+ """Reads version from a given path (current partition or mounted backup)
+
+ Args:
+ ftp: SFTP client connection (None for local file access)
+ base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup)
+
+ Returns:
+ tuple: (version_string, old_update_engine_boolean)
+ """
+ update_conf_path = (
+ f"{base_path}/usr/share/remarkable/update.conf"
+ if base_path
+ else "/usr/share/remarkable/update.conf"
+ )
+ os_release_path = (
+ f"{base_path}/etc/os-release" if base_path else "/etc/os-release"
+ )
+
+ if ftp:
+
+ def file_exists(path: str) -> bool:
+ try:
+ ftp.stat(path)
+ return True
+ except FileNotFoundError:
+ return False
+
+ def read_file(path: str) -> str:
+ with ftp.file(path) as file:
+ return file.read().decode("utf-8")
+ else:
+ file_exists = os.path.exists
+
+ def read_file(path: str) -> str:
+ with open(path, encoding="utf-8") as file:
+ return file.read()
+
+ if file_exists(update_conf_path):
+ contents = read_file(update_conf_path).strip("\n")
+ match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents)
+ if match:
+ return match.group(), True
+ raise SystemError(
+ f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}"
+ )
+
+ if file_exists(os_release_path):
+ contents = read_file(os_release_path)
+ match = re.search("(?<=IMG_VERSION=).*", contents)
+ if match:
+ return match.group().strip('"'), False
+ raise SystemError(f"IMG_VERSION not found in {os_release_path}")
+
+ raise SystemError(
+ f"Cannot read version from {base_path or 'current partition'}: no version file found"
+ )
+
+ def _get_active_device(self) -> str:
+ """Gets the active root device path.
+
+ Returns:
+ str: Active device path (e.g., /dev/mmcblk2p2)
+
+ Raises:
+ SystemError: If command fails or returns no output
+ """
+ if self.hardware in (
+ HardwareType.RMPP,
+ HardwareType.RMPPM,
+ HardwareType.RMPPURE,
+ ):
+ cmd = "swupdate -g"
+ else:
+ cmd = "rootdev"
+
+ if self.client:
+ _stdin, stdout, stderr = self.client.exec_command(cmd)
+ output = stdout.read().decode("utf-8").strip()
+ exit_status = stdout.channel.recv_exit_status()
+ if exit_status != 0 or not output:
+ error = stderr.read().decode("utf-8", errors="ignore")
+ raise SystemError(
+ f"Failed to get active device using '{cmd}': {error or 'no output'}"
+ )
+ return output
+ else:
+ result = subprocess.run(cmd.split(), capture_output=True, text=True)
+ if result.returncode != 0 or not result.stdout.strip():
+ raise SystemError(
+ f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}"
+ )
+ return result.stdout.strip()
+
+ def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]:
+ """Parse partition numbers from device path.
+
+ Args:
+ active_device: Device path (e.g., /dev/mmcblk2p2)
+
+ Returns:
+ tuple: (active_part, inactive_part, device_base)
+ """
+ active_part = int(active_device.split("p")[-1])
+ inactive_part = 3 if active_part == 2 else 2
+ device_base = re.sub(r"p\d+$", "", active_device)
+ return active_part, inactive_part, device_base
+
+ def _get_backup_partition_version(self) -> str:
+ """Gets the version installed on the backup (inactive) partition
+
+ Returns:
+ str: Version string (empty string for RM1/RM2 on failure)
+
+ Raises:
+ SystemError: If backup partition version cannot be determined (Paper Pro only)
+ """
+ try:
+ active_device = self._get_active_device()
+ _, inactive_part, device_base = self._parse_partition_info(active_device)
+ mount_point = f"/tmp/mount_p{inactive_part}"
+
+ if self.client:
+ ftp = self.client.open_sftp()
+ self.client.exec_command(f"mkdir -p {mount_point}")
+ _stdin, stdout, _stderr = self.client.exec_command(
+ f"mount -o ro {device_base}p{inactive_part} {mount_point}"
+ )
+ exit_status = stdout.channel.recv_exit_status()
+
+ if exit_status != 0:
+ error_msg = _stderr.read().decode("utf-8")
+ raise SystemError(f"Failed to mount backup partition: {error_msg}")
+
+ try:
+ version, _ = self._read_version_from_path(ftp, mount_point)
+ return version
+ finally:
+ self.client.exec_command(f"umount {mount_point}")
+ self.client.exec_command(f"rm -rf {mount_point}")
+ else:
+ os.makedirs(mount_point, exist_ok=True)
+ result = subprocess.run(
+ [
+ "mount",
+ "-o",
+ "ro",
+ f"{device_base}p{inactive_part}",
+ mount_point,
+ ],
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ raise SystemError(
+ f"Failed to mount backup partition: {result.stderr}"
+ )
+
+ try:
+ version, _ = self._read_version_from_path(base_path=mount_point)
+ return version
+ finally:
+ subprocess.run(["umount", mount_point])
+ subprocess.run(["rm", "-rf", mount_point])
+ except SystemError:
+ if self.hardware in (
+ HardwareType.RMPP,
+ HardwareType.RMPPM,
+ HardwareType.RMPPURE,
+ ):
+ raise
+ return ""
+
+ def _get_paper_pro_partition_info(
+ self, current_version: str
+ ) -> tuple[int, int, int]:
+ """Gets partition information for Paper Pro devices
+
+ Args:
+ current_version: Current OS version string for version-aware detection
+
+ Returns:
+ tuple: (current_partition, inactive_partition, next_boot_partition)
+ """
+ active_device = self._get_active_device()
+ current_part, inactive_part, _ = self._parse_partition_info(active_device)
+
+ parts = current_version.split(".")
+ if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit():
+ is_new_version = [int(parts[0]), int(parts[1])] >= [3, 22]
+ else:
+ raise SystemError(
+ f"Cannot detect partition scheme: unexpected version format '{current_version}'"
+ )
+
+ next_boot_part = current_part
+
+ if self.client:
+ ftp = self.client.open_sftp()
+
+ def file_exists(path: str) -> bool:
+ try:
+ ftp.stat(path)
+ return True
+ except FileNotFoundError:
+ return False
+
+ def read_file(path: str) -> str:
+ with ftp.file(path) as file:
+ return file.read().decode("utf-8")
+ else:
+ file_exists = os.path.exists
+
+ def read_file(path: str) -> str:
+ with open(path, encoding="utf-8") as file:
+ return file.read()
+
+ if is_new_version:
+ boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part"
+ try:
+ if file_exists(boot_part_path):
+ boot_part_value = read_file(boot_part_path).strip()
+ next_boot_part = 2 if boot_part_value == "1" else 3
+ else:
+ is_new_version = False
+ except (IOError, OSError):
+ is_new_version = False
+
+ if not is_new_version:
+ root_part_path = "/sys/devices/platform/lpgpr/root_part"
+ try:
+ if file_exists(root_part_path):
+ root_part_value = read_file(root_part_path).strip()
+ next_boot_part = 2 if root_part_value == "a" else 3
+ except (IOError, OSError) as e:
+ self.logger.debug(f"Failed to read next boot partition: {e}")
+
+ return current_part, inactive_part, next_boot_part
+
+ def get_device_status(self) -> tuple[str | None, str, str, str, str]:
+ """Gets the status of the device
+
+ Returns:
+ tuple: Beta status, old_update_engine, current version, version_id, backup version (in that order)
+ """
+ old_update_engine = True
+ version_id = ""
+ beta_contents = ""
+
+ if self.client:
+ self.logger.debug("Connecting to FTP")
+ ftp = self.client.open_sftp()
+ self.logger.debug("Connected")
+
+ xochitl_version, old_update_engine = self._read_version_from_path(ftp)
+
+ def exists(path: str) -> bool:
+ try:
+ _ = ftp.stat(path)
+ return True
+ except (FileNotFoundError, IOError):
+ return False
+
+ if exists("/etc/version"):
+ with ftp.file("/etc/version") as file:
+ version_id = file.read().decode("utf-8").strip("\n")
+
+ if exists("/home/root/.config/remarkable/xochitl.conf"):
+ with ftp.file("/home/root/.config/remarkable/xochitl.conf") as file:
+ beta_contents = file.read().decode("utf-8")
+
+ else:
+ xochitl_version, old_update_engine = self._read_version_from_path()
+
+ if os.path.exists("/etc/version"):
+ with open("/etc/version", encoding="utf-8") as file:
+ version_id = file.read().rstrip()
+
+ if os.path.exists("/home/root/.config/remarkable/xochitl.conf"):
+ with open(
+ "/home/root/.config/remarkable/xochitl.conf", encoding="utf-8"
+ ) as file:
+ beta_contents = file.read().rstrip()
+
+ beta_possible = re.search("(?<=GROUP=).*", beta_contents)
+ beta = "Release"
+
+ if beta_possible is not None:
+ beta = re.search("(?<=GROUP=).*", beta_contents).group()
+
+ backup_version = self._get_backup_partition_version()
+
+ return beta, old_update_engine, xochitl_version, version_id, backup_version
+
+ def set_server_config(self, contents: str, server_host_name: str) -> str:
+ """Converts the contents given to point to the given server IP and port
+
+ Args:
+ contents (str): Contents of the update.conf file
+ server_host_name (str): Hostname of the server
+
+ Returns:
+ str: Converted contents
+ """
+ data_attributes = contents.split("\n")
+ line = 0
+
+ self.logger.debug(f"Contents are:\n{contents}")
+
+ for i in range(0, len(data_attributes)):
+ if data_attributes[i].startswith("[General]"):
+ self.logger.debug("Found [General] line")
+ line = i + 1
+ if not data_attributes[i].startswith("SERVER="):
+ continue
+
+ data_attributes[i] = f"#{data_attributes[i]}"
+ self.logger.debug(f"Using {data_attributes[i]}")
+
+ data_attributes.insert(line, f"SERVER={server_host_name}")
+ converted = "\n".join(data_attributes)
+
+ self.logger.debug(f"Converted contents are:\n{converted}")
+
+ return converted
+
+ def edit_update_conf(self, server_ip: str, server_port: str) -> bool:
+ """Edits the update.conf file to point to the given server IP and port
+
+ Args:
+ server_ip (str): IP of update server
+ server_port (str): Port of update service
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+ server_host_name = f"http://{server_ip}:{server_port}"
+ self.logger.debug(f"Hostname is: {server_host_name}")
+ try:
+ if not self.client:
+ self.logger.debug("Detected running on local device")
+ with open(
+ "/usr/share/remarkable/update.conf", encoding="utf-8"
+ ) as file:
+ modified_conf_version = self.set_server_config(
+ file.read(), server_host_name
+ )
+
+ with open("/usr/share/remarkable/update.conf", "w") as file:
+ file.write(modified_conf_version)
+
+ return True
+
+ self.logger.debug("Connecting to FTP")
+ ftp = self.client.open_sftp() # or ssh
+ self.logger.debug("Connected")
+
+ with ftp.file("/usr/share/remarkable/update.conf") as update_conf_file:
+ modified_conf_version = self.set_server_config(
+ update_conf_file.read().decode("utf-8"), server_host_name
+ )
+
+ with ftp.file("/usr/share/remarkable/update.conf", "w") as update_conf_file:
+ update_conf_file.write(modified_conf_version)
+
+ return True
+ except Exception as error:
+ self.logger.error(f"Error while editing update.conf: {error}")
+ return False
+
+ def restore_previous_version(self) -> None:
+ """Restores the previous version of the device"""
+
+ RESTORE_CODE = """/sbin/fw_setenv "upgrade_available" "1"
+/sbin/fw_setenv "bootcount" "0"
+
+OLDPART=$(/sbin/fw_printenv -n active_partition)
+if [ $OLDPART == "2" ]; then
+ NEWPART="3"
+else
+ NEWPART="2"
+fi
+echo "new: ${NEWPART}"
+echo "fallback: ${OLDPART}"
+
+/sbin/fw_setenv "fallback_partition" "${OLDPART}"
+/sbin/fw_setenv "active_partition" "${NEWPART}\""""
+
+ if self.hardware in (
+ HardwareType.RMPP,
+ HardwareType.RMPPM,
+ HardwareType.RMPPURE,
+ ):
+ _, _, current_version, _, backup_version = self.get_device_status()
+ current_part, inactive_part, _ = self._get_paper_pro_partition_info(
+ current_version
+ )
+
+ new_part_label = "a" if inactive_part == 2 else "b"
+
+ parts = current_version.split(".")
+ if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit():
+ raise SystemError(
+ f"Cannot restore: unexpected current version format '{current_version}'"
+ )
+
+ current_is_new = [int(parts[0]), int(parts[1])] >= [3, 22]
+
+ parts = backup_version.split(".")
+ if len(parts) < 2 or not parts[0].isdigit() or not parts[1].isdigit():
+ raise SystemError(
+ f"Cannot restore: unexpected backup version format '{backup_version}'"
+ )
+
+ target_is_new = [int(parts[0]), int(parts[1])] >= [3, 22]
+
+ code = [
+ "#!/bin/bash",
+ f"echo 'Switching from partition {current_part} to partition {inactive_part}'",
+ f"echo 'Current version: {current_version}'",
+ f"echo 'Target version: {backup_version}'",
+ ]
+
+ if not current_is_new:
+ code.extend(
+ [
+ f"echo '{new_part_label}' > /sys/devices/platform/lpgpr/root_part",
+ "echo 'Set next boot via sysfs (legacy method)'",
+ ]
+ )
+
+ if target_is_new or current_is_new:
+ code.extend(
+ [
+ f"mmc bootpart enable {inactive_part - 1} 0 /dev/mmcblk0boot{inactive_part - 2}",
+ "echo 'Set next boot via mmc bootpart (new method)'",
+ ]
+ )
+
+ code.extend(
+ [
+ f"echo '0' > /sys/devices/platform/lpgpr/root{new_part_label}_errcnt 2>/dev/null || true",
+ "echo 'Partition switch complete'",
+ ]
+ )
+
+ RESTORE_CODE = "\n".join(code)
+
+ if self.client:
+ self.logger.debug("Connecting to FTP")
+ ftp = self.client.open_sftp()
+ self.logger.debug("Connected")
+
+ with ftp.file("/tmp/restore.sh", "w") as file:
+ file.write(RESTORE_CODE)
+
+ self.logger.debug("Setting permissions and running restore.sh")
+
+ self.client.exec_command("chmod +x /tmp/restore.sh")
+ self.client.exec_command("bash /tmp/restore.sh")
+ else:
+ with open("/tmp/restore.sh", "w") as file:
+ file.write(RESTORE_CODE)
+
+ self.logger.debug("Setting permissions and running restore.sh")
+
+ os.system("chmod +x /tmp/restore.sh")
+ os.system("/tmp/restore.sh")
+
+ self.logger.debug("Restore script ran")
+
+ def reboot_device(self) -> None:
+ REBOOT_CODE = """
+if systemctl is-active --quiet tarnish.service; then
+ rot system call reboot
+else
+ systemctl reboot
+fi
+"""
+ if self.client:
+ self.logger.debug("Connecting to FTP")
+ ftp = self.client.open_sftp()
+ self.logger.debug("Connected")
+ with ftp.file("/tmp/reboot.sh", "w") as file:
+ file.write(REBOOT_CODE)
+
+ self.logger.debug("Running reboot.sh")
+ self.client.exec_command("sh /tmp/reboot.sh")
+
+ else:
+ with open("/tmp/reboot.sh", "w") as file:
+ file.write(REBOOT_CODE)
+
+ self.logger.debug("Running reboot.sh")
+ os.system("sh /tmp/reboot.sh")
+
+ self.logger.debug("Device rebooted")
+
+ def install_sw_update(
+ self, version_file: str, bootloader_files: dict[str, bytes] | None = None
+ ) -> None:
+ """
+ Installs new version from version file path, utilising swupdate
+
+ Args:
+ version_file (str): Path to img file
+ bootloader_files (dict[str, bytes] | None): Bootloader files for Paper Pro downgrade
+
+ Raises:
+ SystemExit: If there was an error installing the update
+
+ """
+ if self.client:
+ ftp_client = self.client.open_sftp()
+
+ print(f"Uploading {version_file} image")
+
+ out_location = f"/tmp/{os.path.basename(version_file)}.swu"
+ ftp_client.put(
+ version_file, out_location, callback=self.output_put_progress
+ )
+
+ print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)")
+
+ command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'"
+ self.logger.debug(command)
+ _stdin, stdout, _stderr = self.client.exec_command(command)
+
+ exit_status = stdout.channel.recv_exit_status()
+
+ if exit_status != 0:
+ print("".join(_stderr.readlines()))
+ raise SystemError("Update failed!")
+
+ if bootloader_files:
+ print("\nApplying bootloader update...")
+ self._update_paper_pro_bootloader(
+ bootloader_files["update-bootloader.sh"],
+ bootloader_files["imx-boot"],
+ )
+ print("✓ Bootloader update completed")
+
+ print("Done! Now rebooting the device and disabling update service")
+
+ #### Now disable automatic updates
+
+ self.client.exec_command("sleep 1 && reboot") # Should be enough
+ self.client.close()
+
+ time.sleep(
+ 2
+ ) # Somehow the code runs faster than the time it takes for the device to reboot
+
+ print("Trying to connect to device")
+
+ while not self.check_is_address_reachable(self.address):
+ time.sleep(1)
+
+ self.client = self.connect_to_device(
+ remote_address=self.address, authentication=self.authentication
+ )
+
+ self.client.exec_command("systemctl stop swupdate memfaultd")
+
+ print(
+ "Update complete and update service disabled, restart device to enable it"
+ )
+
+ else:
+ print("Running swupdate")
+ command = [
+ "bash",
+ "-c",
+ f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}",
+ ]
+ self.logger.debug(command)
+
+ try:
+ output = subprocess.check_output(
+ command,
+ stderr=subprocess.STDOUT,
+ text=True,
+ env={"PATH": "/bin:/usr/bin:/sbin:/usr/sbin"},
+ )
+ self.logger.debug(f"Stdout of swupdate: {output}")
+ except subprocess.CalledProcessError as e:
+ print(e.output)
+ raise SystemError("Update failed")
+
+ print("Update complete and device rebooting")
+ os.system("reboot")
+
+ def _update_paper_pro_bootloader(
+ self, bootloader_script: bytes, imx_boot: bytes
+ ) -> None:
+ """
+ Update bootloader on Paper Pro device for 3.22+ -> <3.22 downgrades.
+
+ This method uploads the bootloader script and image to the device,
+ then runs the update script twice (preinst and postinst) to update
+ both boot partitions.
+
+ Args:
+ bootloader_script: Contents of update-bootloader.sh
+ imx_boot: Contents of imx-boot image file
+
+ Raises:
+ SystemError: If bootloader update fails
+ """
+ self.logger.info("Starting bootloader update for Paper Pro")
+
+ if not self.client:
+ raise SystemError("No SSH connection to device")
+
+ ftp_client = None
+ try:
+ ftp_client = self.client.open_sftp()
+ except Exception:
+ raise SystemError("Failed to open SFTP connection for bootloader update")
+
+ script_path = "/tmp/update-bootloader.sh"
+ boot_image_path = "/tmp/imx-boot"
+
+ with tempfile.NamedTemporaryFile(
+ mode="wb", delete=False, suffix=".sh"
+ ) as tmp_script:
+ tmp_script.write(bootloader_script)
+ tmp_script_path = tmp_script.name
+
+ with tempfile.NamedTemporaryFile(
+ mode="wb", delete=False, suffix=".img"
+ ) as tmp_boot:
+ tmp_boot.write(imx_boot)
+ tmp_boot_path = tmp_boot.name
+
+ try:
+ self.logger.debug("Uploading bootloader script to device")
+ ftp_client.put(tmp_script_path, script_path)
+
+ self.logger.debug("Uploading imx-boot image to device")
+ ftp_client.put(tmp_boot_path, boot_image_path)
+
+ self.logger.debug("Making bootloader script executable")
+ _stdin, stdout, _stderr = self.client.exec_command(
+ f"chmod +x {script_path}"
+ )
+ stdout.channel.recv_exit_status()
+
+ self.logger.info("Running bootloader update script (preinst)")
+ _stdin, stdout, stderr = self.client.exec_command(
+ f"{script_path} preinst {boot_image_path}"
+ )
+ exit_status = stdout.channel.recv_exit_status()
+ if exit_status != 0:
+ error_msg = "".join(stderr.readlines())
+ raise SystemError(f"Bootloader preinst failed: {error_msg}")
+
+ self.logger.info("Running bootloader update script (postinst)")
+ _stdin, stdout, stderr = self.client.exec_command(
+ f"{script_path} postinst {boot_image_path}"
+ )
+ exit_status = stdout.channel.recv_exit_status()
+ if exit_status != 0:
+ error_msg = "".join(stderr.readlines())
+ raise SystemError(f"Bootloader postinst failed: {error_msg}")
+
+ self.logger.info("Bootloader update completed successfully")
+
+ finally:
+ self.logger.debug("Cleaning up temporary bootloader files on device")
+ self.client.exec_command(f"rm -f {script_path} {boot_image_path}")
+
+ self.logger.debug("Cleaning up local temporary files")
+ os.unlink(tmp_script_path)
+ os.unlink(tmp_boot_path)
+ if ftp_client:
+ ftp_client.close()
+
+ def install_ohma_update(self, version_available: dict) -> None:
+ """Installs version from update folder on the device
+
+ Args:
+ version_available (dict): Version available for installation from `get_available_version`
+
+ Raises:
+ SystemExit: If there was an error installing the update
+ """
+
+ server_host = self.get_host_address()
+
+ self.logger.debug("Editing config file")
+
+ if (
+ self.edit_update_conf(server_ip=server_host, server_port=8085) is False
+ ): # We want a port that probably isn't being used
+ self.logger.error("Error while editing update.conf")
+
+ return
+
+ thread = threading.Thread(
+ target=startUpdate, args=(version_available, server_host, 8085), daemon=True
+ )
+ thread.start()
+
+ self.logger.debug("Thread started")
+
+ if self.client:
+ print("Checking if device can connect to this machine")
+
+ _stdin, stdout, _stderr = self.client.exec_command(
+ f"sleep 2 && echo | nc {server_host} 8085"
+ )
+ check = stdout.channel.recv_exit_status()
+ self.logger.debug(f"Stdout of nc checking: {stdout.readlines()}")
+
+ if check != 0:
+ raise SystemError(
+ "Device cannot connect to this machine! Is the firewall blocking connections?"
+ )
+
+ print("Starting update service on device")
+
+ self.client.exec_command("systemctl start update-engine")
+
+ _stdin, stdout, _stderr = self.client.exec_command(
+ "/usr/bin/update_engine_client -update"
+ )
+ exit_status = stdout.channel.recv_exit_status()
+
+ if exit_status != 0:
+ print("".join(_stderr.readlines()))
+ raise SystemError("There was an error updating :(")
+
+ self.logger.debug(
+ f"Stdout of update checking service is {''.join(_stderr.readlines())}"
+ )
+
+ #### Now disable automatic updates
+
+ print("Done! Now rebooting the device and disabling update service")
+
+ self.client.exec_command("sleep 1 && reboot") # Should be enough
+ self.client.close()
+
+ time.sleep(
+ 2
+ ) # Somehow the code runs faster than the time it takes for the device to reboot
+
+ print("Trying to connect to device")
+
+ while not self.check_is_address_reachable(self.address):
+ time.sleep(1)
+
+ self.client = self.connect_to_device(
+ remote_address=self.address, authentication=self.authentication
+ )
+ self.client.exec_command("systemctl stop update-engine")
+
+ print(
+ "Update complete and update service disabled. Restart device to enable it"
+ )
+
+ else:
+ print("Enabling update service")
+
+ subprocess.run(
+ ["/bin/systemctl", "start", "update-engine"],
+ text=True,
+ check=True,
+ env={"PATH": "/bin:/usr/bin:/sbin"},
+ )
+
+ with subprocess.Popen(
+ ["/usr/bin/update_engine_client", "-update"],
+ text=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env={"PATH": "/bin:/usr/bin:/sbin"},
+ ) as process:
+ if process.wait() != 0:
+ print("".join(process.stderr.readlines()))
+
+ raise SystemError("There was an error updating :(")
+
+ self.logger.debug(
+ f"Stdout of update checking service is {''.join(process.stderr.readlines())}"
+ )
+
+ print("Update complete and device rebooting")
+ os.system("reboot")
+
+ @staticmethod
+ def output_put_progress(transferred: int, toBeTransferred: int) -> None:
+ """Used for displaying progress for paramiko ftp.put function"""
+
+ print(
+ f"Transferring progress{int((transferred / toBeTransferred) * 100)}%",
+ end="\r",
+ )
diff --git a/codexctl/server.py b/codexctl/server.py
index 766d8f7..46072a4 100644
--- a/codexctl/server.py
+++ b/codexctl/server.py
@@ -1,172 +1,172 @@
-from http.server import HTTPServer, SimpleHTTPRequestHandler
-import xml.etree.ElementTree as ET
-import os
-import hashlib
-import binascii
-
-response_ok = """
-
-
-
-
-
-
-
-"""
-
-response_template = """
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-"""
-
-
-def getupdateinfo(platform, version, update_name):
- full_path = os.path.join("updates", update_name)
-
- update_size = str(os.path.getsize(full_path))
-
- BUF_SIZE = 8192
-
- sha1 = hashlib.sha1()
- sha256 = hashlib.sha256()
- with open(full_path, "rb") as f:
- while True:
- data = f.read(BUF_SIZE)
- if not data:
- break
- sha1.update(data)
- sha256.update(data)
- update_sha1 = binascii.b2a_base64(sha1.digest(), newline=False).decode()
- update_sha256 = binascii.b2a_base64(sha256.digest(), newline=False).decode()
- return (update_sha1, update_sha256, update_size)
-
-
-def get_available_version(version):
- available_versions = scanUpdates()
-
- for device, ids in available_versions.items():
- if version in ids:
- available_version = {device: ids}
-
- return available_version
-
-
-def scanUpdates():
- files = os.listdir("updates")
- versions = {}
-
- for f in files:
- p = f.split("_")
- if len(p) != 2:
- continue
- t = p[1].split(".")
- if len(t) != 2:
- continue
-
- z = t[0].split("-")
-
- version = p[0]
- # print(version)
- product = z[0]
-
- if product not in versions or versions[product][0] < version:
- versions[product] = (version, f)
-
- return versions
-
-
-class MySimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
- def do_POST(self):
- length = int(self.headers.get("Content-Length"))
- body = self.rfile.read(length).decode("utf-8")
- # print(body)
- print("Updating...")
- xml = ET.fromstring(body)
- updatecheck_node = xml.find("app/updatecheck")
-
- # check for update
- if updatecheck_node is not None:
- version = xml.attrib["version"]
- platform = xml.find("os").attrib["platform"]
- print("requested: ", version)
- print("platform: ", platform)
-
- version, update_name = available_versions[platform]
-
- update_sha1, update_sha256, update_size = getupdateinfo(
- platform, version, update_name
- )
- params = {
- "version": version,
- "update_name": f"updates/{update_name}",
- "update_sha1": update_sha1,
- "update_sha256": update_sha256,
- "update_size": update_size,
- "codebase_url": host_url,
- }
-
- response = response_template.format(**params)
- # print("Response:")
- # print(response)
- self.send_response(200)
- self.end_headers()
- self.wfile.write(response.encode())
- return
-
- event_node = xml.find("app/event")
- event_type = int(event_node.attrib["eventtype"])
- event_result = int(event_node.attrib["eventresult"])
-
- # post install status
- if event_result != 0:
- print("Update downloaded, please wait for device to install...")
- if "errorcode" in event_node.attrib:
- print("With errorcode:", event_node.attrib["errorcode"])
- return
-
- # update done
- if event_type == 14:
- print("OK Response:")
- print(response_ok)
- self.send_response(200)
- self.end_headers()
- self.wfile.write(response_ok.encode())
- return
-
-
-def startUpdate(versionsGiven, host, port=8080):
- global available_versions
- global host_url # I am aware globals are generally bad practice, but this is a quick and dirty solution
-
- host_url = f"http://{host}:{port}/"
- available_versions = versionsGiven
-
- if not available_versions:
- raise FileNotFoundError("Could not find any update files")
-
- handler = MySimpleHTTPRequestHandler
- print(f"Starting fake updater at {host}:{port}")
- try:
- httpd = HTTPServer((host, port), handler)
- except OSError:
- print("Error: Could not start fake updater. Is the port already in use?")
- return
- httpd.serve_forever()
+from http.server import HTTPServer, SimpleHTTPRequestHandler
+import xml.etree.ElementTree as ET
+import os
+import hashlib
+import binascii
+
+response_ok = """
+
+
+
+
+
+
+
+"""
+
+response_template = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def getupdateinfo(platform, version, update_name):
+ full_path = os.path.join("updates", update_name)
+
+ update_size = str(os.path.getsize(full_path))
+
+ BUF_SIZE = 8192
+
+ sha1 = hashlib.sha1()
+ sha256 = hashlib.sha256()
+ with open(full_path, "rb") as f:
+ while True:
+ data = f.read(BUF_SIZE)
+ if not data:
+ break
+ sha1.update(data)
+ sha256.update(data)
+ update_sha1 = binascii.b2a_base64(sha1.digest(), newline=False).decode()
+ update_sha256 = binascii.b2a_base64(sha256.digest(), newline=False).decode()
+ return (update_sha1, update_sha256, update_size)
+
+
+def get_available_version(version):
+ available_versions = scanUpdates()
+
+ for device, ids in available_versions.items():
+ if version in ids:
+ available_version = {device: ids}
+
+ return available_version
+
+
+def scanUpdates():
+ files = os.listdir("updates")
+ versions = {}
+
+ for f in files:
+ p = f.split("_")
+ if len(p) != 2:
+ continue
+ t = p[1].split(".")
+ if len(t) != 2:
+ continue
+
+ z = t[0].split("-")
+
+ version = p[0]
+ # print(version)
+ product = z[0]
+
+ if product not in versions or versions[product][0] < version:
+ versions[product] = (version, f)
+
+ return versions
+
+
+class MySimpleHTTPRequestHandler(SimpleHTTPRequestHandler):
+ def do_POST(self):
+ length = int(self.headers.get("Content-Length"))
+ body = self.rfile.read(length).decode("utf-8")
+ # print(body)
+ print("Updating...")
+ xml = ET.fromstring(body)
+ updatecheck_node = xml.find("app/updatecheck")
+
+ # check for update
+ if updatecheck_node is not None:
+ version = xml.attrib["version"]
+ platform = xml.find("os").attrib["platform"]
+ print("requested: ", version)
+ print("platform: ", platform)
+
+ version, update_name = available_versions[platform]
+
+ update_sha1, update_sha256, update_size = getupdateinfo(
+ platform, version, update_name
+ )
+ params = {
+ "version": version,
+ "update_name": f"updates/{update_name}",
+ "update_sha1": update_sha1,
+ "update_sha256": update_sha256,
+ "update_size": update_size,
+ "codebase_url": host_url,
+ }
+
+ response = response_template.format(**params)
+ # print("Response:")
+ # print(response)
+ self.send_response(200)
+ self.end_headers()
+ self.wfile.write(response.encode())
+ return
+
+ event_node = xml.find("app/event")
+ event_type = int(event_node.attrib["eventtype"])
+ event_result = int(event_node.attrib["eventresult"])
+
+ # post install status
+ if event_result != 0:
+ print("Update downloaded, please wait for device to install...")
+ if "errorcode" in event_node.attrib:
+ print("With errorcode:", event_node.attrib["errorcode"])
+ return
+
+ # update done
+ if event_type == 14:
+ print("OK Response:")
+ print(response_ok)
+ self.send_response(200)
+ self.end_headers()
+ self.wfile.write(response_ok.encode())
+ return
+
+
+def startUpdate(versionsGiven, host, port=8080):
+ global available_versions
+ global host_url # I am aware globals are generally bad practice, but this is a quick and dirty solution
+
+ host_url = f"http://{host}:{port}/"
+ available_versions = versionsGiven
+
+ if not available_versions:
+ raise FileNotFoundError("Could not find any update files")
+
+ handler = MySimpleHTTPRequestHandler
+ print(f"Starting fake updater at {host}:{port}")
+ try:
+ httpd = HTTPServer((host, port), handler)
+ except OSError:
+ print("Error: Could not start fake updater. Is the port already in use?")
+ return
+ httpd.serve_forever()
diff --git a/codexctl/sync.py b/codexctl/sync.py
index 5d80d37..95067c5 100644
--- a/codexctl/sync.py
+++ b/codexctl/sync.py
@@ -1,243 +1,246 @@
-import glob
-import logging
-import os
-import time
-
-import requests
-
-
-class RmWebInterfaceAPI: # TODO: Add docstrings
- def __init__(self, BASE="http://10.11.99.1/", logger=None):
- self.logger = logger
-
- if self.logger is None:
- self.logger = logging
-
- self.BASE = BASE
- self.ID_ATTRIBUTE = "ID"
- self.NAME_ATTRIBUTE = "VissibleName"
- self.MTIME_ATTRIBUTE = "ModifiedClient"
-
- self.logger.debug(f"Base is: {BASE}")
-
- def __POST(self, endpoint, data={}, fileUpload=False):
- try:
- logging.debug(
- f"Sending POST request to {self.BASE + endpoint} with data {data}"
- )
-
- if fileUpload:
- result = requests.post(self.BASE + endpoint, files=data)
- else:
- result = requests.post(self.BASE + endpoint, data=data)
-
- if result.status_code == 408:
- self.logger.error("Request timed out!")
-
- logging.debug(f"Result headers: {result.headers}")
- if "application/json" in result.headers["Content-Type"]:
- return result.json()
- return result.content
-
- except Exception:
- return None
-
- def __get_documents_recursive(
- self, folderId="", currentLocation="", currentDocuments=[]
- ):
- data = self.__POST(f"documents/{folderId}")
-
- for item in data:
- self.logger.debug(f"Checking item: {item}")
-
- if "fileType" in item:
- item["location"] = currentLocation
- currentDocuments.append(item)
- else:
- self.logger.debug(
- f"Getting documents over {item[self.ID_ATTRIBUTE]}, current location is {currentLocation}/{item[self.NAME_ATTRIBUTE]}"
- )
- self.__get_documents_recursive(
- item[self.ID_ATTRIBUTE],
- f"{currentLocation}/{item[self.NAME_ATTRIBUTE]}",
- currentDocuments,
- )
-
- return currentDocuments
-
- def __get_folder_id(self, folderName, _from=""):
- results = self.__POST(f"documents/{_from}")
-
- if results is None:
- return None
-
- results.reverse() # We only want folders
-
- for data in results:
- self.logger.debug(f"Folder: {data}")
-
- if "fileType" in data:
- return None
-
- if data[self.NAME_ATTRIBUTE].strip() == folderName.strip():
- return data[self.ID_ATTRIBUTE]
-
- self.logger.debug(
- f"Getting folders over {folderName}, {data[self.ID_ATTRIBUTE]}"
- )
-
- recursiveResults = self.__get_folder_id(folderName, data[self.ID_ATTRIBUTE])
- if recursiveResults is None:
- continue
- else:
- return recursiveResults
-
- def __get_docs(self, folderName="", recursive=True):
- folderId = ""
-
- if folderName:
- folderId = self.__get_folder_id(folderName)
-
- if folderId is None:
- return {}
-
- if recursive:
- self.logger.debug(f"Calling recursive function on {folderName}")
- return self.__get_documents_recursive(
- folderId=folderId, currentLocation=folderName
- )
-
- data = self.__POST(f"documents/{folderId}")
-
- for item in data:
- item["location"] = ""
-
- return [item for item in data if "fileType" in item]
-
- def download(self, document, location="", overwrite=False, incremental=False):
- filename = document[self.NAME_ATTRIBUTE]
- if "/" in filename:
- filename = filename.replace("/", "_")
-
- self.logger.debug(f"Downloading {filename}, location {location}")
-
- if not os.path.exists(location):
- self.logger.debug("Download folder does not exist, creating it")
- os.makedirs(location)
-
- try:
- fileLocation = f"{location}/{filename}.pdf"
- isFile = os.path.isfile(fileLocation)
-
- if isFile and not overwrite:
- self.logger.debug("Not overwriting file")
- return True
-
- if isFile and incremental and not self.__is_newer(document, fileLocation):
- self.logger.debug("Local file already exists and is newer, skipping")
- return True
-
- binaryData = self.__POST(
- f"download/{document[self.ID_ATTRIBUTE]}/placeholder"
- )
-
- if isinstance(binaryData, dict):
- print(f"Error trying to download {filename}: {binaryData}")
- return False
-
- with open(fileLocation, "wb") as outFile:
- outFile.write(binaryData)
-
- return True
-
- except Exception as error:
- print(f"Error trying to download {filename}: {error}")
- return False
-
- def __is_newer(self, document, fileLocation):
- remote_ts = document[self.MTIME_ATTRIBUTE]
-
- local_mtime = os.path.getmtime(fileLocation)
- local_ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(local_mtime))
-
- return remote_ts > local_ts
-
- def upload(self, input_paths, remoteFolder):
- folderId = ""
- if remoteFolder:
- folderId = self.__get_folder_id(remoteFolder)
-
- if folderId is None:
- raise SystemError(f"Error: Folder {remoteFolder} does not exist!")
-
- self.__POST(f"documents/{folderId}") # Setting up for upload...
-
- errors, documents = [], []
-
- for document in input_paths: # This needs improvement...
- if os.path.isdir(document):
- for file in glob.glob(f"{document}/*"):
- if not file.endswith(".pdf"):
- self.logger.error(f"Error: {document} is not a pdf!")
- else:
- documents.append(file)
- elif os.path.isfile(document):
- if not document.endswith(".pdf"):
- errors.append(document)
- self.logger.error(f"Error: {document} is not a pdf!")
- else:
- documents.append(document)
- else:
- errors.append(document)
- self.logger.error(f"Error: {document} is not a file or directory!")
-
- for document in documents:
- self.logger.debug(
- f"Uploading {document} to {remoteFolder if remoteFolder else 'root'}"
- )
- with open(document, "rb") as inFile:
- response = self.__POST("upload", data={"file": inFile}, fileUpload=True)
-
- if response is None:
- self.logger.error(
- f"Error: Unknown error while uploading {document}!"
- )
- errors.append(document)
- elif response == {"status": "Upload successful"}:
- self.logger.debug(f"Uploaded {document} successfully!")
-
- if len(errors) > 0:
- print("The following files failed to upload: " + ",".join(errors))
-
- print(f"Done! {len(documents)-len(errors)} files were uploaded.")
-
- def sync(
- self,
- localFolder,
- remoteFolder="",
- overwrite=False,
- incremental=False,
- recursive=True,
- ):
- count = 0
-
- if not os.path.exists(localFolder):
- self.logger.debug("Local folder does not exist, creating it")
- os.mkdir(localFolder)
-
- documents = self.__get_docs(remoteFolder, recursive)
-
- if documents == {}:
- print("No documents were found!")
-
- else:
- for doc in documents:
- self.logger.debug(f"Processing {doc}")
- count += 1
- self.download(
- document=doc,
- location=f"{localFolder}/{doc['location']}",
- overwrite=overwrite,
- incremental=incremental,
- )
- print(f"Done! {count} files were exported.")
+import glob
+import logging
+import os
+import time
+
+import requests
+
+
+class RmWebInterfaceAPI: # TODO: Add docstrings
+ def __init__(self, BASE="http://10.11.99.1/", logger=None):
+ self.logger = logger
+
+ if self.logger is None:
+ self.logger = logging
+
+ self.BASE = BASE
+ self.ID_ATTRIBUTE = "ID"
+ self.NAME_ATTRIBUTE = "VissibleName"
+ self.MTIME_ATTRIBUTE = "ModifiedClient"
+
+ self.logger.debug(f"Base is: {BASE}")
+
+ def __POST(self, endpoint, data={}, fileUpload=False):
+ try:
+ logging.debug(
+ f"Sending POST request to {self.BASE + endpoint} with data {data}"
+ )
+
+ if fileUpload:
+ result = requests.post(self.BASE + endpoint, files=data)
+ else:
+ result = requests.post(self.BASE + endpoint, data=data)
+
+ if result.status_code == 408:
+ self.logger.error("Request timed out!")
+
+ logging.debug(f"Result headers: {result.headers}")
+ if "application/json" in result.headers["Content-Type"]:
+ return result.json()
+ return result.content
+
+ except Exception:
+ return None
+
+ def __get_documents_recursive(
+ self, folderId="", currentLocation="", currentDocuments=None
+ ):
+ if currentDocuments is None:
+ currentDocuments = []
+
+ data = self.__POST(f"documents/{folderId}")
+
+ for item in data:
+ self.logger.debug(f"Checking item: {item}")
+
+ if "fileType" in item:
+ item["location"] = currentLocation
+ currentDocuments.append(item)
+ else:
+ self.logger.debug(
+ f"Getting documents over {item[self.ID_ATTRIBUTE]}, current location is {currentLocation}/{item[self.NAME_ATTRIBUTE]}"
+ )
+ self.__get_documents_recursive(
+ item[self.ID_ATTRIBUTE],
+ f"{currentLocation}/{item[self.NAME_ATTRIBUTE]}",
+ currentDocuments,
+ )
+
+ return currentDocuments
+
+ def __get_folder_id(self, folderName, _from=""):
+ results = self.__POST(f"documents/{_from}")
+
+ if results is None:
+ return None
+
+ results.reverse() # We only want folders
+
+ for data in results:
+ self.logger.debug(f"Folder: {data}")
+
+ if "fileType" in data:
+ return None
+
+ if data[self.NAME_ATTRIBUTE].strip() == folderName.strip():
+ return data[self.ID_ATTRIBUTE]
+
+ self.logger.debug(
+ f"Getting folders over {folderName}, {data[self.ID_ATTRIBUTE]}"
+ )
+
+ recursiveResults = self.__get_folder_id(folderName, data[self.ID_ATTRIBUTE])
+ if recursiveResults is None:
+ continue
+ else:
+ return recursiveResults
+
+ def __get_docs(self, folderName="", recursive=True):
+ folderId = ""
+
+ if folderName:
+ folderId = self.__get_folder_id(folderName)
+
+ if folderId is None:
+ return {}
+
+ if recursive:
+ self.logger.debug(f"Calling recursive function on {folderName}")
+ return self.__get_documents_recursive(
+ folderId=folderId, currentLocation=folderName
+ )
+
+ data = self.__POST(f"documents/{folderId}")
+
+ for item in data:
+ item["location"] = ""
+
+ return [item for item in data if "fileType" in item]
+
+ def download(self, document, location="", overwrite=False, incremental=False):
+ filename = document[self.NAME_ATTRIBUTE]
+ if "/" in filename:
+ filename = filename.replace("/", "_")
+
+ self.logger.debug(f"Downloading {filename}, location {location}")
+
+ if not os.path.exists(location):
+ self.logger.debug("Download folder does not exist, creating it")
+ os.makedirs(location)
+
+ try:
+ fileLocation = f"{location}/{filename}.pdf"
+ isFile = os.path.isfile(fileLocation)
+
+ if isFile and not overwrite:
+ self.logger.debug("Not overwriting file")
+ return True
+
+ if isFile and incremental and not self.__is_newer(document, fileLocation):
+ self.logger.debug("Local file already exists and is newer, skipping")
+ return True
+
+ binaryData = self.__POST(
+ f"download/{document[self.ID_ATTRIBUTE]}/placeholder"
+ )
+
+ if isinstance(binaryData, dict):
+ print(f"Error trying to download {filename}: {binaryData}")
+ return False
+
+ with open(fileLocation, "wb") as outFile:
+ outFile.write(binaryData)
+
+ return True
+
+ except Exception as error:
+ print(f"Error trying to download {filename}: {error}")
+ return False
+
+ def __is_newer(self, document, fileLocation):
+ remote_ts = document[self.MTIME_ATTRIBUTE]
+
+ local_mtime = os.path.getmtime(fileLocation)
+ local_ts = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(local_mtime))
+
+ return remote_ts > local_ts
+
+ def upload(self, input_paths, remoteFolder):
+ folderId = ""
+ if remoteFolder:
+ folderId = self.__get_folder_id(remoteFolder)
+
+ if folderId is None:
+ raise SystemError(f"Error: Folder {remoteFolder} does not exist!")
+
+ self.__POST(f"documents/{folderId}") # Setting up for upload...
+
+ errors, documents = [], []
+
+ for document in input_paths: # This needs improvement...
+ if os.path.isdir(document):
+ for file in glob.glob(f"{document}/*"):
+ if not file.endswith(".pdf"):
+ self.logger.error(f"Error: {document} is not a pdf!")
+ else:
+ documents.append(file)
+ elif os.path.isfile(document):
+ if not document.endswith(".pdf"):
+ errors.append(document)
+ self.logger.error(f"Error: {document} is not a pdf!")
+ else:
+ documents.append(document)
+ else:
+ errors.append(document)
+ self.logger.error(f"Error: {document} is not a file or directory!")
+
+ for document in documents:
+ self.logger.debug(
+ f"Uploading {document} to {remoteFolder if remoteFolder else 'root'}"
+ )
+ with open(document, "rb") as inFile:
+ response = self.__POST("upload", data={"file": inFile}, fileUpload=True)
+
+ if response is None:
+ self.logger.error(
+ f"Error: Unknown error while uploading {document}!"
+ )
+ errors.append(document)
+ elif response == {"status": "Upload successful"}:
+ self.logger.debug(f"Uploaded {document} successfully!")
+
+ if len(errors) > 0:
+ print("The following files failed to upload: " + ",".join(errors))
+
+ print(f"Done! {len(documents) - len(errors)} files were uploaded.")
+
+ def sync(
+ self,
+ localFolder,
+ remoteFolder="",
+ overwrite=False,
+ incremental=False,
+ recursive=True,
+ ):
+ count = 0
+
+ if not os.path.exists(localFolder):
+ self.logger.debug("Local folder does not exist, creating it")
+ os.mkdir(localFolder)
+
+ documents = self.__get_docs(remoteFolder, recursive)
+
+ if documents == {}:
+ print("No documents were found!")
+
+ else:
+ for doc in documents:
+ self.logger.debug(f"Processing {doc}")
+ count += 1
+ self.download(
+ document=doc,
+ location=f"{localFolder}/{doc['location']}",
+ overwrite=overwrite,
+ incremental=incremental,
+ )
+ print(f"Done! {count} files were exported.")
diff --git a/codexctl/updates.py b/codexctl/updates.py
index 80debc7..21500c3 100644
--- a/codexctl/updates.py
+++ b/codexctl/updates.py
@@ -1,436 +1,459 @@
-import os
-import requests
-import uuid
-import sys
-import json
-import hashlib
-import logging
-
-from pathlib import Path
-from datetime import datetime
-
-import xml.etree.ElementTree as ET
-
-from .device import HardwareType
-
-
-class UpdateManager:
- def __init__(self, logger=None) -> None:
- """Manager for downloading update versions
-
- Args:
- logger (logger, optional): Logger object for logging. Defaults to None.
- """
-
- self.logger = logger
-
- if self.logger is None:
- self.logger = logging
-
- (
- self.remarkablepp_versions,
- self.remarkableppm_versions,
- self.remarkable2_versions,
- self.remarkable1_versions,
- self.external_provider_urls,
- ) = self.get_remarkable_versions()
-
- def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]:
- """Gets the avaliable versions for the device, by checking the local version-ids.json file and then updating it if necessary
-
- Returns:
- tuple: A tuple containing the version ids for the remarkablepp, remarkableppm, remarkable2, remarkable1, and external provider urls (in that order)
- """
-
- if os.path.exists("data/version-ids.json"):
- file_location = "data/version-ids.json"
-
- self.logger.debug("Found version-ids at data/version-ids.json")
-
- else:
- if os.name == "nt": # Windows
- folder_location = os.getenv("APPDATA") + "/codexctl"
- elif os.name in ("posix", "darwin"): # Linux or MacOS
- folder_location = os.path.expanduser("~/.config/codexctl")
- else:
- raise SystemError("Unsupported OS")
-
- self.logger.debug(f"Version config folder location is {folder_location}")
- if not os.path.exists(folder_location):
- os.makedirs(folder_location, exist_ok=True)
-
- file_location = folder_location + "/version-ids.json"
-
- if not os.path.exists(file_location):
- self.update_version_ids(file_location)
-
- try:
- with open(file_location) as f:
- contents = json.load(f)
- except ValueError:
- raise SystemError(
- f"Version-ids.json @ {file_location} is corrupted! Please delete it and try again. Also, PLEASE open an issue on the repo showing the contents of the file."
- )
-
- if (
- int(datetime.now().timestamp()) - contents["last-updated"]
- > 5256000 # 2 months
- ):
- self.update_version_ids(file_location)
-
- with open(file_location) as f:
- contents = json.load(f)
-
- self.logger.debug(f"Version ids contents are {contents}")
-
- provider_urls = contents.get("external-provider-urls", contents.get("external-provider-url"))
- if provider_urls is None:
- raise SystemError(
- f"version-ids.json at {file_location} is missing external provider URLs. "
- "Please delete the file and try again, or open an issue on the repo."
- )
-
- if isinstance(provider_urls, str):
- provider_urls = [provider_urls]
-
- return (
- contents.get("remarkablepp", {}),
- contents.get("remarkableppm", {}),
- contents.get("remarkable2", {}),
- contents.get("remarkable1", {}),
- provider_urls,
- )
-
- def update_version_ids(self, location: str) -> None:
- """Updates the version-ids.json file
-
- Args:
- location (str): Location to save the file
-
- Raises:
- SystemExit: If the file cannot be updated
- """
- with open(location, "w", newline="\n") as f:
- try:
- self.logger.debug("Downloading version-ids.json")
- contents = requests.get(
- "https://raw.githubusercontent.com/Jayy001/codexctl/main/data/version-ids.json"
- ).json()
- json.dump(contents, f, indent=4)
- f.write("\n")
- except requests.exceptions.Timeout:
- raise SystemExit(
- "Connection timed out while downloading version-ids.json! Do you have an internet connection?"
- )
- except Exception as error:
- raise SystemExit(
- f"Unknown error while downloading version-ids.json! {error}"
- )
-
- def get_latest_version(self, hardware_type: HardwareType) -> str:
- """Gets the latest version available for the device
-
- Args:
- hardware_type (HardwareType enum): Type of the device
-
- Returns:
- str: Latest version available for the device
- """
- match hardware_type:
- case HardwareType.RM1:
- versions = self.remarkable1_versions
- case HardwareType.RM2:
- versions = self.remarkable2_versions
- case HardwareType.RMPP:
- versions = self.remarkablepp_versions
- case HardwareType.RMPPM:
- versions = self.remarkableppm_versions
-
- return self.__max_version(versions.keys())
-
- def get_toltec_version(self, hardware_type: HardwareType) -> str:
- """Gets the latest version available toltec for the device
-
- Args:
- hardware_type (HardwareType enum): Type of the device
-
- Returns:
- str: Latest version available for the device
- """
-
- try:
- toltec_type = hardware_type.toltec_type
- except ValueError as ex:
- raise SystemExit(*ex.args)
-
- response = requests.get("https://toltec-dev.org/stable/Compatibility")
- if response.status_code != 200:
- raise SystemExit(
- f"Error: Failed to get toltec compatibility table: {response.status_code}"
- )
-
- return self.__max_version(
- [
- x.split("=")[1]
- for x in response.text.splitlines()
- if x.startswith(f"{toltec_type}=")
- ]
- )
-
- def download_version(
- self, hardware_type: HardwareType, update_version: str, download_folder: str|None = None
- ) -> str | None:
- """Downloads the specified version of the update
-
- Args:
- hardware_type (HardwareType enum): Type of the device
- update_version (str): Id of version to download.
- download_folder (str, optional): Location of download folder. Defaults to download folder for OS.
-
- Returns:
- str | None: Location of the file if the download was successful, None otherwise
- """
-
- if download_folder is None:
- download_folder = str(Path(
- os.environ["XDG_DOWNLOAD_DIR"]
- if (
- "XDG_DOWNLOAD_DIR" in os.environ
- and os.path.exists(os.environ["XDG_DOWNLOAD_DIR"])
- )
- else Path.home() / "Downloads"
- ))
-
- if not os.path.exists(download_folder):
- self.logger.error(
- f"Download folder {download_folder} does not exist! Creating it now."
- )
- os.makedirs(download_folder)
-
- BASE_URL = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device%20Beta/RM110" # Default URL for v2 versions
- BASE_URL_V3 = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device/reMarkable"
-
- match hardware_type:
- case HardwareType.RMPP:
- version_lookup = self.remarkablepp_versions
-
- case HardwareType.RMPPM:
- version_lookup = self.remarkableppm_versions
-
- case HardwareType.RM2:
- version_lookup = self.remarkable2_versions
- BASE_URL_V3 += "2"
-
- case HardwareType.RM1:
- version_lookup = self.remarkable1_versions
-
- if update_version not in version_lookup:
- self.logger.error(
- f"Version {update_version} not found in version-ids.json! Please update your version-ids.json file."
- )
- return
-
- version_id, version_checksum = version_lookup[update_version]
- version = tuple([int(x) for x in update_version.split(".")])
- if version >= (3,):
- BASE_URL = BASE_URL_V3
-
- if version <= (3, 11, 2, 5):
- file_name = f"{update_version}_{hardware_type.old_download_hw}-{version_id}.signed"
- file_url = f"{BASE_URL}/{update_version}/{file_name}"
- self.logger.debug(f"File URL is {file_url}, File name is {file_name}")
- return self.__download_version_file(
- file_url, file_name, download_folder, version_checksum
- )
-
- else:
- file_name = f"remarkable-production-memfault-image-{update_version}-{hardware_type.new_download_hw}-public"
-
- for provider_url in self.external_provider_urls:
- file_url = provider_url.replace("REPLACE_ID", version_id)
- self.logger.debug(f"Trying to download from {file_url}")
-
- result = self.__download_version_file(
- file_url, file_name, download_folder, version_checksum
- )
-
- if result is not None:
- self.logger.debug(f"Successfully downloaded from {provider_url}")
- return result
-
- self.logger.debug(f"Failed to download from {provider_url}, trying next source...")
-
- self.logger.error(f"Failed to download {file_name} from all sources")
- return None
-
- def __generate_xml_data(self) -> str:
- """Generates and returns XML data for the update request"""
- params = {
- "installsource": "scheduler",
- "requestid": str(uuid.uuid4()),
- "sessionid": str(uuid.uuid4()),
- "machineid": "00".zfill(32),
- "oem": "RM100-753-12345",
- "appid": "98DA7DF2-4E3E-4744-9DE6-EC931886ABAB",
- "bootid": str(uuid.uuid4()),
- "current": "3.2.3.1595",
- "group": "Prod",
- "platform": "reMarkable2",
- }
-
- return """
-
-
-
-
-
-""".format(**params)
-
- def __parse_response(self, resp: str) -> tuple[str, str, str] | None:
- """Parses the response from the update server and returns the file name, uri, and version if an update is available
-
- Args:
- resp (str): Response from the server
-
- Returns:
- tuple[str, str, str] | None: File name, uri, and version if an update is available, None otherwise
- """
- xml_data = ET.fromstring(resp)
-
- if "noupdate" in resp or xml_data is None:
- return None
-
- file_name = xml_data.find("app/updatecheck/manifest/packages/package").attrib[
- "name"
- ]
- file_uri = (
- f"{xml_data.find('app/updatecheck/urls/url').attrib['codebase']}{file_name}"
- )
- file_version = xml_data.find("app/updatecheck/manifest").attrib["version"]
-
- self.logger.debug(
- f"File version is {file_version}, file uri is {file_uri}, file name is {file_name}"
- )
- return file_version, file_uri, file_name
-
- def __download_version_file(
- self, uri: str, name: str, download_folder: str, checksum: str
- ) -> str | None:
- """Downloads the version file from the server and checks the checksum
-
- Args:
- uri (str): Location to the file
- name (str): Name of the file
- download_folder (str): Location of download folder
- checksum (str): Sha256 Checksum of the file
-
- Returns:
- str | None: Location of the file if the checksum matches, None otherwise
- """
- response = requests.get(uri, stream=True)
- if response.status_code != 200:
- self.logger.debug(f"Unable to download update file: {response.status_code}")
- return None
-
- file_length = response.headers.get("content-length")
-
- self.logger.debug(f"Downloading {name} from {uri} to {download_folder}")
- try:
- file_length = int(file_length)
-
- if int(file_length) < 10000000: # 10MB, invalid version file
- self.logger.error(
- f"File {name} is too small to be a valid version file"
- )
- return None
- except TypeError:
- self.logger.error(
- f"Could not get content length for {name}. Do you have an internet connection?"
- )
- return None
-
- self.logger.debug(f"{name} is {file_length} bytes")
-
- filename = f"{download_folder}/{name}"
- with open(filename, "wb") as out_file:
- dl = 0
-
- for data in response.iter_content(chunk_size=4096):
- dl += len(data)
- out_file.write(data)
- if sys.stdout.isatty():
- done = int(50 * dl / file_length)
- sys.stdout.write("\r[%s%s]" % ("=" * done, " " * (50 - done)))
- sys.stdout.flush()
-
- if sys.stdout.isatty():
- print(end="\r\n")
-
- self.logger.debug(f"Downloaded {name}")
-
- with open(filename, "rb") as f:
- file_checksum = hashlib.sha256(f.read()).hexdigest()
-
- if file_checksum != checksum:
- os.remove(filename)
- self.logger.error(
- f"File checksum mismatch! Expected {checksum}, got {file_checksum}"
- )
- return None
-
- return filename
-
- @staticmethod
- def __max_version(versions: list) -> str:
- """Returns the highest avaliable version from a list with semantic versioning"""
- return sorted(versions, key=lambda v: tuple(map(int, v.split("."))))[-1]
-
- @staticmethod
- def uses_new_update_engine(version: str) -> bool:
- """
- Checks if the version given is above 3.11 and so requires the newer update engine
-
- Args:
- version (str): version to check against
-
- Returns:
- bool: If it uses the new update engine or not
- """
- return tuple([int(x) for x in version.split(".")]) >= (3, 11, 2, 5)
-
- @staticmethod
- def is_bootloader_boundary_downgrade(current_version: str, target_version: str) -> bool:
- """
- Checks if downgrade crosses the 3.22 bootloader boundary (3.22+ -> <3.22).
-
- Paper Pro devices require bootloader updates when downgrading from
- version 3.22 or higher to any version below 3.22.
-
- Args:
- current_version (str): Currently installed version
- target_version (str): Target version to install
-
- Returns:
- bool: True if crossing boundary downward (3.22+ -> <3.22)
-
- Raises:
- ValueError: If either version is empty or has invalid format
- """
- if not current_version or not current_version.strip():
- raise ValueError("current_version cannot be empty")
- if not target_version or not target_version.strip():
- raise ValueError("target_version cannot be empty")
-
- try:
- current_parts = [int(x) for x in current_version.split('.')]
- target_parts = [int(x) for x in target_version.split('.')]
- except ValueError as e:
- raise ValueError(f"Invalid version format: {e}") from e
-
- if len(current_parts) < 2 or len(target_parts) < 2:
- raise ValueError("Version must have at least 2 components (e.g., '3.22')")
-
- current_is_322_or_higher = current_parts >= [3, 22]
- target_is_below_322 = target_parts < [3, 22]
-
- return current_is_322_or_higher and target_is_below_322
+import hashlib
+import json
+import logging
+import os
+import sys
+import uuid
+import xml.etree.ElementTree as ET
+from datetime import datetime
+from pathlib import Path
+
+import requests
+
+from .device import HardwareType
+
+
+class UpdateManager:
+ def __init__(self, logger=None) -> None:
+ """Manager for downloading update versions
+
+ Args:
+ logger (logger, optional): Logger object for logging. Defaults to None.
+ """
+
+ self.logger = logger
+
+ if self.logger is None:
+ self.logger = logging
+
+ (
+ self.remarkableppure_versions,
+ self.remarkablepp_versions,
+ self.remarkableppm_versions,
+ self.remarkable2_versions,
+ self.remarkable1_versions,
+ self.external_provider_urls,
+ ) = self.get_remarkable_versions()
+
+ def get_remarkable_versions(self) -> tuple[dict, dict, dict, dict, list]:
+ """Gets the avaliable versions for the device, by checking the local version-ids.json file and then updating it if necessary
+
+ Returns:
+ tuple: A tuple containing the version ids for the remarkablepp, remarkableppm, remarkable2, remarkable1, and external provider urls (in that order)
+ """
+
+ if os.path.exists("data/version-ids.json"):
+ file_location = "data/version-ids.json"
+
+ self.logger.debug("Found version-ids at data/version-ids.json")
+
+ else:
+ if os.name == "nt": # Windows
+ folder_location = os.getenv("APPDATA") + "/codexctl"
+ elif os.name in ("posix", "darwin"): # Linux or MacOS
+ folder_location = os.path.expanduser("~/.config/codexctl")
+ else:
+ raise SystemError("Unsupported OS")
+
+ self.logger.debug(f"Version config folder location is {folder_location}")
+ if not os.path.exists(folder_location):
+ os.makedirs(folder_location, exist_ok=True)
+
+ file_location = folder_location + "/version-ids.json"
+
+ if not os.path.exists(file_location):
+ self.update_version_ids(file_location)
+
+ try:
+ with open(file_location) as f:
+ contents = json.load(f)
+ except ValueError:
+ raise SystemError(
+ f"Version-ids.json @ {file_location} is corrupted! Please delete it and try again. Also, PLEASE open an issue on the repo showing the contents of the file."
+ )
+
+ if (
+ int(datetime.now().timestamp()) - contents["last-updated"]
+ > 5256000 # 2 months
+ ):
+ self.update_version_ids(file_location)
+
+ with open(file_location) as f:
+ contents = json.load(f)
+
+ self.logger.debug(f"Version ids contents are {contents}")
+
+ provider_urls = contents.get(
+ "external-provider-urls", contents.get("external-provider-url")
+ )
+ if provider_urls is None:
+ raise SystemError(
+ f"version-ids.json at {file_location} is missing external provider URLs. "
+ "Please delete the file and try again, or open an issue on the repo."
+ )
+
+ if isinstance(provider_urls, str):
+ provider_urls = [provider_urls]
+
+ return (
+ contents.get("remarkableppure", {}),
+ contents.get("remarkablepp", {}),
+ contents.get("remarkableppm", {}),
+ contents.get("remarkable2", {}),
+ contents.get("remarkable1", {}),
+ provider_urls,
+ )
+
+ def update_version_ids(self, location: str) -> None:
+ """Updates the version-ids.json file
+
+ Args:
+ location (str): Location to save the file
+
+ Raises:
+ SystemExit: If the file cannot be updated
+ """
+ with open(location, "w", newline="\n") as f:
+ try:
+ self.logger.debug("Downloading version-ids.json")
+ contents = requests.get(
+ "https://raw.githubusercontent.com/Jayy001/codexctl/main/data/version-ids.json"
+ ).json()
+ json.dump(contents, f, indent=4)
+ f.write("\n")
+ except requests.exceptions.Timeout:
+ raise SystemExit(
+ "Connection timed out while downloading version-ids.json! Do you have an internet connection?"
+ )
+ except Exception as error:
+ raise SystemExit(
+ f"Unknown error while downloading version-ids.json! {error}"
+ )
+
+ def get_latest_version(self, hardware_type: HardwareType) -> str:
+ """Gets the latest version available for the device
+
+ Args:
+ hardware_type (HardwareType enum): Type of the device
+
+ Returns:
+ str: Latest version available for the device
+ """
+ match hardware_type:
+ case HardwareType.RM1:
+ versions = self.remarkable1_versions
+
+ case HardwareType.RM2:
+ versions = self.remarkable2_versions
+
+ case HardwareType.RMPP:
+ versions = self.remarkablepp_versions
+
+ case HardwareType.RMPPM:
+ versions = self.remarkableppm_versions
+
+ case HardwareType.RMPPURE:
+ versions = self.remarkableppure_versions
+
+ return self.__max_version(versions.keys())
+
+ def get_toltec_version(self, hardware_type: HardwareType) -> str:
+ """Gets the latest version available toltec for the device
+
+ Args:
+ hardware_type (HardwareType enum): Type of the device
+
+ Returns:
+ str: Latest version available for the device
+ """
+
+ try:
+ toltec_type = hardware_type.toltec_type
+ except ValueError as ex:
+ raise SystemExit(*ex.args)
+
+ response = requests.get("https://toltec-dev.org/stable/Compatibility")
+ if response.status_code != 200:
+ raise SystemExit(
+ f"Error: Failed to get toltec compatibility table: {response.status_code}"
+ )
+
+ return self.__max_version(
+ [
+ x.split("=")[1]
+ for x in response.text.splitlines()
+ if x.startswith(f"{toltec_type}=")
+ ]
+ )
+
+ def download_version(
+ self,
+ hardware_type: HardwareType,
+ update_version: str,
+ download_folder: str | None = None,
+ ) -> str | None:
+ """Downloads the specified version of the update
+
+ Args:
+ hardware_type (HardwareType enum): Type of the device
+ update_version (str): Id of version to download.
+ download_folder (str, optional): Location of download folder. Defaults to download folder for OS.
+
+ Returns:
+ str | None: Location of the file if the download was successful, None otherwise
+ """
+
+ if download_folder is None:
+ download_folder = str(
+ Path(
+ os.environ["XDG_DOWNLOAD_DIR"]
+ if (
+ "XDG_DOWNLOAD_DIR" in os.environ
+ and os.path.exists(os.environ["XDG_DOWNLOAD_DIR"])
+ )
+ else Path.home() / "Downloads"
+ )
+ )
+
+ if not os.path.exists(download_folder):
+ self.logger.error(
+ f"Download folder {download_folder} does not exist! Creating it now."
+ )
+ os.makedirs(download_folder)
+
+ BASE_URL = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device%20Beta/RM110" # Default URL for v2 versions
+ BASE_URL_V3 = "https://updates-download.cloud.remarkable.engineering/build/reMarkable%20Device/reMarkable"
+
+ match hardware_type:
+ case HardwareType.RMPPURE:
+ version_lookup = self.remarkableppure_versions
+
+ case HardwareType.RMPP:
+ version_lookup = self.remarkablepp_versions
+
+ case HardwareType.RMPPM:
+ version_lookup = self.remarkableppm_versions
+
+ case HardwareType.RM2:
+ version_lookup = self.remarkable2_versions
+ BASE_URL_V3 += "2"
+
+ case HardwareType.RM1:
+ version_lookup = self.remarkable1_versions
+
+ if update_version not in version_lookup:
+ self.logger.error(
+ f"Version {update_version} not found in version-ids.json! Please update your version-ids.json file."
+ )
+ return
+
+ version_id, version_checksum = version_lookup[update_version]
+ version = tuple([int(x) for x in update_version.split(".")])
+ if version >= (3,):
+ BASE_URL = BASE_URL_V3
+
+ if version <= (3, 11, 2, 5):
+ file_name = (
+ f"{update_version}_{hardware_type.old_download_hw}-{version_id}.signed"
+ )
+ file_url = f"{BASE_URL}/{update_version}/{file_name}"
+ self.logger.debug(f"File URL is {file_url}, File name is {file_name}")
+ return self.__download_version_file(
+ file_url, file_name, download_folder, version_checksum
+ )
+
+ else:
+ file_name = f"remarkable-production-memfault-image-{update_version}-{hardware_type.new_download_hw}-public"
+
+ for provider_url in self.external_provider_urls:
+ file_url = provider_url.replace("REPLACE_ID", version_id)
+ self.logger.debug(f"Trying to download from {file_url}")
+
+ result = self.__download_version_file(
+ file_url, file_name, download_folder, version_checksum
+ )
+
+ if result is not None:
+ self.logger.debug(f"Successfully downloaded from {provider_url}")
+ return result
+
+ self.logger.debug(
+ f"Failed to download from {provider_url}, trying next source..."
+ )
+
+ self.logger.error(f"Failed to download {file_name} from all sources")
+ return None
+
+ def __generate_xml_data(self) -> str:
+ """Generates and returns XML data for the update request"""
+ params = {
+ "installsource": "scheduler",
+ "requestid": str(uuid.uuid4()),
+ "sessionid": str(uuid.uuid4()),
+ "machineid": "00".zfill(32),
+ "oem": "RM100-753-12345",
+ "appid": "98DA7DF2-4E3E-4744-9DE6-EC931886ABAB",
+ "bootid": str(uuid.uuid4()),
+ "current": "3.2.3.1595",
+ "group": "Prod",
+ "platform": "reMarkable2",
+ }
+
+ return """
+
+
+
+
+
+""".format(**params)
+
+ def __parse_response(self, resp: str) -> tuple[str, str, str] | None:
+ """Parses the response from the update server and returns the file name, uri, and version if an update is available
+
+ Args:
+ resp (str): Response from the server
+
+ Returns:
+ tuple[str, str, str] | None: File name, uri, and version if an update is available, None otherwise
+ """
+ xml_data = ET.fromstring(resp)
+
+ if "noupdate" in resp or xml_data is None:
+ return None
+
+ file_name = xml_data.find("app/updatecheck/manifest/packages/package").attrib[
+ "name"
+ ]
+ file_uri = (
+ f"{xml_data.find('app/updatecheck/urls/url').attrib['codebase']}{file_name}"
+ )
+ file_version = xml_data.find("app/updatecheck/manifest").attrib["version"]
+
+ self.logger.debug(
+ f"File version is {file_version}, file uri is {file_uri}, file name is {file_name}"
+ )
+ return file_version, file_uri, file_name
+
+ def __download_version_file(
+ self, uri: str, name: str, download_folder: str, checksum: str
+ ) -> str | None:
+ """Downloads the version file from the server and checks the checksum
+
+ Args:
+ uri (str): Location to the file
+ name (str): Name of the file
+ download_folder (str): Location of download folder
+ checksum (str): Sha256 Checksum of the file
+
+ Returns:
+ str | None: Location of the file if the checksum matches, None otherwise
+ """
+ response = requests.get(uri, stream=True)
+ if response.status_code != 200:
+ self.logger.debug(f"Unable to download update file: {response.status_code}")
+ return None
+
+ file_length = response.headers.get("content-length")
+
+ self.logger.debug(f"Downloading {name} from {uri} to {download_folder}")
+ try:
+ file_length = int(file_length)
+
+ if int(file_length) < 10000000: # 10MB, invalid version file
+ self.logger.error(
+ f"File {name} is too small to be a valid version file"
+ )
+ return None
+ except TypeError:
+ self.logger.error(
+ f"Could not get content length for {name}. Do you have an internet connection?"
+ )
+ return None
+
+ self.logger.debug(f"{name} is {file_length} bytes")
+
+ filename = f"{download_folder}/{name}"
+ with open(filename, "wb") as out_file:
+ dl = 0
+
+ for data in response.iter_content(chunk_size=4096):
+ dl += len(data)
+ out_file.write(data)
+ if sys.stdout.isatty():
+ done = int(50 * dl / file_length)
+ sys.stdout.write("\r[%s%s]" % ("=" * done, " " * (50 - done)))
+ sys.stdout.flush()
+
+ if sys.stdout.isatty():
+ print(end="\r\n")
+
+ self.logger.debug(f"Downloaded {name}")
+
+ with open(filename, "rb") as f:
+ file_checksum = hashlib.sha256(f.read()).hexdigest()
+
+ if file_checksum != checksum:
+ os.remove(filename)
+ self.logger.error(
+ f"File checksum mismatch! Expected {checksum}, got {file_checksum}"
+ )
+ return None
+
+ return filename
+
+ @staticmethod
+ def __max_version(versions: list) -> str:
+ """Returns the highest avaliable version from a list with semantic versioning"""
+ return sorted(versions, key=lambda v: tuple(map(int, v.split("."))))[-1]
+
+ @staticmethod
+ def uses_new_update_engine(version: str) -> bool:
+ """
+ Checks if the version given is above 3.11 and so requires the newer update engine
+
+ Args:
+ version (str): version to check against
+
+ Returns:
+ bool: If it uses the new update engine or not
+ """
+ return tuple([int(x) for x in version.split(".")]) >= (3, 11, 2, 5)
+
+ @staticmethod
+ def is_bootloader_boundary_downgrade(
+ current_version: str, target_version: str
+ ) -> bool:
+ """
+ Checks if downgrade crosses the 3.22 bootloader boundary (3.22+ -> <3.22).
+
+ Paper Pro devices require bootloader updates when downgrading from
+ version 3.22 or higher to any version below 3.22.
+
+ Args:
+ current_version (str): Currently installed version
+ target_version (str): Target version to install
+
+ Returns:
+ bool: True if crossing boundary downward (3.22+ -> <3.22)
+
+ Raises:
+ ValueError: If either version is empty or has invalid format
+ """
+ if not current_version or not current_version.strip():
+ raise ValueError("current_version cannot be empty")
+ if not target_version or not target_version.strip():
+ raise ValueError("target_version cannot be empty")
+
+ try:
+ current_parts = [int(x) for x in current_version.split(".")]
+ target_parts = [int(x) for x in target_version.split(".")]
+ except ValueError as e:
+ raise ValueError(f"Invalid version format: {e}") from e
+
+ if len(current_parts) < 2 or len(target_parts) < 2:
+ raise ValueError("Version must have at least 2 components (e.g., '3.22')")
+
+ current_is_322_or_higher = current_parts >= [3, 22]
+ target_is_below_322 = target_parts < [3, 22]
+
+ return current_is_322_or_higher and target_is_below_322
diff --git a/data/version-ids.json b/data/version-ids.json
index c969ea5..a58d52d 100644
--- a/data/version-ids.json
+++ b/data/version-ids.json
@@ -555,6 +555,12 @@
"b494b36e6c1749bbe205ef26f2cea31728f6fe82aa5abf5057e37a09ec326ffa"
]
},
+ "remarkableppure": {
+ "3.27.1.0": [
+ "remarkable-production-image-3.27.1.0-tatsu-public.swu",
+ "7008c8376e49a65b766bbd672f83454d55290af73b099bba396f3d9b2276ce3f"
+ ]
+ },
"last-updated": 1778173895,
"external-provider-urls": [
"https://storage.googleapis.com/remarkable-versions/REPLACE_ID",
diff --git a/pyproject.toml b/pyproject.toml
index 30651b1..a4022a2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -12,8 +12,8 @@ paramiko = "3.4.1"
psutil = "6.0.0"
requests = "2.32.4"
loguru = "0.7.3"
-remarkable-update-image = { version = "1.3", markers = "sys_platform != 'linux'" }
-remarkable-update-fuse = { version = "1.3", markers = "sys_platform == 'linux'" }
+remarkable-update-image = { version = "1.4.1", markers = "sys_platform != 'linux'" }
+remarkable-update-fuse = { version = "1.4.2", markers = "sys_platform == 'linux'" }
[build-system]
requires = ["poetry-core"]
diff --git a/requirements.txt b/requirements.txt
index 3b66c54..efabadb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
requests==2.32.5
loguru==0.7.3
-remarkable-update-image==1.4; sys_platform != 'linux'
-remarkable-update-fuse==1.4.1; sys_platform == 'linux'
+remarkable-update-image==1.4.1; sys_platform != 'linux'
+remarkable-update-fuse==1.4.2; sys_platform == 'linux'
diff --git a/tests/test.py b/tests/test.py
index 3a5d0ad..8170884 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -1,252 +1,252 @@
-import os
-import sys
-import difflib
-import contextlib
-import logging
-from unittest.mock import NonCallableMock
-
-sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
-
-from codexctl.device import HardwareType, DeviceManager
-from codexctl.updates import UpdateManager
-from codexctl import Manager
-
-# Mock device manager object, only the `logger` field is accessed by `set_server_config`
-device_manager = NonCallableMock(["logger"])
-
-set_server_config = DeviceManager.set_server_config
-codexctl = Manager(device="reMarkable2", logger=logging.getLogger(__name__))
-updater = UpdateManager(logger=logging.getLogger(__name__))
-
-from io import StringIO
-from io import BytesIO
-
-FAILED = False
-UPDATE_FILE_PATH = ".venv/2.15.1.1189_reMarkable2-wVbHkgKisg-.signed"
-
-assert os.path.exists(UPDATE_FILE_PATH), "Update image missing"
-
-class BufferWriter:
- def __init__(self, buffer):
- self._buffer = buffer
-
- def write(self, data):
- self._buffer.write(data)
-
-
-class BufferBytesIO(BytesIO):
- @property
- def buffer(self):
- return BufferWriter(self)
-
-
-def assert_value(msg, value, expected):
- global FAILED
- print(f"Testing {msg}: ", end="")
- if value == expected:
- print("pass")
- return
-
- FAILED = True
- print("fail")
- print(f" {value} != {expected}")
-
-
-def assert_gt(msg, value, expected):
- global FAILED
- print(f"Testing {msg}: ", end="")
- if value >= expected:
- print("pass")
- return
-
- FAILED = True
- print("fail")
- print(f" {value} != {expected}")
-
-@contextlib.contextmanager
-def assert_raises(msg, expected):
- global FAILED
- print(f"Testing {msg}: ", end="")
- try:
- yield
- got = "no exception"
- except expected:
- print("pass")
- return
- except Exception as e:
- got = e.__class__.__name__
-
- FAILED = True
- print("fail")
- print(f" {got} != {expected.__name__}")
-
-def test_set_server_config(original, expected):
- global FAILED
- print("Testing set_server_config: ", end="")
- result = set_server_config(device_manager, original, "test")
- if result == expected:
- print("pass")
- return
-
- FAILED = True
- print("fail")
- for diff in difflib.ndiff(
- expected.splitlines(keepends=True), result.splitlines(keepends=True)
- ):
- print(f" {diff}")
-
-
-def test_ls(path, expected):
- global FAILED
- global UPDATE_FILE_PATH
- print(f"Testing ls {path}: ", end="")
- with contextlib.redirect_stdout(StringIO()) as f:
- try:
- codexctl.call_func("ls", {'file': UPDATE_FILE_PATH, 'target_path': path})
-
- except SystemExit:
- pass
-
- result = f.getvalue()
- if result == expected:
- print("pass")
- return
-
- FAILED = True
- print("fail")
- for diff in difflib.ndiff(
- expected.splitlines(keepends=True), result.splitlines(keepends=True)
- ):
- print(f" {diff}")
-
-
-def test_cat(path, expected):
- global FAILED
- global UPDATE_FILE_PATH
- print(f"Testing cat {path}: ", end="")
- with contextlib.redirect_stdout(BufferBytesIO()) as f:
- try:
- codexctl.call_func("cat", {'file': UPDATE_FILE_PATH, 'target_path': path})
- except SystemExit:
- pass
-
- result = f.getvalue()
- if result == expected:
- print("pass")
- return
-
- FAILED = True
- print("fail")
- for diff in difflib.ndiff(
- expected.decode("utf-8").splitlines(keepends=True),
- result.decode("utf-8").splitlines(keepends=True),
- ):
- print(f" {diff}")
-
-
-test_set_server_config(
- "",
- "SERVER=test\n",
-)
-
-test_set_server_config(
- """[General]
-#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
-#SERVER=https://updates.cloud.remarkable.engineering/service/update2
-#GROUP=Prod
-#PLATFORM=reMarkable2
-REMARKABLE_RELEASE_VERSION=3.9.5.2026
-""",
- """[General]
-SERVER=test
-#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
-#SERVER=https://updates.cloud.remarkable.engineering/service/update2
-#GROUP=Prod
-#PLATFORM=reMarkable2
-REMARKABLE_RELEASE_VERSION=3.9.5.2026
-""",
-)
-
-test_set_server_config(
- """[General]
-SERVER=testing
-#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
-#SERVER=https://updates.cloud.remarkable.engineering/service/update2
-#GROUP=Prod
-#PLATFORM=reMarkable2
-REMARKABLE_RELEASE_VERSION=3.9.5.2026
-""",
- """[General]
-SERVER=test
-#SERVER=testing
-#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
-#SERVER=https://updates.cloud.remarkable.engineering/service/update2
-#GROUP=Prod
-#PLATFORM=reMarkable2
-REMARKABLE_RELEASE_VERSION=3.9.5.2026
-""",
-)
-
-test_ls(
- "/",
- ". .. lost+found bin boot dev etc home lib media mnt postinst proc run sbin sys tmp uboot-postinst uboot-version usr var\n",
-)
-test_ls(
- "/mnt",
- ". ..\n",
-)
-
-test_cat("/etc/version", b"20221026104022\n")
-
-assert_gt(
- "toltec rm1 version",
- updater.get_toltec_version(HardwareType.RM1),
- "3.3.2.1666"
-)
-assert_gt(
- "toltec rm2 version",
- updater.get_toltec_version(HardwareType.RM2),
- "3.3.2.1666"
-)
-with assert_raises("toltec rmpp version", SystemExit):
- updater.get_toltec_version(HardwareType.RMPP)
-with assert_raises("toltec rmppm version", SystemExit):
- updater.get_toltec_version(HardwareType.RMPPM)
-
-assert_value(
- "boundary cross 3.23->3.20",
- UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"),
- True
-)
-assert_value(
- "boundary cross 3.22->3.20",
- UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"),
- True
-)
-assert_value(
- "no boundary 3.23->3.22",
- UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"),
- False
-)
-assert_value(
- "no boundary 3.20->3.19",
- UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"),
- False
-)
-assert_value(
- "upgrade 3.20->3.22",
- UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"),
- False
-)
-assert_value(
- "same version 3.22->3.22",
- UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"),
- False
-)
-with assert_raises("empty string current", ValueError):
- UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92")
-with assert_raises("non-numeric version", ValueError):
- UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92")
-
-if FAILED:
- sys.exit(1)
+import contextlib
+import difflib
+import logging
+import os
+import sys
+from unittest.mock import NonCallableMock
+
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+
+from codexctl import Manager
+from codexctl.device import DeviceManager, HardwareType
+from codexctl.updates import UpdateManager
+
+# Mock device manager object, only the `logger` field is accessed by `set_server_config`
+device_manager = NonCallableMock(["logger"])
+
+set_server_config = DeviceManager.set_server_config
+codexctl = Manager(device="reMarkable2", logger=logging.getLogger(__name__))
+updater = UpdateManager(logger=logging.getLogger(__name__))
+
+from io import BytesIO, StringIO
+
+FAILED = False
+UPDATE_FILE_PATH = ".venv/2.15.1.1189_reMarkable2-wVbHkgKisg-.signed"
+
+assert os.path.exists(UPDATE_FILE_PATH), "Update image missing"
+
+
+class BufferWriter:
+ def __init__(self, buffer):
+ self._buffer = buffer
+
+ def write(self, data):
+ self._buffer.write(data)
+
+
+class BufferBytesIO(BytesIO):
+ @property
+ def buffer(self):
+ return BufferWriter(self)
+
+
+def assert_value(msg, value, expected):
+ global FAILED
+ print(f"Testing {msg}: ", end="")
+ if value == expected:
+ print("pass")
+ return
+
+ FAILED = True
+ print("fail")
+ print(f" {value} != {expected}")
+
+
+def assert_gt(msg, value, expected):
+ global FAILED
+ print(f"Testing {msg}: ", end="")
+ if value >= expected:
+ print("pass")
+ return
+
+ FAILED = True
+ print("fail")
+ print(f" {value} != {expected}")
+
+
+@contextlib.contextmanager
+def assert_raises(msg, expected):
+ global FAILED
+ print(f"Testing {msg}: ", end="")
+ try:
+ yield
+ got = "no exception"
+ except expected:
+ print("pass")
+ return
+ except Exception as e:
+ got = e.__class__.__name__
+
+ FAILED = True
+ print("fail")
+ print(f" {got} != {expected.__name__}")
+
+
+def test_set_server_config(original, expected):
+ global FAILED
+ print("Testing set_server_config: ", end="")
+ result = set_server_config(device_manager, original, "test")
+ if result == expected:
+ print("pass")
+ return
+
+ FAILED = True
+ print("fail")
+ for diff in difflib.ndiff(
+ expected.splitlines(keepends=True), result.splitlines(keepends=True)
+ ):
+ print(f" {diff}")
+
+
+def test_ls(path, expected):
+ global FAILED
+ global UPDATE_FILE_PATH
+ print(f"Testing ls {path}: ", end="")
+ with contextlib.redirect_stdout(StringIO()) as f:
+ try:
+ codexctl.call_func("ls", {"file": UPDATE_FILE_PATH, "target_path": path})
+
+ except SystemExit:
+ pass
+
+ result = f.getvalue()
+ if result == expected:
+ print("pass")
+ return
+
+ FAILED = True
+ print("fail")
+ for diff in difflib.ndiff(
+ expected.splitlines(keepends=True), result.splitlines(keepends=True)
+ ):
+ print(f" {diff}")
+
+
+def test_cat(path, expected):
+ global FAILED
+ global UPDATE_FILE_PATH
+ print(f"Testing cat {path}: ", end="")
+ with contextlib.redirect_stdout(BufferBytesIO()) as f:
+ try:
+ codexctl.call_func("cat", {"file": UPDATE_FILE_PATH, "target_path": path})
+ except SystemExit:
+ pass
+
+ result = f.getvalue()
+ if result == expected:
+ print("pass")
+ return
+
+ FAILED = True
+ print("fail")
+ for diff in difflib.ndiff(
+ expected.decode("utf-8").splitlines(keepends=True),
+ result.decode("utf-8").splitlines(keepends=True),
+ ):
+ print(f" {diff}")
+
+
+test_set_server_config(
+ "",
+ "SERVER=test\n",
+)
+
+test_set_server_config(
+ """[General]
+#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
+#SERVER=https://updates.cloud.remarkable.engineering/service/update2
+#GROUP=Prod
+#PLATFORM=reMarkable2
+REMARKABLE_RELEASE_VERSION=3.9.5.2026
+""",
+ """[General]
+SERVER=test
+#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
+#SERVER=https://updates.cloud.remarkable.engineering/service/update2
+#GROUP=Prod
+#PLATFORM=reMarkable2
+REMARKABLE_RELEASE_VERSION=3.9.5.2026
+""",
+)
+
+test_set_server_config(
+ """[General]
+SERVER=testing
+#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
+#SERVER=https://updates.cloud.remarkable.engineering/service/update2
+#GROUP=Prod
+#PLATFORM=reMarkable2
+REMARKABLE_RELEASE_VERSION=3.9.5.2026
+""",
+ """[General]
+SERVER=test
+#SERVER=testing
+#REMARKABLE_RELEASE_APPID={98DA7DF2-4E3E-4744-9DE6-EC931886ABAB}
+#SERVER=https://updates.cloud.remarkable.engineering/service/update2
+#GROUP=Prod
+#PLATFORM=reMarkable2
+REMARKABLE_RELEASE_VERSION=3.9.5.2026
+""",
+)
+
+test_ls(
+ "/",
+ ". .. lost+found bin boot dev etc home lib media mnt postinst proc run sbin sys tmp uboot-postinst uboot-version usr var\n",
+)
+test_ls(
+ "/mnt",
+ ". ..\n",
+)
+
+test_cat("/etc/version", b"20221026104022\n")
+
+assert_gt(
+ "toltec rm1 version", updater.get_toltec_version(HardwareType.RM1), "3.3.2.1666"
+)
+assert_gt(
+ "toltec rm2 version", updater.get_toltec_version(HardwareType.RM2), "3.3.2.1666"
+)
+with assert_raises("toltec rmpp version", SystemExit):
+ updater.get_toltec_version(HardwareType.RMPP)
+with assert_raises("toltec rmppm version", SystemExit):
+ updater.get_toltec_version(HardwareType.RMPPM)
+with assert_raises("toltec rmppure version", SystemExit):
+ updater.get_toltec_version(HardwareType.RMPPURE)
+
+assert_value(
+ "boundary cross 3.23->3.20",
+ UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.20.0.92"),
+ True,
+)
+assert_value(
+ "boundary cross 3.22->3.20",
+ UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.20.0.92"),
+ True,
+)
+assert_value(
+ "no boundary 3.23->3.22",
+ UpdateManager.is_bootloader_boundary_downgrade("3.23.0.64", "3.22.0.64"),
+ False,
+)
+assert_value(
+ "no boundary 3.20->3.19",
+ UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.19.0.82"),
+ False,
+)
+assert_value(
+ "upgrade 3.20->3.22",
+ UpdateManager.is_bootloader_boundary_downgrade("3.20.0.92", "3.22.0.64"),
+ False,
+)
+assert_value(
+ "same version 3.22->3.22",
+ UpdateManager.is_bootloader_boundary_downgrade("3.22.0.64", "3.22.0.64"),
+ False,
+)
+with assert_raises("empty string current", ValueError):
+ UpdateManager.is_bootloader_boundary_downgrade("", "3.20.0.92")
+with assert_raises("non-numeric version", ValueError):
+ UpdateManager.is_bootloader_boundary_downgrade("abc.def", "3.20.0.92")
+
+if FAILED:
+ sys.exit(1)