Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 46 additions & 7 deletions irods/data_object.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import ast

Check failure on line 1 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D100

D100: Missing docstring in public module [pydocstyle:undocumented-public-module]
from datetime import datetime, UTC
import enum
import io
import sys
import logging
import os
import ast
import sys

from irods.models import DataObject
from irods.meta import iRODSMetaCollection
Expand Down Expand Up @@ -41,11 +43,49 @@
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)


class _repl_status(enum.Enum):

Check failure on line 46 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff N801

N801: Class name `_repl_status` should use CapWords convention [pep8-naming:invalid-class-name]
STALE_REPLICA, \
GOOD_REPLICA, \
INTERMEDIATE_REPLICA, \
READ_LOCKED, \
WRITE_LOCKED = range(5)


# An ordering of the various replica status values, by descending fitness for use/interface
_REPL_STATUSES = tuple(
getattr(_repl_status, ident).value for ident in (
"GOOD_REPLICA",

Check failure on line 57 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff W291

W291: Trailing whitespace [pycodestyle:trailing-whitespace]
"STALE_REPLICA",
"INTERMEDIATE_REPLICA",
"READ_LOCKED",
"WRITE_LOCKED",
)
)

# An appropriate reference datetime value for gauging replica age as part of
# the default sort key in PRC4 and onward.
_REFERENCE_DATETIME = datetime.fromtimestamp(0, UTC)

def _REPLICA_NUMBER_SORT_KEY_FN(row):
    """Sort key: order replicas ascending by replica number (the PRC3-era default)."""
    return row[DataObject.replica_number]

Check failure on line 70 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-format

Ruff format

Improper formatting

def _REPLICA_FITNESS_SORT_KEY_FN(row):
    """Sort key ordering replicas by descending fitness for use.

    Primary key: the rank of the replica's status within _REPL_STATUSES
    (good first, then stale, intermediate, locked); status values not
    present in _REPL_STATUSES sink to the end via sys.maxsize.
    Secondary key: _REFERENCE_DATETIME - modify_time, so within a status
    rank, more recently modified replicas sort first.
    """
    repl_status = int(row[DataObject.replica_status])

    # Membership test replaces count()-as-boolean: one scan instead of two,
    # identical semantics.
    repl_status_rank = _REPL_STATUSES.index(repl_status) if repl_status in _REPL_STATUSES else sys.maxsize

    # NOTE(review): assumes the catalog-provided modify_time is a tz-aware
    # datetime; subtracting a naive datetime from the UTC-aware
    # _REFERENCE_DATETIME would raise TypeError — confirm against the model.
    return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])


_DEFAULT_SORT_KEY_FN = _REPLICA_NUMBER_SORT_KEY_FN


class iRODSDataObject:
def __init__(self, manager, parent=None, results=None):
def __init__(self, manager, parent=None, results=None, replica_sort_function=None):

Check failure on line 84 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D107

D107: Missing docstring in `__init__` [pydocstyle:undocumented-public-init]
self.manager = manager
if parent and results:
self.collection = parent
results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
for attr, value in DataObject.__dict__.items():
if not attr.startswith("_"):
try:
Expand All @@ -54,9 +94,8 @@
# backward compatibility with older schema versions
pass
self.path = self.collection.path + "/" + self.name
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])

# The status quo before iRODS 5
# Copy pre-iRODS 5 fields

replica_args = [
(
Expand All @@ -75,13 +114,13 @@
modify_time=r[DataObject.modify_time],
),
)
for r in replicas
for r in results
]

# Adjust for adding access_time in the iRODS 5 case.

if self.manager.sess.server_version >= (5,):
for n, r in enumerate(replicas):
for n, r in enumerate(results):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args]

Expand Down
33 changes: 19 additions & 14 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@
if size is not None and isinstance(open_options, dict):
open_options[kw.DATA_SIZE_KW] = size

def _download(self, obj, local_path, num_threads, updatables=(), **options):
def _download(self, obj_path, local_path, num_threads, updatables=(), **options):
"""Transfer the contents of a data object to a local file.

Called from get() when a local path is named.
"""
if os.path.isdir(local_path):
local_file = os.path.join(local_path, irods_basename(obj))
local_file = os.path.join(local_path, irods_basename(obj_path))

Check failure on line 227 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff PTH118

PTH118: `os.path.join()` should be replaced by `Path` with `/` operator [flake8-use-pathlib:os-path-join]
else:
local_file = local_path

Check failure on line 229 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff SIM108

SIM108: Use ternary operator `local_file = os.path.join(local_path, irods_basename(obj_path)) if os.path.isdir(local_path) else local_path` instead of `if`-`else`-block [flake8-simplify:if-else-block-instead-of-if-exp]

Expand All @@ -233,12 +233,12 @@
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG

data_open_returned_values_ = {}
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
(obj_path, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
Expand All @@ -265,6 +265,8 @@
"""
parent = self.sess.collections.get(irods_dirname(path))

replica_sort_function = options.pop('replica_sort_function', None)

# TODO: optimize
if local_path:
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
Expand All @@ -284,7 +286,7 @@
results = query.all() # get up to max_rows replicas
if len(results) <= 0:
raise ex.DataObjectDoesNotExist()
return iRODSDataObject(self, parent, results)
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)

@staticmethod
def _resolve_force_put_option(options, default_setting=None, true_value=""):
Expand All @@ -304,66 +306,68 @@
else:
del options[kw.FORCE_FLAG_KW]

def put(
    self,
    local_path,
    irods_path,
    return_data_object=False,
    num_threads=DEFAULT_NUMBER_OF_THREADS,
    updatables=(),
    **options,
):
    """Upload a local file to iRODS as a data object.

    Parameters:
        local_path - path of the local file to upload.
        irods_path - destination logical path; if it names an existing
            collection, the local file's basename is appended to it.
        return_data_object - when True, fetch and return the resulting
            iRODSDataObject after the transfer.
        num_threads - thread count governing parallel-transfer eligibility.
        updatables - progress-callback objects updated per chunk.
        options - keyword-constant options; 'replica_sort_function' is
            popped here and forwarded to get() when return_data_object
            is True.

    Returns:
        An iRODSDataObject when return_data_object is True, else None.

    Raises:
        OVERWRITE_WITHOUT_FORCE_FLAG if the target exists and no force
        option is in effect.
    """
    # Decide if a put option should be used and modify options accordingly.
    self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)

    if self.sess.collections.exists(irods_path):
        obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
    else:
        obj_path = irods_path
    if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
        raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
    options.pop(kw.FORCE_FLAG_KW, None)

    replica_sort_function = options.pop('replica_sort_function', None)

    with open(local_path, "rb") as f:
        sizelist = []
        if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
            o = deferred_call(self.open, (obj_path, "w"), options)
            f.close()
            error = RuntimeError("parallel put failed")
            try:
                if not self.parallel_put(
                    local_path,
                    (obj_path, o),
                    total_bytes=sizelist[0],
                    num_threads=num_threads,
                    target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
                    open_options=options,
                    updatables=updatables,
                ):
                    raise error
            except ex.iRODSException as e:
                raise e
            except BaseException as e:
                raise error from e
        else:
            with self.open(obj_path, "w", **options) as o:
                # Set operation type to trigger acPostProcForPut
                if kw.OPR_TYPE_KW not in options:
                    options[kw.OPR_TYPE_KW] = 1  # PUT_OPR
                for chunk in chunks(f, self.WRITE_BUFFER_SIZE):
                    o.write(chunk)
                    do_progress_updates(updatables, len(chunk))

    if kw.ALL_KW in options:
        repl_options = options.copy()
        repl_options[kw.UPDATE_REPL_KW] = ""
        # Leaving REG_CHKSUM_KW set would raise the error:
        # Requested to register checksum without verifying, but source replica has a checksum. This can result
        # in multiple replicas being marked good with different checksums, which is an inconsistency.
        del repl_options[kw.REG_CHKSUM_KW]
        self.replicate(obj_path, **repl_options)

    if return_data_object:
        return self.get(obj_path, replica_sort_function=replica_sort_function)
    # Explicit return (ruff RET503): nothing to return when no data object
    # was requested.
    return None

Check failure on line 370 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff RET503

RET503: Missing explicit `return` at the end of function able to return non-`None` value [flake8-return:implicit-return]

def chksum(self, path, **options):
"""
Expand Down Expand Up @@ -480,6 +484,7 @@
raise ex.DataObjectExistsAtLogicalPath

options = {**options, kw.DATA_TYPE_KW: "generic"}
replica_sort_function = options.pop('replica_sort_function', None)

if resource:
options[kw.DEST_RESC_NAME_KW] = resource
Expand Down Expand Up @@ -508,7 +513,7 @@
desc = response.int_info
conn.close_file(desc)

return self.get(path)
return self.get(path, replica_sort_function=replica_sort_function)

def open_with_FileRaw(self, *arg, **kw_options):
holder = []
Expand Down
71 changes: 60 additions & 11 deletions irods/test/data_obj_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
from irods.test.helpers import iRODSUserLogins
import irods.exception as ex
from irods.column import Criterion
from irods.data_object import chunks, irods_dirname
from irods.data_object import chunks, irods_dirname, _REPLICA_FITNESS_SORT_KEY_FN
import irods.test.helpers as helpers
import irods.test.modules as test_modules
import irods.keywords as kw
Expand Down Expand Up @@ -1253,8 +1253,7 @@

# assertions on replicas
self.assertEqual(len(obj.replicas), number_of_replicas)
for i, replica in enumerate(obj.replicas):
self.assertEqual(replica.number, i)
self.assertEqual({repl.number for repl in obj.replicas}, {*range(len(obj.replicas))})

# now trim odd-numbered replicas
# note (see irods/irods#4861): COPIES_KW might disappear in the future
Expand All @@ -1267,10 +1266,7 @@
obj = session.data_objects.get(obj_path)

# check remaining replica numbers
replica_numbers = []
for replica in obj.replicas:
replica_numbers.append(replica.number)
self.assertEqual(replica_numbers, [0, 2, 4, 6])
self.assertEqual({r.number for r in obj.replicas}, {0, 2, 4, 6})

# remove object
obj.unlink(force=True)
Expand Down Expand Up @@ -1728,11 +1724,12 @@
self.assertIsNotNone(obj.replicas[1].__getattribute__(i))

# ensure replica info is sensible
replicas = sorted(obj.replicas, key=lambda repl: repl.number)
for i in range(2):
self.assertEqual(obj.replicas[i].number, i)
self.assertEqual(obj.replicas[i].status, "1")
self.assertEqual(obj.replicas[i].path.split("/")[-1], filename)
self.assertEqual(obj.replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)
self.assertEqual(replicas[i].number, i)
self.assertEqual(replicas[i].status, "1")
self.assertEqual(replicas[i].path.split("/")[-1], filename)
self.assertEqual(replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)

self.assertEqual(obj.replicas[0].resource_name, ufs_resources[0].name)
if self.sess.server_version < (4, 2, 0):
Expand Down Expand Up @@ -2992,6 +2989,58 @@

test_put__issue_722(self)

def test_modified_sorting_of_replicas__issue_746(self):
    """Verify that replica_sort_function controls the ordering of obj.replicas."""
    basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647'
    with self.create_simple_resc() as newResc1, self.create_simple_resc() as newResc2:
        data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}')

        # Precondition for an eventual total of 3 replicas: initial data replica is not on either of the new resources.
        self.assertFalse({repl.resource_name for repl in data.replicas} & {newResc1, newResc2})
        try:
            data.replicate(resource=newResc1)

            # Ensure that one of the replicas is stale, to test proper sorting.
            with data.open('a', **{kw.RESC_NAME_KW: newResc1}) as f:
                f.write(b'.')

            # Sleep to ensure different replica modify timestamps.
            time.sleep(2)

            data.replicate(resource=newResc2)

            # At this point, exactly two of the three replicas should be good.
            # Assert that exactly one replica is stale, to corroborate.
            data = self.sess.data_objects.get(
                data.path,
                replica_sort_function=lambda row: int(row[DataObject.replica_status])
            )
            self.assertEqual(
                [repl.status for repl in data.replicas],
                ['0', '1', '1']
            )

            # Get a data object with the PRC3-default sort order. Ordering is expected to
            # be ascending by replica number.
            if irods.version.version_as_tuple() < (4,):
                data = self.sess.data_objects.get(data.path)
                for i, repl in enumerate(data.replicas):
                    self.assertEqual(repl.number, i)

            options = {}
            if irods.version.version_as_tuple() < (4,):
                options['replica_sort_function'] = _REPLICA_FITNESS_SORT_KEY_FN

            # Get a data object with the PRC3-alternative/PRC4-default sort order.
            data = self.sess.data_objects.get(data.path, **options)

            # Test default replica sorting: the fittest (good, most recently
            # modified) replica comes first.
            self.assertEqual(data.replicas[0].status, '1')
            self.assertEqual(data.replicas[0].modify_time, data.modify_time)
            self.assertGreater(data.replicas[0].modify_time, data.replicas[1].modify_time)
        finally:
            if data:
                data.unlink(force=True)


if __name__ == "__main__":
# let the tests find the parent irods lib
Expand Down
Loading