-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Revamp component model stream/future host API (again) #11515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Revamp component model stream/future host API (again) #11515
Conversation
f542163 to
f45b3d0
Compare
This changes the host APIs for dealing with futures and streams from a
"rendezvous"-style API to a callback-oriented one.
Previously you would create e.g. a `StreamReader`/`StreamWriter` pair and call
their `read` and `write` methods, respectively, and those methods would return
`Future`s that resolved when the operation was matched with a corresponding
`write` or `read` operation on the other end.
With the new API, you instead provide a `StreamProducer` trait implementation
whe creating the stream, whose `produce` method will be called as soon as a read
happens, giving the implementation a chance to respond immediately without
making the reader wait for a rendezvous. Likewise, you can match the read end
of a stream to a `StreamConsumer` to respond immediately to writes. This model
should reduce scheduling overhead and make it easier to e.g. pipe items to/from
`AsyncWrite`/`AsyncRead` or `Sink`/`Stream` implementations without needing to
explicitly spawn background tasks. In addition, the new API provides direct
access to guest read and write buffers for `stream<u8>` operations, enabling
zero-copy operations.
Other changes:
- I've removed the `HostTaskOutput`; we were using it to run extra code with
access to the store after a host task completes, but we can do that more
elegantly inside the future using `tls::get`. This also allowed me to
simplify `Instance::poll_until` a bit.
- I've removed the `watch_{reader,writer}` functionality; it's not needed now
given that the runtime will automatically dispose of the producer or consumer
when the other end of the stream or future is closed -- no need for embedder
code to manage that.
- In order to make `UntypedWriteBuffer` `Send`, I had to wrap its raw pointer
`buf` field in a `SendSyncPtr`.
- I've removed `{Future,Stream}Writer` entirely and moved
`Instance::{future,stream}` to `{Future,Stream}Reader::new`, respectively.
- I've added a bounds check to the beginnings of `Instance::guest_read` and
`Instance::guest_write` so that we need not do it later in
`Guest{Source,Destination}::remaining`, meaning those functions can be
infallible.
Note that I haven't updated `wasmtime-wasi` yet to match; that will happen in
one or more follow-up commits.
Signed-off-by: Joel Dice <joel.dice@fermyon.com>
f45b3d0 to
548e398
Compare
alexcrichton
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it worth it to resolve the zero-length read/write TODOs in the code before landing? We don't have many users of that right now so it also seems ok to defer that too.
Otherwise though this all seems reasonable to me, although I'm mostly relying on tests. The interfaces we've talked about historically and I think are ok to land. I'm also happy to help out with the porting of wasmtime-wasi later this afternoon
Yup, I'll do that. Testing it will be mildly tedious, but might as well bite the bullet.
@rvolosatovs is planning to take a crack at updating |
crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs
Outdated
Show resolved
Hide resolved
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
I've added a test to cover this; it also tests direct buffer access for `stream<u8>`, which I realized I forgot to cover earlier. And of course there was a bug 🤦. Signed-off-by: Joel Dice <joel.dice@fermyon.com>
crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs
Outdated
Show resolved
Hide resolved
crates/wasmtime/src/runtime/component/concurrent/futures_and_streams.rs
Outdated
Show resolved
Hide resolved
This can help `Stream{Producer,Consumer}` implementations determine how many
items to write or read, respectively.
Signed-off-by: Joel Dice <joel.dice@fermyon.com>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
crates/test-programs/src/bin/p3_sockets_tcp_sample_application.rs
Outdated
Show resolved
Hide resolved
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
| Ok(Ok(..)) if self.buffer.is_empty() => return Ok(StreamState::Open), | ||
| Ok(Ok(n)) => { | ||
| let mut buf = &self.buffer[n..]; | ||
| while !buf.is_empty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that the consumer must loop here to ensure that all of the data already read from the guest has been written - otherwise, if the consumer were to buffer, how would the guest know that a partial write occurred?
It seems that what we really want is to have a way for the consumer to report back the amount of elements actually read, replicating something like https://doc.rust-lang.org/nightly/std/io/trait.Read.html#tymethod.read
Given the existing API, it looks like the consumer should be able to return the buffer, just like the Destination::write would currently do.
This is, of course, already addressed for byte-buffers originating from the guest, but this seems problematic in general case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I can see how that could be awkward -- you read items from the Source and then try to write them to some kind of sink, but that sink might not be immediately ready to accept all the items. In that case do you just .await until the sink has accepted them all? If you do, then you're kind of blocking the original writer more than you should -- you'd rather just say you didn't read all the items and let the writer write them again if and when it wants to. Also, what if the sink closes before you can write them all?
So yeah, I think we might need to change this API as well as the StreamProducer one.
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
|
All existing WASI tests pass now for me locally. I'm a bit surprised that it looks like CI does not run on this PR anymore? I've removed the reuseaddr test workaround, since this change set should remove the need for it and close #11342 |
|
The main remaining item left here is the cancellation-safety of I will meanwhile work on migrating #11440 to this API |
|
On a second thought, I'm guessing that "lying" about the stream being open is probably not the expected behavior, so I'll work on driving I/O in these and buffering first |
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Note, that this removes the read optimization - let's get the implementation complete first and optimize later Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
|
I'm signing off for today and have just finished the WASI crate adaptation a few minutes ago. I have not even really proof-read, for example, the That all said, all tests pass (at least locally, will have to wait for CI) and IMO this is "good enough" to get this PR merged. Feel free to clean-up as desired, otherwise I will do that tomorrow. |
|
I talked with @dicej about this and we concluded that let's go ahead and land this. I've got follow-up feedback and Joel's got some follow-up implementation work but we feel it's best to land this and iterate rather than continuing to block this. The hope is that by landing this @rvolosatovs you're more-or-less unblocke to continue to work on wasi-http while we continue to smith some details here in parallel |
| dst: &'a mut Destination<'a, Self::Item, Self::Buffer>, | ||
| finish: bool, | ||
| ) -> Poll<wasmtime::Result<StreamResult>> { | ||
| if let Some(task) = self.task.as_mut() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm going to look into restructuring this function to hopefully not need duplication in this match here and the one far below after the spawn, but that's just refactoring a bit.
…ce#11515) * Revamp component model stream/future host API (again) This changes the host APIs for dealing with futures and streams from a "rendezvous"-style API to a callback-oriented one. Previously you would create e.g. a `StreamReader`/`StreamWriter` pair and call their `read` and `write` methods, respectively, and those methods would return `Future`s that resolved when the operation was matched with a corresponding `write` or `read` operation on the other end. With the new API, you instead provide a `StreamProducer` trait implementation whe creating the stream, whose `produce` method will be called as soon as a read happens, giving the implementation a chance to respond immediately without making the reader wait for a rendezvous. Likewise, you can match the read end of a stream to a `StreamConsumer` to respond immediately to writes. This model should reduce scheduling overhead and make it easier to e.g. pipe items to/from `AsyncWrite`/`AsyncRead` or `Sink`/`Stream` implementations without needing to explicitly spawn background tasks. In addition, the new API provides direct access to guest read and write buffers for `stream<u8>` operations, enabling zero-copy operations. Other changes: - I've removed the `HostTaskOutput`; we were using it to run extra code with access to the store after a host task completes, but we can do that more elegantly inside the future using `tls::get`. This also allowed me to simplify `Instance::poll_until` a bit. - I've removed the `watch_{reader,writer}` functionality; it's not needed now given that the runtime will automatically dispose of the producer or consumer when the other end of the stream or future is closed -- no need for embedder code to manage that. - In order to make `UntypedWriteBuffer` `Send`, I had to wrap its raw pointer `buf` field in a `SendSyncPtr`. - I've removed `{Future,Stream}Writer` entirely and moved `Instance::{future,stream}` to `{Future,Stream}Reader::new`, respectively. - I've added a bounds check to the beginnings of `Instance::guest_read` and `Instance::guest_write` so that we need not do it later in `Guest{Source,Destination}::remaining`, meaning those functions can be infallible. Note that I haven't updated `wasmtime-wasi` yet to match; that will happen in one or more follow-up commits. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * Add `Accessor::getter`, rename `with_data` to `with_getter` * fixup bindgen invocation Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * add support for zero-length writes/reads to/from host I've added a test to cover this; it also tests direct buffer access for `stream<u8>`, which I realized I forgot to cover earlier. And of course there was a bug 🤦. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * add `{Destination,Source}::remaining` methods This can help `Stream{Producer,Consumer}` implementations determine how many items to write or read, respectively. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * wasi: migrate sockets to new API Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * tests: read the socket stream until EOF Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-sockets: account for cancellation Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-sockets: mostly ensure byte buffer cancellation-safety Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-filesystem: switch to new API Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * fixup! p3-sockets: mostly ensure byte buffer cancellation-safety * p3-cli: switch to new API Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3: limit maximum buffer size Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3-sockets: remove reuseaddr test loop workaround Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * p3: drive I/O in `when_ready` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * fixup! p3: drive I/O in `when_ready` * Refine `Stream{Producer,Consumer}` APIs Per conversations last week with Roman, Alex, and Lann, I've updated these traits to present a lower-level API based on `poll_{consume,produce}` functions and have documented the implementation requirements for various scenarios which have come up in `wasmtime-wasi`, particularly around graceful cancellation. See the doc comments for those functions for details. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * being integration of new API Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * update wasi/src/p3/filesystem to use new stream API This is totally untested so far; I'll run the tests once we have everything else compiling. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * update wasi/src/p3/cli to use new stream API This is totally untested and doesn't even compile yet due to a lifetime issue I don't have time to address yet. I'll follow up later with a fix. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * fix: remove `'a` bound on `&self` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * finish `wasi:sockets` adaptation Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * finish `wasi:cli` adaptation Note, that this removes the read optimization - let's get the implementation complete first and optimize later Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * remove redundant loop in sockets Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * wasi: buffer on 0-length reads Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * finish `wasi:filesystem` adaptation Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * remove `MAX_BUFFER_CAPACITY` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor `Cursor` usage Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * impl Default for VecBuffer Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * refactor: use consistent import styling Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * feature-gate fs Arc accessors Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> * Update test expectations --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com> Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net> Co-authored-by: Alex Crichton <alex@alexcrichton.com> Co-authored-by: Roman Volosatovs <rvolosatovs@riseup.net>
This changes the host APIs for dealing with futures and streams from a "rendezvous"-style API to a callback-oriented one.
Previously you would create e.g. a
StreamReader/StreamWriterpair and call theirreadandwritemethods, respectively, and those methods would returnFutures that resolved when the operation was matched with a correspondingwriteorreadoperation on the other end.With the new API, you instead provide a
StreamProducertrait implementation whe creating the stream, whoseproducemethod will be called as soon as a read happens, giving the implementation a chance to respond immediately without making the reader wait for a rendezvous. Likewise, you can match the read end of a stream to aStreamConsumerto respond immediately to writes. This model should reduce scheduling overhead and make it easier to e.g. pipe items to/fromAsyncWrite/AsyncReadorSink/Streamimplementations without needing to explicitly spawn background tasks. In addition, the new API provides direct access to guest read and write buffers forstream<u8>operations, enabling zero-copy operations.Other changes:
I've removed the
HostTaskOutput; we were using it to run extra code with access to the store after a host task completes, but we can do that more elegantly inside the future usingtls::get. This also allowed me to simplifyInstance::poll_untila bit.I've removed the
watch_{reader,writer}functionality; it's not needed now given that the runtime will automatically dispose of the producer or consumer when the other end of the stream or future is closed -- no need for embedder code to manage that.In order to make
UntypedWriteBufferSend, I had to wrap its raw pointerbuffield in aSendSyncPtr.I've removed
{Future,Stream}Writerentirely and movedInstance::{future,stream}to{Future,Stream}Reader::new, respectively.I've added a bounds check to the beginnings of
Instance::guest_readandInstance::guest_writeso that we need not do it later inGuest{Source,Destination}::remaining, meaning those functions can be infallible.Note that I haven't updated
wasmtime-wasiyet to match; that will happen in one or more follow-up commits.