Code Review
This pull request refactors the import differ logic to return a summary dictionary and updates the validation workflow to handle this new structure. It also introduces a conditional post-processing step for Spanner ingestion, allowing for status updates to 'STAGING' and corresponding filtering in the Spanner client. I have no feedback to provide.
    logging.info("Marking import as SKIP due to no data diff.")
    import_summary.status = ImportStatus.SKIP
else:
    import_summary.status = ImportStatus.STAGING
    'import_version',
    datetime.now(timezone.utc).strftime("%Y-%m-%d"))
run_ingestion = True
post_process = attributes.get('post_process', '')
Is this a new attribute? How is it used?
The name suggests that this runs after the import workflow.
Can we rename it to run_process, so that when it is set to spanner_ingestion_workflow it is clear that it only runs a Dataflow ingestion?
    import_input=import_input,
    absolute_import_dir=absolute_import_dir)
if differ_summary is not None:
    diff_found = (differ_summary['obs_diff_size'] != 0 or
Can we use .get() instead of []? A missing key would then fall back to 0 instead of raising a KeyError.
differ_summary.get('obs_diff_size', 0) != 0 or differ_summary.get('schema_diff_size', 0) != 0
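A minimal sketch of what that check could look like, assuming differ_summary is a plain dict (or None) carrying the two keys mentioned above. The helper name has_diff is hypothetical, and treating a None summary as "no diff" is an assumption, not something the PR confirms.

```python
from typing import Mapping, Optional


def has_diff(differ_summary: Optional[Mapping[str, int]]) -> bool:
    """Return True when the summary reports an observation or schema diff.

    Hypothetical helper: a None summary is treated as "no diff" here,
    which is an assumption about the desired behavior.
    """
    if differ_summary is None:
        return False
    # .get() with a default of 0 avoids a KeyError when a key is absent.
    return (differ_summary.get('obs_diff_size', 0) != 0 or
            differ_summary.get('schema_diff_size', 0) != 0)
```

With this shape, a summary that lacks one of the keys is read as having no diff for that part rather than failing the run.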