Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing transition AsyncBackpressuredStream state machine #27

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,20 @@ extension AsyncBackpressuredStream {
}

func write<S: Sequence>(contentsOf sequence: S) throws -> Source.WriteResult where S.Element == Element {
let action = self.lock.withLock { return self.stateMachine.write(sequence) }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.write(sequence)
let stateAfter = self.stateMachine.state
debug("""
---
event: write
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .returnProduceMore: return .produceMore
Expand Down Expand Up @@ -385,7 +398,18 @@ extension AsyncBackpressuredStream {
onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
) {
let action = self.lock.withLock {
return self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
let stateBefore = self.stateMachine.state
let action = self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
let stateAfter = self.stateMachine.state
debug("""
---
event: \(#function)
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
Expand Down Expand Up @@ -449,7 +473,20 @@ extension AsyncBackpressuredStream {
}

func next() async throws -> Element? {
let action = self.lock.withLock { return self.stateMachine.next() }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.next()
let stateAfter = self.stateMachine.state
debug("""
---
event: next
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .returnElement(let element): return element
Expand All @@ -476,7 +513,20 @@ extension AsyncBackpressuredStream {
func suspendNext() async throws -> Element? {
return try await withTaskCancellationHandler {
return try await withCheckedThrowingContinuation { continuation in
let action = self.lock.withLock { return self.stateMachine.suspendNext(continuation: continuation) }
let action = self.lock.withLock {
let stateBefore = self.stateMachine.state
let action = self.stateMachine.suspendNext(continuation: continuation)
let stateAfter = self.stateMachine.state
debug("""
---
event: \(#function)
state before: \(stateBefore)
state after: \(stateAfter)
action: \(action)
---
""")
return action
}

switch action {
case .resumeContinuationWithElement(let continuation, let element):
Expand Down Expand Up @@ -1239,6 +1289,16 @@ extension AsyncBackpressuredStream {

guard shouldProduceMore else {
// We don't have any new demand, so we can just return the element.
self.state = .streaming(
backPressureStrategy: backPressureStrategy,
buffer: buffer,
consumerContinuation: nil,
producerContinuations: producerContinuations,
cancelledAsyncProducers: cancelledAsyncProducers,
hasOutstandingDemand: hasOutstandingDemand,
iteratorInitialized: iteratorInitialized,
onTerminate: onTerminate
)
return .resumeContinuationWithElement(continuation, element)
}
let producers = Array(producerContinuations.map { $0.1 })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,120 @@ final class AsyncBackpressuredStreamTests: XCTestCase {
XCTAssertEqual(strategy.didConsume(elements: Slice([])), true)
XCTAssertEqual(strategy.currentWatermark, 0)
}

func testWritingOverWatermark() async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
let (stream, continuation) = AsyncBackpressuredStream<Int, any Error>
.makeStream(backPressureStrategy: .highLowWatermark(lowWatermark: 1, highWatermark: 1))

group.addTask {
for i in 1...10 {
debug("Producer writing element \(i)...")
let writeResult = try continuation.write(contentsOf: CollectionOfOne(i))
debug("Producer wrote element \(i), result = \(writeResult)")
// ignore backpressure result and write again anyway
}
debug("Producer finished")
continuation.finish(throwing: nil)
}

var iterator = stream.makeAsyncIterator()
var numElementsConsumed = 0
var expectedNextValue = 1
while true {
debug("Consumer reading element...")
guard let element = try await iterator.next() else { break }
XCTAssertEqual(element, expectedNextValue)
debug("Consumer read element: \(element), expected: \(expectedNextValue)")
numElementsConsumed += 1
expectedNextValue += 1
}
XCTAssertEqual(numElementsConsumed, 10)

group.cancelAll()
}
}

func testStateMachineSuspendNext() async throws {
typealias Stream = AsyncBackpressuredStream<Int, any Error>

var strategy = Stream.InternalBackPressureStrategy.highLowWatermark(.init(lowWatermark: 1, highWatermark: 1))
_ = strategy.didYield(elements: Slice([1, 2, 3]))
var stateMachine = Stream.StateMachine(backPressureStrategy: strategy, onTerminate: nil)
stateMachine.state = .streaming(
backPressureStrategy: strategy,
buffer: [1, 2, 3],
consumerContinuation: nil,
producerContinuations: [],
cancelledAsyncProducers: [],
hasOutstandingDemand: false,
iteratorInitialized: true,
onTerminate: nil
)

guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
XCTFail("Unexpected state: \(stateMachine.state)")
return
}
XCTAssertEqual(buffer, [1, 2, 3])
XCTAssertNil(consumerContinuation)

_ = try await withCheckedThrowingContinuation { continuation in
let action = stateMachine.suspendNext(continuation: continuation)

guard case .resumeContinuationWithElement(_, let element) = action else {
XCTFail("Unexpected action: \(action)")
return
}
XCTAssertEqual(element, 1)

guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
XCTFail("Unexpected state: \(stateMachine.state)")
return
}
XCTAssertEqual(buffer, [2, 3])
XCTAssertNil(consumerContinuation)

continuation.resume(returning: element)
}
}
}

extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .enqueueCallback: return "enqueueCallBack"
case .produceMore: return "produceMore"
}
}
}

extension AsyncBackpressuredStream.StateMachine.SuspendNextAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .none: return "none"
case .resumeContinuationWithElement: return "resumeContinuationWithElement"
case .resumeContinuationWithElementAndProducers: return "resumeContinuationWithElementAndProducers"
case .resumeContinuationWithFailureAndCallOnTerminate: return "resumeContinuationWithFailureAndCallOnTerminate"
case .resumeContinuationWithNil: return "resumeContinuationWithNil"
}
}
}

extension AsyncBackpressuredStream.StateMachine.State: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .initial: return "initial"
case .streaming(_, let buffer, let consumer, let producers, _, let demand, _, _):
return
"streaming(buffer.count: \(buffer.count), consumer: \(consumer != nil ? "yes" : "no"), producers: \(producers), demand: \(demand))"
case .finished: return "finished"
case .sourceFinished: return "sourceFinished"
}
}
}

extension AsyncSequence {
Expand All @@ -206,3 +320,40 @@ extension AsyncSequence {
try await self.reduce(into: []) { accumulated, next in accumulated.append(next) }
}
}

extension AsyncBackpressuredStream.StateMachine.NextAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .returnNil: return "returnNil"
case .returnElementAndResumeProducers: return "returnElementAndResumeProducers"
case .returnFailureAndCallOnTerminate: return "returnFailureAndCallOnTerminate"
case .returnElement: return "returnElement"
case .suspendTask: return "suspendTask"
}
}
}

extension AsyncBackpressuredStream.StateMachine.WriteAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .returnProduceMore: return "returnProduceMore"
case .returnEnqueue: return "returnEnqueue"
case .resumeConsumerContinuationAndReturnProduceMore: return "resumeConsumerContinuationAndReturnProduceMore"
case .resumeConsumerContinuationAndReturnEnqueue: return "resumeConsumerContinuationAndReturnEnqueue"
case .throwFinishedError: return "throwFinishedError"
}
}
}

extension AsyncBackpressuredStream.StateMachine.EnqueueProducerAction: CustomStringConvertible {
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
public var description: String {
switch self {
case .resumeProducer: return "resumeProducer"
case .resumeProducerWithCancellationError: return "resumeProducerWithCancellationError"
case .none: return "none"
}
}
}