-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(auth): Add blocking Regional Access Boundary Lookup and Seed Support #16720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
5c3c331
c6c140f
b0163b1
17a1c3d
78bd7e6
9bbba81
5a8ad8b
87880a6
ac15926
1872dd1
28541d4
d2258f4
a70c23f
bf41e71
4e2bc63
d4ab1eb
2f53d4f
53bb27e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,11 +20,15 @@ | |
| import logging | ||
| import os | ||
| import threading | ||
| from typing import NamedTuple, Optional | ||
| from typing import NamedTuple, Optional, TYPE_CHECKING | ||
|
|
||
| from google.auth import _helpers | ||
| from google.auth import environment_vars | ||
|
|
||
| if TYPE_CHECKING: | ||
| import google.auth.credentials | ||
| import google.auth.transport | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
|
|
@@ -97,6 +101,7 @@ def __init__(self): | |
| ) | ||
| self.refresh_manager = _RegionalAccessBoundaryRefreshManager() | ||
| self._update_lock = threading.Lock() | ||
| self._use_blocking_regional_access_boundary_lookup = False | ||
|
|
||
| def __getstate__(self): | ||
| """Pickle helper that serializes the _update_lock attribute.""" | ||
|
|
@@ -109,6 +114,43 @@ def __setstate__(self, state): | |
| self.__dict__.update(state) | ||
| self._update_lock = threading.Lock() | ||
|
|
||
| def __eq__(self, other): | ||
| """Checks if two managers are equal.""" | ||
| if not isinstance(other, _RegionalAccessBoundaryManager): | ||
| return NotImplemented | ||
| return ( | ||
| self._data == other._data | ||
| and self._use_blocking_regional_access_boundary_lookup | ||
| == other._use_blocking_regional_access_boundary_lookup | ||
| ) | ||
|
|
||
| def enable_blocking_lookup(self): | ||
| """Enables blocking Regional Access Boundary lookup. | ||
|
|
||
| When enabled, the Regional Access Boundary lookup will be performed | ||
| synchronously in the calling thread instead of asynchronously in a | ||
| background thread. | ||
| """ | ||
| self._use_blocking_regional_access_boundary_lookup = True | ||
|
|
||
| def set_initial_regional_access_boundary(self, encoded_locations=None, expiry=None): | ||
| """Manually sets the regional access boundary to the client provided seed. | ||
|
|
||
| Args: | ||
| encoded_locations (Optional[str]): The encoded locations string. | ||
| expiry (Optional[datetime.datetime]): The expiry time for the boundary. | ||
| If encoded_locations is not provided, expiry is ignored. | ||
| """ | ||
| if not encoded_locations: | ||
| expiry = None | ||
|
|
||
| self._data = _RegionalAccessBoundaryData( | ||
| encoded_locations=encoded_locations, | ||
| expiry=expiry, | ||
| cooldown_expiry=None, | ||
| cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, | ||
| ) | ||
|
|
||
| def apply_headers(self, headers): | ||
| """Applies the Regional Access Boundary header to the provided dictionary. | ||
|
|
||
|
|
@@ -151,48 +193,50 @@ def maybe_start_refresh(self, credentials, request): | |
| return | ||
|
|
||
| # If all checks pass, start the background refresh. | ||
| self.refresh_manager.start_refresh(credentials, request, self) | ||
|
|
||
|
|
||
| class _RegionalAccessBoundaryRefreshThread(threading.Thread): | ||
| """Thread for background refreshing of the Regional Access Boundary.""" | ||
| if self._use_blocking_regional_access_boundary_lookup: | ||
| self.start_blocking_refresh(credentials, request) | ||
| else: | ||
| self.refresh_manager.start_refresh(credentials, request, self) | ||
|
|
||
| def __init__(self, credentials, request, rab_manager): | ||
| super().__init__() | ||
| self.daemon = True | ||
| self._credentials = credentials | ||
| self._request = request | ||
| self._rab_manager = rab_manager | ||
| def start_blocking_refresh(self, credentials, request): | ||
| """Initiates a blocking lookup of the Regional Access Boundary. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: the docstring should probably mention what happens on an exception |
||
|
|
||
| def run(self): | ||
| """ | ||
| Performs the Regional Access Boundary lookup and updates the state. | ||
| If the lookup raises an exception, it is caught and logged as a warning, | ||
| and the lookup is treated as a failure (entering cooldown). Exceptions | ||
| are not propagated to the caller. | ||
|
|
||
| This method is run in a separate thread. It delegates the actual lookup | ||
| to the credentials object's `_lookup_regional_access_boundary` method. | ||
| Based on the lookup's outcome (success or complete failure after retries), | ||
| it updates the cached Regional Access Boundary information, | ||
| its expiry, its cooldown expiry, and its exponential cooldown duration. | ||
| Args: | ||
| credentials (google.auth.credentials.Credentials): The credentials to refresh. | ||
| request (google.auth.transport.Request): The object used to make HTTP requests. | ||
| """ | ||
| # Catch exceptions (e.g., from the underlying transport) to prevent the | ||
| # background thread from crashing. This ensures we can gracefully enter | ||
| # an exponential cooldown state on failure. | ||
| try: | ||
| # The fail_fast parameter is set to True to ensure we don't block the calling | ||
| # thread for too long. This will do two things: 1) set a timeout to 3s | ||
| # instead of the default 120s and 2) ensure we do not retry at all | ||
| regional_access_boundary_info = ( | ||
| self._credentials._lookup_regional_access_boundary(self._request) | ||
| credentials._lookup_regional_access_boundary(request, fail_fast=True) | ||
| ) | ||
| except Exception as e: | ||
| if _helpers.is_logging_enabled(_LOGGER): | ||
| _LOGGER.warning( | ||
| "Asynchronous Regional Access Boundary lookup raised an exception: %s", | ||
| "Blocking Regional Access Boundary lookup raised an exception: %s", | ||
| e, | ||
| exc_info=True, | ||
| ) | ||
| regional_access_boundary_info = None | ||
|
|
||
| with self._rab_manager._update_lock: | ||
| self.process_regional_access_boundary_info(regional_access_boundary_info) | ||
|
|
||
| def process_regional_access_boundary_info(self, regional_access_boundary_info): | ||
| """Processes the regional access boundary info and updates the state. | ||
|
|
||
| Args: | ||
| regional_access_boundary_info (Optional[Mapping[str, str]]): The regional access | ||
| boundary info to process. | ||
| """ | ||
| with self._update_lock: | ||
| # Capture the current state before calculating updates. | ||
| current_data = self._rab_manager._data | ||
| current_data = self._data | ||
|
|
||
| if regional_access_boundary_info: | ||
| # On success, update the boundary and its expiry, and clear any cooldown. | ||
|
|
@@ -206,14 +250,12 @@ def run(self): | |
| cooldown_duration=DEFAULT_REGIONAL_ACCESS_BOUNDARY_COOLDOWN, | ||
| ) | ||
| if _helpers.is_logging_enabled(_LOGGER): | ||
| _LOGGER.debug( | ||
| "Asynchronous Regional Access Boundary lookup successful." | ||
| ) | ||
| _LOGGER.debug("Regional Access Boundary lookup successful.") | ||
| else: | ||
| # On failure, calculate cooldown and update state. | ||
| if _helpers.is_logging_enabled(_LOGGER): | ||
| _LOGGER.warning( | ||
| "Asynchronous Regional Access Boundary lookup failed. Entering cooldown." | ||
| "Regional Access Boundary lookup failed. Entering cooldown." | ||
| ) | ||
|
|
||
| next_cooldown_expiry = ( | ||
|
|
@@ -241,7 +283,53 @@ def run(self): | |
| ) | ||
|
|
||
| # Perform the atomic swap of the state object. | ||
| self._rab_manager._data = updated_data | ||
| self._data = updated_data | ||
|
|
||
|
|
||
| class _RegionalAccessBoundaryRefreshThread(threading.Thread): | ||
| """Thread for background refreshing of the Regional Access Boundary.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| credentials: "google.auth.credentials.CredentialsWithRegionalAccessBoundary", # noqa: F821 | ||
| request: "google.auth.transport.Request", # noqa: F821 | ||
| rab_manager: "_RegionalAccessBoundaryManager", | ||
| ): | ||
| super().__init__() | ||
| self.daemon = True | ||
| self._credentials = credentials | ||
| self._request = request | ||
| self._rab_manager = rab_manager | ||
|
|
||
| def run(self): | ||
| """ | ||
| Performs the Regional Access Boundary lookup and updates the state. | ||
|
|
||
| This method is run in a separate thread. It delegates the actual lookup | ||
| to the credentials object's `_lookup_regional_access_boundary` method. | ||
| Based on the lookup's outcome (success or complete failure after retries), | ||
| it updates the cached Regional Access Boundary information, | ||
| its expiry, its cooldown expiry, and its exponential cooldown duration. | ||
| """ | ||
| # Catch exceptions (e.g., from the underlying transport) to prevent the | ||
| # background thread from crashing. This ensures we can gracefully enter | ||
| # an exponential cooldown state on failure. | ||
| try: | ||
| regional_access_boundary_info = ( | ||
| self._credentials._lookup_regional_access_boundary(self._request) | ||
| ) | ||
| except Exception as e: | ||
| if _helpers.is_logging_enabled(_LOGGER): | ||
| _LOGGER.warning( | ||
| "Asynchronous Regional Access Boundary lookup raised an exception: %s", | ||
| e, | ||
| exc_info=True, | ||
| ) | ||
| regional_access_boundary_info = None | ||
|
|
||
| self._rab_manager.process_regional_access_boundary_info( | ||
| regional_access_boundary_info | ||
| ) | ||
|
|
||
|
|
||
| class _RegionalAccessBoundaryRefreshManager(object): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -361,6 +361,40 @@ def _copy_regional_access_boundary_manager(self, target): | |
| new_manager._data = self._rab_manager._data | ||
| target._rab_manager = new_manager | ||
|
|
||
| def _with_regional_access_boundary(self, seed): | ||
| """Returns a copy of these credentials with the the regional_access_boundary | ||
| set to the provided seed. This is intended for internal use only as invalid | ||
| seeds would produce unexpected results until automatic recovery is supported. | ||
| Currently this is used by the gcloud CLI and therefore changes to the | ||
| contract MUST be backwards compatible (e.g. the method signature must be | ||
| unchanged and a copy of the credenials with the RAB set must be returned). | ||
|
|
||
|
|
||
| Returns: | ||
| google.auth.credentials.Credentials: A new credentials instance. | ||
| """ | ||
| creds = self._make_copy() | ||
| creds._rab_manager.set_initial_regional_access_boundary( | ||
| encoded_locations=seed.get("encodedLocations", None), | ||
| expiry=seed.get("expiry", None), | ||
| ) | ||
| return creds | ||
|
|
||
| def _with_blocking_regional_access_boundary_lookup(self): | ||
| """Returns a copy of these credentials with the blocking lookup mode enabled. | ||
| This is intended for internal use only as blocking lookup requires additional | ||
| care and consideration. Currently this is used by the gcloud CLI and | ||
| therefore changes to the contract MUST be backwards compatible (e.g. the | ||
| method signature must be unchanged and a copy of the credentials with the | ||
| blocking lookup flag set to true must be returned). | ||
|
|
||
| Returns: | ||
| google.auth.credentials.Credentials: A new credentials instance. | ||
| """ | ||
| creds = self._make_copy() | ||
| creds._rab_manager.enable_blocking_lookup() | ||
| return creds | ||
|
|
||
| def _maybe_start_regional_access_boundary_refresh(self, request, url): | ||
| """ | ||
| Starts a background thread to refresh the Regional Access Boundary if needed. | ||
|
|
@@ -421,11 +455,16 @@ def before_request(self, request, method, url, headers): | |
| """Refreshes the access token and triggers the Regional Access Boundary | ||
| lookup if necessary. | ||
| """ | ||
| super(CredentialsWithRegionalAccessBoundary, self).before_request( | ||
|
nbayati marked this conversation as resolved.
|
||
| request, method, url, headers | ||
| ) | ||
| if self._use_non_blocking_refresh: | ||
| self._non_blocking_refresh(request) | ||
| else: | ||
| self._blocking_refresh(request) | ||
|
Comment on lines
+458
to
+461
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implementation of Instead of duplicating the refresh logic, you should call super(CredentialsWithRegionalAccessBoundary, self).before_request(
request, method, url, headers
)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is worth confirming, but from my understanding, I'm not sure if this is an issue. CredentialsWithRegionalAccessBoundary inherits from Credentials, so it should have those internal attributes. It does seem a little dangerous that before_request seems to be mostly copy/pasted from the superclass here though. I wonder if that can be cleaned up somehow? I could see these drifting over time
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I think Gemini's assessment is incorrect. We had to duplicate the parent's logic here because we needed to ensure that the RAB lookup (which might be blocking) happens after the access token is refreshed but before headers are applied to the request. Credentials.before_request does both (refresh and apply) in one go. I agree with the concern that they might drift over time. I think this can be refactored in the future and extract the common logic into a helper method. Would you be ok with us merging it as-is now, and doing the refactoring in the future? |
||
|
|
||
| self._maybe_start_regional_access_boundary_refresh(request, url) | ||
|
|
||
| metrics.add_metric_header(headers, self._metric_header_for_usage()) | ||
| self.apply(headers) | ||
|
|
||
| def refresh(self, request): | ||
| """Refreshes the access token. | ||
|
|
||
|
|
@@ -435,13 +474,16 @@ def refresh(self, request): | |
| self._perform_refresh_token(request) | ||
|
|
||
| def _lookup_regional_access_boundary( | ||
| self, request: "google.auth.transport.Request" # noqa: F821 | ||
| self, | ||
| request: "google.auth.transport.Request", # noqa: F821 | ||
| fail_fast: bool = False, | ||
| ) -> "Optional[Dict[str, str]]": | ||
| """Calls the Regional Access Boundary lookup API to retrieve the Regional Access Boundary information. | ||
|
|
||
| Args: | ||
| request (google.auth.transport.Request): The object used to make | ||
| HTTP requests. | ||
| fail_fast (bool): Whether the lookup should fail fast (short timeout, no retries). | ||
|
|
||
| Returns: | ||
| Optional[Dict[str, str]]: The Regional Access Boundary information returned by the lookup API, or None if the lookup failed. | ||
|
|
@@ -456,7 +498,9 @@ def _lookup_regional_access_boundary( | |
| headers: Dict[str, str] = {} | ||
| self._apply(headers) | ||
| self._rab_manager.apply_headers(headers) | ||
| return _client._lookup_regional_access_boundary(request, url, headers=headers) | ||
| return _client._lookup_regional_access_boundary( | ||
| request, url, headers=headers, fail_fast=fail_fast | ||
| ) | ||
|
|
||
| @abc.abstractmethod | ||
| def _build_regional_access_boundary_lookup_url( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this naming could get very confusing, since this is a state-changing function with an (almost) identical name to an internal data attribute. And the docstring isn't helpful
Can we change the function name to something like
enable_blocking_lookup? And make the docstring explain what happens when this is enabled?