feat(experimental): add write resumption strategy #1663
base: main
Conversation
Summary of Changes
Hello @Pulkit0110, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the reliability of asynchronous bidirectional streaming operations by introducing a comprehensive write resumption strategy and a generic retry manager. This refactoring centralizes error handling and retry logic, making both read and write operations more resilient to transient failures and network interruptions. The changes ensure that data transfers can seamlessly resume from the last known good state, improving the overall robustness of the system.
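To make the summary concrete, here is a minimal sketch of the kind of strategy interface a generic bidi retry manager could drive. The names generate_requests and recover_state_on_failure appear in the diff excerpts below; everything else (the class name, process_response, the shape of the state dict) is an assumption for illustration, not this PR's actual API.

```python
import abc
from typing import Any, AsyncIterator, Dict


class _ResumptionStrategy(abc.ABC):
    """Per-operation state holder that lets a retry manager reopen a bidi stream."""

    @abc.abstractmethod
    def generate_requests(self, state: Dict[str, Any]) -> AsyncIterator[Any]:
        """Yield the requests for a (re)opened stream, starting from the saved state."""

    @abc.abstractmethod
    def process_response(self, response: Any, state: Dict[str, Any]) -> None:
        """Fold a server response back into the saved state."""

    @abc.abstractmethod
    async def recover_state_on_failure(self, cause: Exception, state: Dict[str, Any]) -> None:
        """Adjust the saved state (e.g. pick up a new write_handle) before the next attempt."""
```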
Code Review
This pull request introduces a generic retry manager for bidi streams and applies it to read operations, which is a great improvement for resiliency. It also adds a resumption strategy for write operations. The overall structure is well-designed and the new tests are comprehensive. I've found one critical issue that will cause a TypeError at runtime due to a method being called with an unexpected argument. I've also included a couple of medium-severity comments to improve documentation clarity and maintainability in the new strategy classes.
google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py
google/cloud/storage/_experimental/asyncio/retry/reads_resumption_strategy.py
google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
Force-pushed from 7d63612 to 8c2a143
/gemini review
Code Review
This pull request introduces a write resumption strategy for handling bidi write operations, which is a valuable addition for improving error handling in experimental async features. The implementation is well-structured and accompanied by a comprehensive set of unit tests that cover various scenarios, including initial uploads, resumption, and failure recovery. My feedback focuses on enhancing type safety, improving code clarity by addressing a potentially unused flag, and increasing the robustness of the tests by using mocks instead of None.
google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
/gemini summary
Summary of Changes
This pull request introduces an experimental write resumption strategy for asynchronous bidirectional write operations within the Google Cloud Storage client library. This new strategy significantly enhances the reliability of data uploads by providing robust mechanisms to handle errors, track the state of ongoing writes, and seamlessly resume operations from the last successfully persisted byte, thereby improving data integrity and resilience against transient network issues or server redirects.
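As an illustration of the state being tracked, here is a sketch of a per-upload state holder. The field names (spec, chunk_size, write_handle, persisted_size, bytes_sent, is_complete) are taken from the diff excerpts in the threads below; the class itself is a simplified stand-in, not the PR's exact definition.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class _WriteState:
    spec: Any                            # AppendObjectSpec (or WriteObjectSpec) for the upload
    chunk_size: int                      # bytes read from the user buffer per request
    write_handle: Optional[Any] = None   # server-issued handle used to resume the stream
    persisted_size: int = 0              # bytes the server has durably persisted
    bytes_sent: int = 0                  # bytes this client has sent so far
    is_complete: bool = False            # whether the upload has finished (object finalized)
```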
Activity
google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
Attributes:
    spec (AppendObjectSpec): The specification for the object to write.
    chunk_size (int): The size of chunks to read from the buffer.
to read?
done
write_state.write_handle = response.write_handle

if response.resource:
    write_state.is_complete = True
resource is obtained only when the object is finalized. Finalization != closing the stream (or finishing uploading for a particular session).
As per the docstring, is_complete (bool): Whether the upload has finished.
What do you mean here when you set is_complete to True?
It means the upload is complete and the object is finalized.
Why don't we use the same terminology as the backend? is_finalized?
Otherwise it'll create confusion (we already have a lot of terms: finalize, close, etc.).
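For illustration, a minimal sketch of what the reviewer's renaming suggestion could look like when a response is folded into the write state. The helper name and the exact condition are assumptions, not the PR's code, and a later thread below refines how finalization is actually detected.

```python
def _apply_response(response, write_state):
    """Fold a BidiWriteObjectResponse into the tracked write state (sketch)."""
    if response.write_handle:
        write_state.write_handle = response.write_handle
    # "Finalized" mirrors the backend terminology: the object is finalized,
    # which is not the same as this bidi session/stream being closed.
    if response.resource:
        write_state.is_finalized = True
        write_state.persisted_size = response.resource.size
```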
google/cloud/storage/_experimental/asyncio/retry/writes_resumption_strategy.py
self.assertEqual(requests[3].checksummed_data.content, b"89")

self.assertEqual(requests[4].write_offset, 10)
self.assertTrue(requests[4].finish_write)
It shouldn't always be true. It should be true only when the user explicitly provides finalize_on_close=True.
Same comment for all other tests.
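A sketch of what the reviewer is asking for, assuming a finalize_on_close flag is threaded through to the request generator. The flag name comes from the comment above; the helper and its wiring are illustrative, and storage_type is the proto-types alias used elsewhere in this PR.

```python
def _final_request(write_state, finalize_on_close: bool):
    """Build the last request of a write session (sketch).

    finish_write is set only when the caller explicitly asked to finalize the
    object on close; otherwise the object stays appendable.
    """
    return storage_type.BidiWriteObjectRequest(
        write_offset=write_state.bytes_sent,
        finish_write=finalize_on_close,
    )
```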
done
def test_generate_requests_resumption(self):
    """
    Verify request sequence when resuming an upload.
    - First request is AppendObjectSpec with write_handle and state_lookup=True.
Why state_lookup=True? If it's to fetch persisted_size then, AFAIR, when opening a bidi stream to write with a write_handle, persisted_size is obtained in the first BidiWriteObjectResponse message.
I don't think opening the stream with a write_handle guarantees that persisted_size is returned. state_lookup makes sure persisted_size is always returned. Also, state_lookup will be passed while opening the stream with the write_handle, so there won't be any additional request.
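A sketch of the resumption-side opening request being discussed, assuming the AppendObjectSpec already carries the handle and state_lookup is set on that same message, so there is no extra round trip. storage_type is the proto-types alias used elsewhere in this PR; the helper name is illustrative.

```python
def _initial_resume_request(write_state):
    """First request when reopening a stream for an existing appendable object (sketch)."""
    # Requesting a state lookup on the opening message makes the server report
    # persisted_size in its first BidiWriteObjectResponse, without a separate request.
    spec = write_state.spec
    spec.write_handle = write_state.write_handle
    return storage_type.BidiWriteObjectRequest(
        append_object_spec=spec,
        state_lookup=True,
    )
```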
write_state.is_complete = True
write_state.persisted_size = response.resource.size

async def recover_state_on_failure(
(Maybe a little late to notice) why async def? There's no await anywhere here.
Yes, I'll keep it for now. Once the implementation is complete, I'll change it accordingly.
Why? Are you worried about failing tests?
write_state = state["write_state"]
write_state.persisted_size = 2048

response = storage_type.BidiWriteObjectResponse(persisted_size=1024)
When will this scenario happen?
When there's an out-of-order or delayed response from the server. For example, a response confirming 2048 bytes have been persisted might arrive before a delayed response that confirms only 1024 bytes were persisted at an earlier point in time.
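A sketch of the guard being described, so a stale persisted_size never rewinds the tracked state. A later thread questions whether this check is needed at all; this is just the author's reasoning in code form, with an illustrative helper name.

```python
def _update_persisted_size(response, write_state):
    """Only move persisted_size forward (sketch).

    A delayed or out-of-order response reporting an older, smaller persisted_size
    must not roll back state that a newer response already advanced.
    """
    if response.persisted_size is not None:
        write_state.persisted_size = max(
            write_state.persisted_size, response.persisted_size
        )
```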
write_state.is_complete = True
yield storage_type.BidiWriteObjectRequest(
    write_offset=write_state.bytes_sent,
    finish_write=True,
This way the object is always finalized after doing writer.append, right?
We should not finalize. We should keep the object in an unfinalized state unless the user explicitly specifies otherwise.
done
yield storage_type.BidiWriteObjectRequest(
    append_object_spec=write_state.spec, state_lookup=do_state_lookup
)

# Determine if we need to send WriteObjectSpec or AppendObjectSpec
nit: this comment should be on top of L82, right?
def __init__(
    self,
    spec: storage_type.AppendObjectSpec,
As discussed offline, it could be either storage_type.AppendObjectSpec or storage_type.WriteObjectSpec; WriteObjectSpec in the first request.
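A sketch of the type-hint change being suggested, assuming the constructor keeps the same parameter names from the docstring above. The class name here is a placeholder, and storage_type is the proto-types alias used elsewhere in this PR.

```python
from typing import Union


class _WritesResumptionStrategy:  # placeholder name, not the PR's actual class
    def __init__(
        self,
        spec: Union["storage_type.AppendObjectSpec", "storage_type.WriteObjectSpec"],
        chunk_size: int,
    ) -> None:
        # WriteObjectSpec applies when creating the object on the very first request;
        # AppendObjectSpec applies when appending to or resuming an existing object.
        self._spec = spec
        self._chunk_size = chunk_size
```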
# Initial request of the stream must provide the specification.
# If we have a write_handle, we request a state lookup to verify persisted offset.
do_state_lookup = write_state.write_handle is not None
nit: is_state_lookup_required is a good variable name in my opinion.
write_state: _WriteState = state["write_state"]

if response.persisted_size is not None:
    if response.persisted_size > write_state.persisted_size:
this check is not required.
write_state.write_handle = response.write_handle

if response.resource:
    write_state.is_complete = True
Important: resource is also returned on the first response, for both Write & Append object specs (see internal code link, sent via chat).
write_state.write_handle = response.write_handle

if response.resource:
    write_state.is_complete = True
you can check if an object is finalized or not by checking the presence of https://github.com/googleapis/googleapis/blob/9a477cd3c26a704130e2a2fb44a40281d9312e4c/google/storage/v2/storage.proto#L2947
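A sketch of the check the reviewer is pointing to (the proto link is Object.finalize_time). The helper name and the exact field-presence check are assumptions; a real implementation may need HasField or a similar presence test depending on the proto wrapper.

```python
def _is_finalized(response) -> bool:
    """Report whether the object is finalized (sketch, not the PR's code).

    Per the review, `resource` can appear even on the first response, so its
    presence alone is not enough; a finalized object is marked by the
    finalize_time field on the resource.
    """
    resource = response.resource
    return bool(resource) and bool(getattr(resource, "finalize_time", None))
```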
if hasattr(cause, "write_handle") and cause.write_handle:
    write_state.write_handle = cause.write_handle
redirect_handle = getattr(cause, "write_handle", None)
if redirect_handle:
(Note/things to keep in mind while integrating with AppendableWriter) to fetch the write_handle, you may have to do something similar to reads' _handle_redirect.
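For context, a sketch of what unpacking a write redirect could look like when integrating with the writer, loosely mirroring the reads-side _handle_redirect mentioned above. The error attributes and the routing_token handling are assumptions; in practice the redirect details may first need to be extracted from the gRPC error, as on the reads side.

```python
def _recover_from_redirect(cause, write_state):
    """Pull redirect info off a failure into the tracked write state (sketch)."""
    # A redirected write typically carries a fresh write_handle (and possibly a
    # routing_token) that the next attempt must reuse when reopening the stream.
    redirect_handle = getattr(cause, "write_handle", None)
    if redirect_handle:
        write_state.write_handle = redirect_handle
    routing_token = getattr(cause, "routing_token", None)
    if routing_token:
        # Assumes the spec exposes routing_token, as AppendObjectSpec does.
        write_state.spec.routing_token = routing_token
```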
else:
    initial_request.append_object_spec = write_state.spec

yield initial_request
this doesn't handle the 'open' scenario , so please add a comment for that.
Adding a writes resumption strategy which will be used for error handling of bidi write operations.
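For orientation, a rough sketch of how such a strategy might be driven by the generic retry manager referenced earlier in the review. Apart from generate_requests and recover_state_on_failure, which appear in the diff, every name here (run_with_resumption, open_stream, process_response, the state dict) is a placeholder, not this PR's actual API.

```python
async def run_with_resumption(strategy, open_stream, state, max_attempts=5):
    """Drive a bidi write with resumption across attempts (illustrative only)."""
    for _attempt in range(max_attempts):
        try:
            # Reopen the stream from the saved state and replay the remaining requests.
            stream = await open_stream(strategy.generate_requests(state))
            async for response in stream:
                strategy.process_response(response, state)
            return state["write_state"]
        except Exception as exc:  # real code would retry only on retriable errors
            await strategy.recover_state_on_failure(exc, state)
    raise RuntimeError("upload did not complete within the retry budget")
```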