Skip to content

Commit c4c718c

Browse files
committed
fix: Merge immediately finishes when empty array of bases
1 parent ba01681 commit c4c718c

3 files changed

Lines changed: 25 additions & 3 deletions

File tree

Sources/Combiners/Merge/AsyncMergeSequence.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,19 @@ public struct AsyncMergeSequence<Base: AsyncSequence>: AsyncSequence {
3434
}
3535

3636
public struct Iterator: AsyncIteratorProtocol {
37+
private let isEmpty: Bool
3738
let mergeStateMachine: MergeStateMachine<Element>
3839

3940
init(bases: [Base]) {
41+
isEmpty = bases.isEmpty
4042
self.mergeStateMachine = MergeStateMachine(
4143
bases
4244
)
4345
}
4446

4547
public mutating func next() async rethrows -> Element? {
48+
guard !self.isEmpty else { return nil }
49+
4650
let mergedElement = await self.mergeStateMachine.next()
4751
switch mergedElement {
4852
case .element(let result):

Sources/Combiners/Merge/MergeStateMachine.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ struct MergeStateMachine<Element>: Sendable {
9898
var regulators = [Regulator<Base>]()
9999

100100
for base in bases {
101-
let regulator = Regulator<Base>(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) })
101+
let regulator = Regulator<Base>(base, onNextRegulatedElement: { [state] in Self.onNextRegulatedElement($0, state: state) }
102+
)
102103
regulators.append(regulator)
103104
}
104105

Tests/Combiners/Merge/AsyncMergeSequenceTests.swift

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ final class AsyncMergeSequenceTests: XCTestCase {
7676

7777
let expectedElements = asyncSequence1 + asyncSequence2 + asyncSequence3 + asyncSequence4
7878

79-
8079
let sut = merge(asyncSequence1.async, asyncSequence2.async, asyncSequence3.async, asyncSequence4.async)
8180

8281
var receivedElements = [Int]()
@@ -247,7 +246,7 @@ final class AsyncMergeSequenceTests: XCTestCase {
247246
for try await element in sut {
248247
firstElement = element
249248
canCancelExpectation.fulfill()
250-
wait(for: [hasCancelExceptation], timeout: 5)
249+
await fulfillment(of: [hasCancelExceptation], timeout: 5)
251250
}
252251
XCTAssertEqual(firstElement, 10)
253252
taskHasFinishedExpectation.fulfill()
@@ -289,4 +288,22 @@ final class AsyncMergeSequenceTests: XCTestCase {
289288

290289
wait(for: [hasCancelExceptation], timeout: 1)
291290
}
291+
292+
func testMerge_finishes_when_empty_array_of_base() {
293+
let sut = AsyncMergeSequence<AsyncStream<Int>>([])
294+
let hasFinishedExpectation = expectation(description: "Merge has finished")
295+
296+
let task = Task {
297+
var received = [Int]()
298+
for try await element in sut {
299+
received.append(element)
300+
}
301+
XCTAssertTrue(received.isEmpty)
302+
hasFinishedExpectation.fulfill()
303+
}
304+
305+
wait(for: [hasFinishedExpectation], timeout: 1)
306+
307+
task.cancel()
308+
}
292309
}

0 commit comments

Comments
 (0)