Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 25 additions & 16 deletions canopen/pdo/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,32 @@ def __iter__(self):
return iter(self.map)

def __getitem__(self, key: Union[int, str]):
if isinstance(key, int):
if key == 0:
raise KeyError("PDO index zero requested for 1-based sequence")
if (
0 < key <= 512 # By PDO Index
or 0x1600 <= key <= 0x17FF # By RPDO ID (512)
or 0x1A00 <= key <= 0x1BFF # By TPDO ID (512)
):
return self.map[key]
for pdo_map in self.map.values():
try:
return pdo_map[key]
except KeyError:
# ignore if one specific PDO does not have the key and try the next one
continue
if isinstance(key, int):
if key == 0:
raise KeyError("PDO index zero requested for 1-based sequence")
if 0 < key <= 512:
# Sequential 1-based index, direct lookup
return self.map[key]
elif 0x1400 <= key <= 0x15FF:
# RPDO communication parameter records (CiA 301)
return self.map[(key - 0x1400) + 1]
elif 0x1600 <= key <= 0x17FF:
# RPDO mapping parameter records (CiA 301)
return self.map[(key - 0x1600) + 1]
elif 0x1800 <= key <= 0x19FF:
# TPDO communication parameter records (CiA 301)
return self.map[(key - 0x1800) + 1]
elif 0x1A00 <= key <= 0x1BFF:
# TPDO mapping parameter records (CiA 301)
return self.map[(key - 0x1A00) + 1]
raise KeyError(f"PDO: {key} was not found in any map")

for pdo_map in self.map.values():
try:
return pdo_map[key]
except KeyError:
continue
raise KeyError(f"PDO: {key} was not found in any map")

def __len__(self):
return len(self.map)

Expand Down
14 changes: 12 additions & 2 deletions test/test_pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,18 @@ def test_pdo_map_getitem(self):
self.assertEqual(pdo['INTEGER32 value'].raw, 0x01020304)
self.assertEqual(pdo['BOOLEAN value'].raw, False)
self.assertEqual(pdo['BOOLEAN value 2'].raw, True)

def test_pdo_getitem(self):
def test_pdo_access_by_index(network):
node = network[1]
# Should work by sequential index
assert node.rpdo[1] is not None
# Should also work by CiA 301 mapping record index
assert node.rpdo[0x1600] is node.rpdo[1]
# And by communication record index (the flaw in PR #613)
assert node.rpdo[0x1400] is node.rpdo[1]
# Same for TPDO
assert node.tpdo[0x1A00] is node.tpdo[1]
assert node.tpdo[0x1800] is node.tpdo[1]
def test_pdo_getitem(self):
node = self.node
self.assertEqual(node.tpdo[1]['INTEGER16 value'].raw, -3)
self.assertEqual(node.tpdo[1]['UNSIGNED8 value'].raw, 0xf)
Expand Down