Add missing transition to AsyncBackpressuredStream state machine (#27)
### Motivation

Some of our bidirectional streaming tests were failing intermittently.
When they failed, the symptom was that more bytes were received by the
end user (in the test) than the server sent.

For example, in the test
`testStreamingDownload_1kChunk_10kChunks_100BDownloadWatermark`, we
expect:

- ✅ The server sends 10,000 chunks of 1024 bytes:
  ```console
  ❯ cat repro.txt | grep -i "server sent body chunk" | head -3
  Server sent body chunk 1/10000 of 1024
  Server sent body chunk 2/10000 of 1024
  Server sent body chunk 3/10000 of 1024

  ❯ cat repro.txt | grep -i "server sent body chunk" | wc -l
  10000
  ```

- ✅ The URLSession `didReceive data` callback is called a non-deterministic
number of times, because URLSession may re-chunk the bytes internally, but
the total number of bytes delivered through the delegate calls is 10,240,000:
  ```console
  ❯ cat repro.txt | grep "didReceive data" | head -3
  Task delegate: didReceive data (numBytes: 1024)
  Task delegate: didReceive data (numBytes: 2048)
  Task delegate: didReceive data (numBytes: 1024)

❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)
| paste -sd+ | bc -l
  10240000
  ```

- ❌ The response body chunks emitted by the `AsyncBackpressuredStream`
to the user (in the test) match 1:1 the chunks received by the delegate
callback:
  ```console
❯ cat repro.txt | grep "Client received some response body bytes" | head
-3
  Client received some response body bytes (numBytes: 1024)
  Client received some response body bytes (numBytes: 2048)
  Client received some response body bytes (numBytes: 1024)

❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)
| wc -l
  333

❯ cat repro.txt | grep "Client received some response body bytes" | wc
-l
  334
  ```

- ❌ The total number of bytes emitted by the `AsyncBackpressuredStream`
to the user (in the test) is 10,240,000, from which it can then
reconstruct 10,000 chunks of 1024 bytes matching what the server sent:
  ```console
❯ cat repro.txt | grep "Client received some response body bytes" | awk
'{ print $8 }' | tr -d \) | paste -sd+ | bc -l
  10280960

  ❯ cat repro.txt | grep "Client reconstructing" | tail -3
  Client reconstructing and verifying chunk 10038/10000
  Client reconstructing and verifying chunk 10039/10000
  Client reconstructing and verifying chunk 10040/10000
  ```

So we see that one more element was emitted from the
`AsyncBackpressuredStream` than the delegate callback wrote, which meant
the test saw an additional 40960 bytes beyond what it expected and
consequently reconstructed an additional 40 chunks of size 1024 beyond
what the server sent.

We can see that the `AsyncBackpressuredStream` duplicates a single
element of 40960 bytes along the way:

```diff
❯ diff -u --label=received-in-delegate-callback <(cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)) --label=received-through-async-sequence <(cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \))
--- received-in-delegate-callback
+++ received-through-async-sequence
@@ -305,6 +305,7 @@
 2048
 1024
 40960
+40960
 24576
 2048
 34841
```

After some investigation, it turned out there was a missing transition in the state machine that underlies the `AsyncBackpressuredStream`. When `suspendNext` was called while there were buffered elements but we were above the watermark, we popped the first item from the buffer and returned it _without_ updating the state to hold the new buffer (i.e. the buffer with the popped element removed).

Consequently, the _next_ call to `next()` would return the same element again.
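
To make the bug concrete, here is a minimal, self-contained sketch using simplified, hypothetical types (`SimplifiedState`, `SimplifiedStateMachine`); the real state machine tracks more fields (consumer and producer continuations, cancellation, termination) and returns actions rather than elements:

```swift
// Hypothetical, simplified model of the buggy/fixed transition.
enum SimplifiedState {
    case streaming(buffer: [Int], hasOutstandingDemand: Bool)
    case finished
}

struct SimplifiedStateMachine {
    var state: SimplifiedState

    /// Pop a buffered element for a suspending consumer. `shouldProduceMore`
    /// stands in for the back-pressure strategy's verdict after the pop.
    mutating func popBufferedElement(shouldProduceMore: Bool) -> Int? {
        guard case .streaming(var buffer, _) = state, !buffer.isEmpty else {
            return nil
        }
        let element = buffer.removeFirst()
        // The missing transition: write the shortened buffer back into
        // `state`. Before the fix, the "don't produce more" branch returned
        // `element` without doing this, so the next call popped (and
        // returned) the very same element again.
        state = .streaming(buffer: buffer, hasOutstandingDemand: shouldProduceMore)
        return element
    }
}
```

With the write-back in place, consecutive calls return distinct elements; omitting it reproduces exactly the kind of duplicate shown in the `diff` output above.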

### Modifications

The following modifications have been made in separate commits to aid review:
- Add debug logging to the state machine functions, logging the old state, event, new state, and resulting action (a sketch of such a helper is shown after this list).
- Add two tests which reproduce this error.
- Add the missing state transition (which causes the newly added tests to reliably pass).
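
For reference, the diff below logs via a `debug(_:)` helper. A minimal sketch of what such a helper could look like (the `ASYNC_STREAM_DEBUG` environment variable here is purely illustrative; the repository's actual helper may differ):

```swift
import Foundation

/// Hypothetical debug-logging helper, gated on an illustrative environment
/// variable so the (very chatty) state-machine traces stay off by default.
func debug(_ message: @autoclosure () -> String) {
    guard ProcessInfo.processInfo.environment["ASYNC_STREAM_DEBUG"] != nil else { return }
    FileHandle.standardError.write(Data((message() + "\n").utf8))
}
```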

### Result

Duplicate elements are no longer emitted from the response body.

### Test Plan

- Unit tests were added that fail without the fix and pass reliably with it.

### Additional notes

The implementation we are using for `AsyncBackpressuredStream` was taken from an early draft of SE-0406. We should probably move to something that more closely matches the current PR to the Swift tree, or the implementation used by swift-grpc, which has also adopted this code and cleaned it up to remove the dependencies on standard library internals. Additionally, that implementation does not have this missing state transition and also adds an intermediate state to the state machine to avoid unintended copy-on-write.
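
For context, the "intermediate state to avoid unintended copy-on-write" mentioned above usually follows a well-known pattern: move the state into a placeholder case before mutating the extracted buffer, so the buffer's storage is uniquely referenced. The sketch below is a generic illustration of that pattern, not the actual code of either implementation:

```swift
enum State {
    case streaming(buffer: [Int])
    // Intermediate case the machine moves into while mutating, so the
    // extracted buffer is the only remaining reference to its storage.
    case modifying
}

struct StateMachine {
    var state: State = .streaming(buffer: [])

    mutating func append(_ element: Int) {
        switch state {
        case .streaming(var buffer):
            // Drop the enum's reference to the buffer before mutating it;
            // otherwise `append` would copy the whole array (copy-on-write).
            state = .modifying
            buffer.append(element)
            state = .streaming(buffer: buffer)
        case .modifying:
            fatalError("Invalid state")
        }
    }
}
```
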
simonjbeaumont authored Nov 23, 2023
1 parent 8464a53 commit b1d5cfa
Showing 2 changed files with 215 additions and 4 deletions.
@@ -346,7 +346,20 @@ extension AsyncBackpressuredStream {
}

func write<S: Sequence>(contentsOf sequence: S) throws -> Source.WriteResult where S.Element == Element {
let action = self.lock.withLock { return self.stateMachine.write(sequence) }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.write(sequence)
let stateAfter = self.stateMachine.state
debug("""
---
event: write
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .returnProduceMore: return .produceMore
@@ -385,7 +398,18 @@ extension AsyncBackpressuredStream {
onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
) {
let action = self.lock.withLock {
return self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
let stateBefore = self.stateMachine.state
let action = self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
let stateAfter = self.stateMachine.state
debug("""
---
event: \(#function)
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
@@ -449,7 +473,20 @@ extension AsyncBackpressuredStream {
}

func next() async throws -> Element? {
let action = self.lock.withLock { return self.stateMachine.next() }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.next()
let stateAfter = self.stateMachine.state
debug("""
---
event: next
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .returnElement(let element): return element
@@ -476,7 +513,20 @@ extension AsyncBackpressuredStream {
func suspendNext() async throws -> Element? {
return try await withTaskCancellationHandler {
return try await withCheckedThrowingContinuation { continuation in
let action = self.lock.withLock { return self.stateMachine.suspendNext(continuation: continuation) }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.suspendNext(continuation: continuation)
let stateAfter = self.stateMachine.state
debug("""
---
event: \(#function)
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .resumeContinuationWithElement(let continuation, let element):
@@ -1239,6 +1289,16 @@ extension AsyncBackpressuredStream {

guard shouldProduceMore else {
// We don't have any new demand, so we can just return the element.
self.state = .streaming(
backPressureStrategy: backPressureStrategy,
buffer: buffer,
consumerContinuation: nil,
producerContinuations: producerContinuations,
cancelledAsyncProducers: cancelledAsyncProducers,
hasOutstandingDemand: hasOutstandingDemand,
iteratorInitialized: iteratorInitialized,
onTerminate: onTerminate
)
return .resumeContinuationWithElement(continuation, element)
}
let producers = Array(producerContinuations.map { $0.1 })
@@ -198,6 +198,120 @@ final class AsyncBackpressuredStreamTests: XCTestCase {
XCTAssertEqual(strategy.didConsume(elements: Slice([])), true)
XCTAssertEqual(strategy.currentWatermark, 0)
}

func testWritingOverWatermark() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
let (stream, continuation) = AsyncBackpressuredStream<Int, any Error>
.makeStream(backPressureStrategy: .highLowWatermark(lowWatermark: 1, highWatermark: 1))

group.addTask {
for i in 1...10 {
debug("Producer writing element \(i)...")
let writeResult = try continuation.write(contentsOf: CollectionOfOne(i))
debug("Producer wrote element \(i), result = \(writeResult)")
// ignore backpressure result and write again anyway
}
debug("Producer finished")
continuation.finish(throwing: nil)
}

var iterator = stream.makeAsyncIterator()
var numElementsConsumed = 0
var expectedNextValue = 1
while true {
debug("Consumer reading element...")
guard let element = try await iterator.next() else { break }
XCTAssertEqual(element, expectedNextValue)
debug("Consumer read element: \(element), expected: \(expectedNextValue)")
numElementsConsumed += 1
expectedNextValue += 1
}
XCTAssertEqual(numElementsConsumed, 10)

group.cancelAll()
}
}

func testStateMachineSuspendNext() async throws {
typealias Stream = AsyncBackpressuredStream<Int, any Error>

var strategy = Stream.InternalBackPressureStrategy.highLowWatermark(.init(lowWatermark: 1, highWatermark: 1))
_ = strategy.didYield(elements: Slice([1, 2, 3]))
var stateMachine = Stream.StateMachine(backPressureStrategy: strategy, onTerminate: nil)
stateMachine.state = .streaming(
backPressureStrategy: strategy,
buffer: [1, 2, 3],
consumerContinuation: nil,
producerContinuations: [],
cancelledAsyncProducers: [],
hasOutstandingDemand: false,
iteratorInitialized: true,
onTerminate: nil
)

guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
XCTFail("Unexpected state: \(stateMachine.state)")
return
}
XCTAssertEqual(buffer, [1, 2, 3])
XCTAssertNil(consumerContinuation)

_ = try await withCheckedThrowingContinuation { continuation in
let action = stateMachine.suspendNext(continuation: continuation)

guard case .resumeContinuationWithElement(_, let element) = action else {
XCTFail("Unexpected action: \(action)")
return
}
XCTAssertEqual(element, 1)

guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
XCTFail("Unexpected state: \(stateMachine.state)")
return
}
XCTAssertEqual(buffer, [2, 3])
XCTAssertNil(consumerContinuation)

continuation.resume(returning: element)
}
}
}

extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .enqueueCallback: return "enqueueCallBack"
case .produceMore: return "produceMore"
}
}
}

extension AsyncBackpressuredStream.StateMachine.SuspendNextAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .none: return "none"
case .resumeContinuationWithElement: return "resumeContinuationWithElement"
case .resumeContinuationWithElementAndProducers: return "resumeContinuationWithElementAndProducers"
case .resumeContinuationWithFailureAndCallOnTerminate: return "resumeContinuationWithFailureAndCallOnTerminate"
case .resumeContinuationWithNil: return "resumeContinuationWithNil"
}
}
}

extension AsyncBackpressuredStream.StateMachine.State: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .initial: return "initial"
case .streaming(_, let buffer, let consumer, let producers, _, let demand, _, _):
return
"streaming(buffer.count: \(buffer.count), consumer: \(consumer != nil ? "yes" : "no"), producers: \(producers), demand: \(demand))"
case .finished: return "finished"
case .sourceFinished: return "sourceFinished"
}
}
}

extension AsyncSequence {
@@ -206,3 +320,40 @@ extension AsyncSequence {
try await self.reduce(into: []) { accumulated, next in accumulated.append(next) }
}
}

extension AsyncBackpressuredStream.StateMachine.NextAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .returnNil: return "returnNil"
case .returnElementAndResumeProducers: return "returnElementAndResumeProducers"
case .returnFailureAndCallOnTerminate: return "returnFailureAndCallOnTerminate"
case .returnElement: return "returnElement"
case .suspendTask: return "suspendTask"
}
}
}

extension AsyncBackpressuredStream.StateMachine.WriteAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .returnProduceMore: return "returnProduceMore"
case .returnEnqueue: return "returnEnqueue"
case .resumeConsumerContinuationAndReturnProduceMore: return "resumeConsumerContinuationAndReturnProduceMore"
case .resumeConsumerContinuationAndReturnEnqueue: return "resumeConsumerContinuationAndReturnEnqueue"
case .throwFinishedError: return "throwFinishedError"
}
}
}

extension AsyncBackpressuredStream.StateMachine.EnqueueProducerAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .resumeProducer: return "resumeProducer"
case .resumeProducerWithCancellationError: return "resumeProducerWithCancellationError"
case .none: return "none"
}
}
}
