Skip to content
52 changes: 27 additions & 25 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import binascii
import contextlib
import logging
import math
import threading
from collections.abc import Iterator, Mapping
from typing import Callable, Optional, TYPE_CHECKING, Union
Expand Down Expand Up @@ -290,7 +289,7 @@ def _fill_map(self, needed):
self.map.append(var)

def _update_data_size(self):
self.data = bytearray(int(math.ceil(self.length / 8.0)))
self.data = bytearray(-(-self.length // 8)) # rounds up

@property
def name(self) -> str:
Expand Down Expand Up @@ -612,23 +611,29 @@ def get_data(self) -> bytes:
"""
byte_offset, bit_offset = divmod(self.offset, 8)

byte_last = -(-(self.offset + self.length) // 8) # rounds up
if bit_offset or self.length % 8:
# Need information of the current variable type (unsigned vs signed)
data_type = self.od.data_type
if data_type == objectdictionary.BOOLEAN:
# A boolean type needs to be treated as an U08
data_type = objectdictionary.UNSIGNED8
od_struct = self.od.STRUCT_TYPES[data_type]
data = od_struct.unpack_from(self.pdo_parent.data, byte_offset)[0]
# Mask of relevant bits for this mapping, starting at bit 0
mask = (1 << self.length) - 1
# Extract all needed bytes and convert to a number for bit operations
cur_msg_data = self.pdo_parent.data[byte_offset:byte_last]
cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little")
# Shift and mask to get the correct values
data = (data >> bit_offset) & ((1 << self.length) - 1)
data_bits = (cur_msg_bits >> bit_offset) & mask

# Check if the variable is signed and if the data is negative prepend signedness
if od_struct.format.islower() and (1 << (self.length - 1)) < data:
if od_struct.format.islower() and (1 << (self.length - 1)) < data_bits:
# fill up the rest of the bits to get the correct signedness
data = data | (~((1 << self.length) - 1))
data = od_struct.pack(data)
data_bits |= ~mask
data = od_struct.pack(data_bits)
else:
data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8]
data = self.pdo_parent.data[byte_offset:byte_last]

return data

Expand All @@ -641,27 +646,24 @@ def set_data(self, data: bytes):
logger.debug("Updating %s to %s in %s",
self.name, binascii.hexlify(data), self.pdo_parent.name)

byte_last = -(-(self.offset + self.length) // 8) # rounds up
if bit_offset or self.length % 8:
cur_msg_data = self.pdo_parent.data[byte_offset:byte_offset + len(self.od) // 8]
# Need information of the current variable type (unsigned vs signed)
data_type = self.od.data_type
if data_type == objectdictionary.BOOLEAN:
# A boolean type needs to be treated as an U08
data_type = objectdictionary.UNSIGNED8
od_struct = self.od.STRUCT_TYPES[data_type]
cur_msg_data = od_struct.unpack(cur_msg_data)[0]
# data has to have the same size as old_data
data = od_struct.unpack(data)[0]
# Mask of relevant bits for this mapping, starting at bit 0
mask = (1 << self.length) - 1
# Extract all needed bytes and convert to a number for bit operations
cur_msg_data = self.pdo_parent.data[byte_offset:byte_last]
cur_msg_bits = int.from_bytes(cur_msg_data, byteorder="little")
# Mask out the old data value
# At the end we need to mask for correct variable length (bitwise operation failure)
shifted = (((1 << self.length) - 1) << bit_offset) & ((1 << len(self.od)) - 1)
bitwise_not = (~shifted) & ((1 << len(self.od)) - 1)
cur_msg_data = cur_msg_data & bitwise_not
cur_msg_bits &= ~(mask << bit_offset)
# Convert new value to a number for generic bit operations
data_bits = int.from_bytes(data, byteorder="little") & mask
# Set the new data on the correct position
data = (data << bit_offset) | cur_msg_data
od_struct.pack_into(self.pdo_parent.data, byte_offset, data)
data_bits = (data_bits << bit_offset) | cur_msg_bits

merged_data = data_bits.to_bytes(byte_last - byte_offset, byteorder="little")
self.pdo_parent.data[byte_offset:byte_last] = merged_data
else:
self.pdo_parent.data[byte_offset:byte_offset + len(data)] = data
self.pdo_parent.data[byte_offset:byte_last] = data[0:byte_last]

self.pdo_parent.update()

Expand Down
14 changes: 14 additions & 0 deletions test/sample.eds
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,20 @@ DataType=0x0001
AccessType=rw
PDOMapping=1

[2007]
ParameterName=UNSIGNED16 value
ObjectType=0x7
DataType=0x0006
AccessType=rw
PDOMapping=1

[2008]
ParameterName=UNSIGNED32 value
ObjectType=0x7
DataType=0x0007
AccessType=rw
PDOMapping=1

[2020]
ParameterName=Complex data type
ObjectType=0x7
Expand Down
28 changes: 28 additions & 0 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,34 @@ def test_pdo_maps_iterate(self):
pdo = node.tpdo[1]
self.assertEqual(len(pdo), sum(1 for _ in pdo))

def test_pdo_bit_offsets(self):
pdo = self.node.pdo.tx[2]
pdo.add_variable('UNSIGNED8 value', length=4) # byte-aligned, partial byte length
pdo.add_variable('INTEGER8 value') # non-byte-aligned, one whole byte length
pdo.add_variable('UNSIGNED32 value', length=24) # non-aligned, partial last byte
pdo.add_variable('UNSIGNED16 value', length=12) # non-aligned, whole last byte
pdo.add_variable('INTEGER16 value', length=3) # byte-aligned, partial byte length
pdo.add_variable('INTEGER32 value', length=13) # non-aligned, whole last byte

# Write some values
pdo['UNSIGNED8 value'].raw = 3
pdo['INTEGER8 value'].raw = -2
pdo['UNSIGNED32 value'].raw = 0x987654
pdo['UNSIGNED16 value'].raw = 0x321
pdo['INTEGER16 value'].raw = -1
pdo['INTEGER32 value'].raw = -1071

# Check expected data
self.assertEqual(pdo.data, b'\xe3\x4f\x65\x87\x19\x32\x8f\xde')
Comment thread
acolomb marked this conversation as resolved.

# Read values from data
self.assertEqual(pdo['UNSIGNED8 value'].raw, 3)
self.assertEqual(pdo['INTEGER8 value'].raw, -2)
self.assertEqual(pdo['UNSIGNED32 value'].raw, 0x987654)
self.assertEqual(pdo['UNSIGNED16 value'].raw, 0x321)
self.assertEqual(pdo['INTEGER16 value'].raw, -1)
self.assertEqual(pdo['INTEGER32 value'].raw, -1071)

def test_pdo_save(self):
self.node.tpdo.save()
self.node.rpdo.save()
Expand Down
Loading