Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing transition AsyncBackpressuredStream state machine #27

Conversation

simonjbeaumont
Copy link
Collaborator

@simonjbeaumont simonjbeaumont commented Nov 23, 2023

Motivation

Some of our bidirectional streaming tests were failing intermittently. When they failed, the symptom was that more bytes were received by the end user (in the test) than the server sent.

For example, in the test testStreamingDownload_1kChunk_10kChunks_100BDownloadWatermark, we expect:

  • ✅ The server sends 10,000 chunks of 1024 bytes:

    cat repro.txt | grep -i "server sent body chunk" | head -3
    Server sent body chunk 1/10000 of 1024
    Server sent body chunk 2/10000 of 1024
    Server sent body chunk 3/10000 of 1024cat repro.txt | grep -i "server sent body chunk" | wc -l
    10000
  • ✅ URLSession didReceive data callback called a non-deterministic number of times because it may re-chunk the bytes internally, but the total number of bytes through the delegate calls is 10,240,000:

    cat repro.txt | grep "didReceive data" | head -3
    Task delegate: didReceive data (numBytes: 1024)
    Task delegate: didReceive data (numBytes: 2048)
    Task delegate: didReceive data (numBytes: 1024)cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | paste -sd+ | bc -l
    10240000
  • ❌ The response body chunks emitted by the AsyncBackpressuredStream to the user (in the test) match 1:1 those received by the delegate callback:

    cat repro.txt | grep "Client received some response body bytes" | head -3
    Client received some response body bytes (numBytes: 1024)
    Client received some response body bytes (numBytes: 2048)
    Client received some response body bytes (numBytes: 1024)cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | wc -l
    333cat repro.txt | grep "Client received some response body bytes" | wc -l
    334
  • ❌ The total number of bytes emitted by the AsyncBackpressuredStream to the user (in the test) match is 10,240,000 and it can then reconstruct 10,000 chunks of 1024 to match what the server sent:

    cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \) | paste -sd+ | bc -l
    10280960cat repro.txt | grep "Client reconstructing" | tail -3
    Client reconstructing and verifying chunk 10038/10000
    Client reconstructing and verifying chunk 10039/10000
    Client reconstructing and verifying chunk 10040/10000

So we see that there was one more element emitted from the AsyncBackpressuredStream than the delegate callback wrote, which meant the test saw an additional 40960 bytes than it expected to and consequently reconstructed an additional 40 chunks of size 1024 over what the server sent.

We can see that the AsyncBackpressuredStream duplicates an element along the way, of 40960 bytes,

❯ diff -u --label=received-in-delegate-callback <(cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)) --label=received-through-async-sequence <(cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \))
--- received-in-delegate-callback
+++ received-through-async-sequence
@@ -305,6 +305,7 @@
 2048
 1024
 40960
+40960
 24576
 2048
 34841

After some investigation, it turned out there was a missing transition in the state machine that underlies the AsyncBackpressuredStream. When calling suspendNext when there are buffered elements, but we are above the watermark, we popped the first item from the buffer and returned it without updating the state (with the new buffer, without the popped element).

Consequently, this meant that the next call to next() would return the same element again.

Modifications

The following modifications have been made in separate commits to aid review:

  • Add debug logging to the state machine functions, logging the old state, event, new state, and resulting action.
  • Add two tests which reproduce this error.
  • Add the missing state transition (which causes the newly added tests to reliably pass).

Result

Duplicate elements are no longer emitted from the response body.

Test Plan

  • Unit tests were added that fail without the fix, that now pass reliably.

Additional notes

The implementation we are using for AsyncBackpressuredStream was taken from an early draft of SE-0406. We should probably move to using something closer matching that of the current PR to the Swift tree, or that used by swift-grpc, which has also adopted this code and cleaned it up to remove the dependencies on the standard library internals. Additionally, that implementation does not have this missing state transition and also adds an intermediate state to the state machine to avoid unintended copy-on-write.

@simonjbeaumont simonjbeaumont changed the title Sb/missing state transition in async backpressured stream Add missing transition AsyncBackpressuredStream state machine Nov 23, 2023
@simonjbeaumont simonjbeaumont marked this pull request as ready for review November 23, 2023 11:43
@simonjbeaumont simonjbeaumont added the 🔨 semver/patch No public API change. label Nov 23, 2023
@simonjbeaumont simonjbeaumont merged commit b1d5cfa into apple:main Nov 23, 2023
simonjbeaumont added a commit that referenced this pull request Nov 23, 2023
### Motivation

As a follow up to #27, we noted it would be good to align on the latest
draft implementation of SE-0406 (AsyncStream with backpressure) to both
pickup the latest improvements in performance and correctness, and to
minimise the churn if/when this lands in the standard library or
standalone package.

### Modifications

In order to simplify reviewing the following modifications have been
made in independent commits:

* `Add updated SE-0406 implementation as BufferedStream, incl. tests`:
Skim over this—it's vendored in wholesale.
* `Port the custom watermark support to BufferedStream`: Skim over
this—it's a 1:1 port of the logic that was added to
`AsyncBackpressuredStream`.
* `Switch from AsyncBackpressuredStream to BufferedStream in delegate`:
Review this—it's a minimal change.
* `Remove AsyncBackpressuredStream and its vendored locks`: Skim over
this—it's removing the old implementation.

### Result

No functional change, but the internal async sequence we're using should
be more robust, performant, and more likely to match a future standard
library type.

### Test Plan

- The new vendored `BufferedStream` actually comes with a much greater
number of vendored tests than the previous revision.
- Also ported the tests from this repo for the custom watermark logic.
- All our URLSessionTransport-specific tests continue to pass.

---------

Signed-off-by: Si Beaumont <beaumont@apple.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🔨 semver/patch No public API change.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants