Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions sigmf/convert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"""Convert non-SigMF recordings to SigMF format"""

from pathlib import Path
from typing import Optional

from ..error import SigMFConversionError


def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0, magic_bytes: Optional[bytes] = None) -> bytes:
"""
Get magic bytes from a file to help identify file type.

Expand All @@ -23,6 +24,8 @@ def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
Number of bytes to read. Default is 4.
offset : int, optional
Byte offset to start reading from. Default is 0.
magic_bytes : str, optional
If provided, search the entire file for this byte sequence.

Returns
-------
Expand All @@ -36,11 +39,25 @@ def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
"""
try:
with open(file_path, "rb") as handle:
# If magic_bytes is provided, search anywhere in the file
if magic_bytes is not None:
data = handle.read()
idx = data.find(magic_bytes)
if idx != -1:
return magic_bytes
raise SigMFConversionError(
f"Magic bytes {magic_bytes} not found anywhere in {file_path}"
)

# Otherwise: read bytes at the given offset
handle.seek(offset)
magic_bytes = handle.read(count)
if len(magic_bytes) < count:
raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}")
raise SigMFConversionError(
f"File {file_path} too small to read {count} bytes at offset {offset}"
)
return magic_bytes

except (IOError, OSError) as err:
raise SigMFConversionError(f"Cannot read magic bytes from {file_path}: {err}") from err

Expand Down Expand Up @@ -84,6 +101,18 @@ def detect_converter(file_path: Path):
f"Expected SignalHoundIQFile for Signal Hound Spike files."
)

elif file_path.suffix in [".tar"]:
# iq.tar file extensions are used by Rohde & Schwarz for their IQ data, but the .tar extension is also used by other formats.
# So parse the tar file to determine if it is a Rohde & Schwarz file or not.
rohde_schwarz_expanded_magic_bytes = get_magic_bytes(file_path, count=20, offset=0,magic_bytes=b"RS_IQ_TAR_FileFormat") # <RS_IQ_TAR_FileFormat>
if rohde_schwarz_expanded_magic_bytes == b"RS_IQ_TAR_FileFormat":
return "rohdeschwarz"
else:
raise SigMFConversionError(
f"Unsupported XML file format. Root element: {rohde_schwarz_expanded_magic_bytes}. "
f"Expected RS_IQ_TAR_FileFormat for IQ.TAR files."
)

else:
raise SigMFConversionError(
f"Unsupported file format. Magic bytes: {magic_bytes}. "
Expand Down
15 changes: 10 additions & 5 deletions sigmf/convert/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
from . import detect_converter
from .blue import blue_to_sigmf
from .signalhound import signalhound_to_sigmf
from .signalhound import signalhound_to_sigmf
from .wav import wav_to_sigmf

from .rohdeschwarz import rohdeschwarz_to_sigmf

def main() -> None:
"""
Unified entry-point for SigMF conversion of non-SigMF recordings.

This command-line interface converts various non-SigMF file formats into SigMF-compliant datasets.
It currently supports WAV and BLUE/Platinum file formats.
It currently supports WAV and BLUE/Platinum, Signal Hound Spike and Rohde and Schwarz IQ.TAR file formats.
The converter detects the file type based on magic bytes and invokes the appropriate conversion function.

By default it will output a SigMF pair (.sigmf-meta and .sigmf-data).
Expand Down Expand Up @@ -94,10 +95,14 @@ def main() -> None:
elif converter_type == "blue":
_ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
elif converter_type == "signalhound":
_ = signalhound_to_sigmf(
signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd
)
_ = signalhound_to_sigmf(signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
elif converter_type == "rohdeschwarz":
_ = rohdeschwarz_to_sigmf(rohdeschwarz_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)

else:
raise SigMFConversionError(
f"Supported formats for conversion are WAV, BLUE/Platinum, Signal Hound Spike and Rohde and Schwarz IQ.TAR."
)

if __name__ == "__main__":
main()
Loading
Loading