From b1d5cfaf6e59289406575f731d2030711f895559 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Thu, 23 Nov 2023 14:05:24 +0000 Subject: [PATCH] Add missing transition AsyncBackpressuredStream state machine (#27) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation Some of our bidirectional streaming tests were failing intermittently. When they failed, the symptom was that more bytes were received by the end user (in the test) than the server sent. For example, in the test `testStreamingDownload_1kChunk_10kChunks_100BDownloadWatermark`, we expect: - ✅ The server sends 10,000 chunks of 1024 bytes: ```console ❯ cat repro.txt | grep -i "server sent body chunk" | head -3 Server sent body chunk 1/10000 of 1024 Server sent body chunk 2/10000 of 1024 Server sent body chunk 3/10000 of 1024 ❯ cat repro.txt | grep -i "server sent body chunk" | wc -l 10000 ``` - ✅ URLSession `didReceive data` callback called a non-deterministic number of times because it may re-chunk the bytes internally, but the total number of bytes through the delegate calls is 10,240,000: ```console ❯ cat repro.txt | grep "didReceive data" | head -3 Task delegate: didReceive data (numBytes: 1024) Task delegate: didReceive data (numBytes: 2048) Task delegate: didReceive data (numBytes: 1024) ❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | paste -sd+ | bc -l 10240000 ``` - ❌ The response body chunks emitted by the `AsyncBackpressuredStream` to the user (in the test) match 1:1 those received by the delegate callback: ```console ❯ cat repro.txt | grep "Client received some response body bytes" | head -3 Client received some response body bytes (numBytes: 1024) Client received some response body bytes (numBytes: 2048) Client received some response body bytes (numBytes: 1024) ❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | wc -l 333 ❯ cat repro.txt | grep "Client received some response body bytes" | wc -l 334 ``` - ❌ The total number of bytes emitted by the `AsyncBackpressuredStream` to the user (in the test) match is 10,240,000 and it can then reconstruct 10,000 chunks of 1024 to match what the server sent: ```console ❯ cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \) | paste -sd+ | bc -l 10280960 ❯ cat repro.txt | grep "Client reconstructing" | tail -3 Client reconstructing and verifying chunk 10038/10000 Client reconstructing and verifying chunk 10039/10000 Client reconstructing and verifying chunk 10040/10000 ``` So we see that there was one more element emitted from the `AsyncBackpressuredStream` than the delegate callback wrote, which meant the test saw an additional 40960 bytes than it expected to and consequently reconstructed an additional 40 chunks of size 1024 over what the server sent. We can see that the `AsyncBackpressuredStream` duplicates an element along the way, of 40960 bytes, ```diff ❯ diff -u --label=received-in-delegate-callback <(cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)) --label=received-through-async-sequence <(cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \)) --- received-in-delegate-callback +++ received-through-async-sequence @@ -305,6 +305,7 @@ 2048 1024 40960 +40960 24576 2048 34841 ``` After some investigation, it turned out there was a missing transition in the state machine that underlies the `AsyncBackpressuredStream`. When calling `suspendNext` when there are buffered elements, but we are above the watermark, we popped the first item from the buffer and returned it _without_ updating the state (with the new buffer, without the popped element). Consequently, this meant that the _next_ call to `next()` would return the same element again. ### Modifications The following modifications have been made in separate commits to aid review: - Add debug logging to the state machine functions, logging the old state, event, new state, and resulting action. - Add two tests which reproduce this error. - Add the missing state transition (which causes the newly added tests to reliably pass). ### Result Duplicate elements are no longer emitted from the response body. ### Test Plan - Unit tests were added that fail without the fix, that now pass reliably. ### Additional notes The implementation we are using for `AsyncBackpressuredStream` was taken from an early draft of SE-0406. We should probably move to using something closer matching that of the current PR to the Swift tree, or that used by swift-grpc, which has also adopted this code and cleaned it up to remove the dependencies on the standard library internals. Additionally, that implementation does not have this missing state transition and also adds an intermediate state to the state machine to avoid unintended copy-on-write. --- .../AsyncBackpressuredStream.swift | 68 +++++++- .../AsyncBackpressuredStreamTests.swift | 151 ++++++++++++++++++ 2 files changed, 215 insertions(+), 4 deletions(-) diff --git a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift index 6aa7cd8..2c792b4 100644 --- a/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift +++ b/Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift @@ -346,7 +346,20 @@ extension AsyncBackpressuredStream { } func write(contentsOf sequence: S) throws -> Source.WriteResult where S.Element == Element { - let action = self.lock.withLock { return self.stateMachine.write(sequence) } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.write(sequence) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: write + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .returnProduceMore: return .produceMore @@ -385,7 +398,18 @@ extension AsyncBackpressuredStream { onProduceMore: @escaping @Sendable (Result) -> Void ) { let action = self.lock.withLock { - return self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore) + let stateBefore = self.stateMachine.state + let action = self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: \(#function) + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action } switch action { @@ -449,7 +473,20 @@ extension AsyncBackpressuredStream { } func next() async throws -> Element? { - let action = self.lock.withLock { return self.stateMachine.next() } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.next() + let stateAfter = self.stateMachine.state + debug(""" + --- + event: next + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .returnElement(let element): return element @@ -476,7 +513,20 @@ extension AsyncBackpressuredStream { func suspendNext() async throws -> Element? { return try await withTaskCancellationHandler { return try await withCheckedThrowingContinuation { continuation in - let action = self.lock.withLock { return self.stateMachine.suspendNext(continuation: continuation) } + let action = self.lock.withLock { + let stateBefore = self.stateMachine.state + let action = self.stateMachine.suspendNext(continuation: continuation) + let stateAfter = self.stateMachine.state + debug(""" + --- + event: \(#function) + state before: \(stateBefore) + state after: \(stateAfter) + action: \(action) + --- + """) + return action + } switch action { case .resumeContinuationWithElement(let continuation, let element): @@ -1239,6 +1289,16 @@ extension AsyncBackpressuredStream { guard shouldProduceMore else { // We don't have any new demand, so we can just return the element. + self.state = .streaming( + backPressureStrategy: backPressureStrategy, + buffer: buffer, + consumerContinuation: nil, + producerContinuations: producerContinuations, + cancelledAsyncProducers: cancelledAsyncProducers, + hasOutstandingDemand: hasOutstandingDemand, + iteratorInitialized: iteratorInitialized, + onTerminate: onTerminate + ) return .resumeContinuationWithElement(continuation, element) } let producers = Array(producerContinuations.map { $0.1 }) diff --git a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift index bdb474a..f51f656 100644 --- a/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift +++ b/Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift @@ -198,6 +198,120 @@ final class AsyncBackpressuredStreamTests: XCTestCase { XCTAssertEqual(strategy.didConsume(elements: Slice([])), true) XCTAssertEqual(strategy.currentWatermark, 0) } + + func testWritingOverWatermark() async throws { + try await withThrowingTaskGroup(of: Void.self) { group in + let (stream, continuation) = AsyncBackpressuredStream + .makeStream(backPressureStrategy: .highLowWatermark(lowWatermark: 1, highWatermark: 1)) + + group.addTask { + for i in 1...10 { + debug("Producer writing element \(i)...") + let writeResult = try continuation.write(contentsOf: CollectionOfOne(i)) + debug("Producer wrote element \(i), result = \(writeResult)") + // ignore backpressure result and write again anyway + } + debug("Producer finished") + continuation.finish(throwing: nil) + } + + var iterator = stream.makeAsyncIterator() + var numElementsConsumed = 0 + var expectedNextValue = 1 + while true { + debug("Consumer reading element...") + guard let element = try await iterator.next() else { break } + XCTAssertEqual(element, expectedNextValue) + debug("Consumer read element: \(element), expected: \(expectedNextValue)") + numElementsConsumed += 1 + expectedNextValue += 1 + } + XCTAssertEqual(numElementsConsumed, 10) + + group.cancelAll() + } + } + + func testStateMachineSuspendNext() async throws { + typealias Stream = AsyncBackpressuredStream + + var strategy = Stream.InternalBackPressureStrategy.highLowWatermark(.init(lowWatermark: 1, highWatermark: 1)) + _ = strategy.didYield(elements: Slice([1, 2, 3])) + var stateMachine = Stream.StateMachine(backPressureStrategy: strategy, onTerminate: nil) + stateMachine.state = .streaming( + backPressureStrategy: strategy, + buffer: [1, 2, 3], + consumerContinuation: nil, + producerContinuations: [], + cancelledAsyncProducers: [], + hasOutstandingDemand: false, + iteratorInitialized: true, + onTerminate: nil + ) + + guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else { + XCTFail("Unexpected state: \(stateMachine.state)") + return + } + XCTAssertEqual(buffer, [1, 2, 3]) + XCTAssertNil(consumerContinuation) + + _ = try await withCheckedThrowingContinuation { continuation in + let action = stateMachine.suspendNext(continuation: continuation) + + guard case .resumeContinuationWithElement(_, let element) = action else { + XCTFail("Unexpected action: \(action)") + return + } + XCTAssertEqual(element, 1) + + guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else { + XCTFail("Unexpected state: \(stateMachine.state)") + return + } + XCTAssertEqual(buffer, [2, 3]) + XCTAssertNil(consumerContinuation) + + continuation.resume(returning: element) + } + } +} + +extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .enqueueCallback: return "enqueueCallBack" + case .produceMore: return "produceMore" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.SuspendNextAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .none: return "none" + case .resumeContinuationWithElement: return "resumeContinuationWithElement" + case .resumeContinuationWithElementAndProducers: return "resumeContinuationWithElementAndProducers" + case .resumeContinuationWithFailureAndCallOnTerminate: return "resumeContinuationWithFailureAndCallOnTerminate" + case .resumeContinuationWithNil: return "resumeContinuationWithNil" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.State: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .initial: return "initial" + case .streaming(_, let buffer, let consumer, let producers, _, let demand, _, _): + return + "streaming(buffer.count: \(buffer.count), consumer: \(consumer != nil ? "yes" : "no"), producers: \(producers), demand: \(demand))" + case .finished: return "finished" + case .sourceFinished: return "sourceFinished" + } + } } extension AsyncSequence { @@ -206,3 +320,40 @@ extension AsyncSequence { try await self.reduce(into: []) { accumulated, next in accumulated.append(next) } } } + +extension AsyncBackpressuredStream.StateMachine.NextAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .returnNil: return "returnNil" + case .returnElementAndResumeProducers: return "returnElementAndResumeProducers" + case .returnFailureAndCallOnTerminate: return "returnFailureAndCallOnTerminate" + case .returnElement: return "returnElement" + case .suspendTask: return "suspendTask" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.WriteAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .returnProduceMore: return "returnProduceMore" + case .returnEnqueue: return "returnEnqueue" + case .resumeConsumerContinuationAndReturnProduceMore: return "resumeConsumerContinuationAndReturnProduceMore" + case .resumeConsumerContinuationAndReturnEnqueue: return "resumeConsumerContinuationAndReturnEnqueue" + case .throwFinishedError: return "throwFinishedError" + } + } +} + +extension AsyncBackpressuredStream.StateMachine.EnqueueProducerAction: CustomStringConvertible { + // swift-format-ignore: AllPublicDeclarationsHaveDocumentation + public var description: String { + switch self { + case .resumeProducer: return "resumeProducer" + case .resumeProducerWithCancellationError: return "resumeProducerWithCancellationError" + case .none: return "none" + } + } +}