Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xcuserdata/
DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
Package.resolved
16 changes: 0 additions & 16 deletions Package.resolved

This file was deleted.

34 changes: 21 additions & 13 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.5
// swift-tools-version:6.0
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -16,23 +16,31 @@ let package = Package(
name: "AsyncExtensions",
targets: ["AsyncExtensions"]),
],
dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3")),
.package(url: "https://github.com/apple/swift-async-algorithms.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/OpenCombine/OpenCombine.git", from: "0.14.0"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
],
targets: [
.target(
name: "AsyncExtensions",
dependencies: [.product(name: "Collections", package: "swift-collections")],
path: "Sources"
// ,
// swiftSettings: [
// .unsafeFlags([
// "-Xfrontend", "-warn-concurrency",
// "-Xfrontend", "-enable-actor-data-race-checks",
// ])
// ]
dependencies: [
.product(name: "Collections", package: "swift-collections"),
.product(name: "Atomics", package: "swift-atomics")
],
path: "Sources",
swiftSettings: [.swiftLanguageMode(.v5)]
),
.testTarget(
name: "AsyncExtensionsTests",
dependencies: ["AsyncExtensions"],
path: "Tests"),
dependencies: [
"AsyncExtensions",
.product(name: "OpenCombine", package: "OpenCombine", condition: .when(platforms: [.linux])),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms")
],
path: "Tests",
swiftSettings: [.swiftLanguageMode(.v5)]
),
]
)
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

**AsyncExtensions** provides a collection of operators that intends to ease the creation and combination of `AsyncSequences`.

**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms). For now there is an overlap between both libraries, but when **swift-async-algorithms** becomes stable the overlapping operators while be deprecated in **AsyncExtensions**. Nevertheless **AsyncExtensions** will continue to provide the operators that the community needs and are not provided by Apple.
**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms), which provides operators that the community needs and are not provided by Apple.

## Adding AsyncExtensions as a Dependency

Expand Down Expand Up @@ -44,11 +44,6 @@ AsyncStream)
* [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintains and replays a buffered amount of values

### Combiners
* [`zip(_:_:)`](./Sources/Combiners/Zip/AsyncZip2Sequence.swift): Zips two `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:_:_:)`](./Sources/Combiners/Zip/AsyncZip3Sequence.swift): Zips three `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:)`](./Sources/Combiners/Zip/AsyncZipSequence.swift): Zips any async sequences into an array of elements
* [`merge(_:_:)`](./Sources/Combiners/Merge/AsyncMerge2Sequence.swift): Merges two `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:_:_:)`](./Sources/Combiners/Merge/AsyncMerge3Sequence.swift): Merges three `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements
* [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence`
* [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences
Expand All @@ -58,7 +53,6 @@ AsyncStream)
* [AsyncFailSequence](./Sources/Creators/AsyncFailSequence.swift): Creates an `AsyncSequence` that immediately fails
* [AsyncJustSequence](./Sources/Creators/AsyncJustSequence.swift): Creates an `AsyncSequence` that emits an element an finishes
* [AsyncThrowingJustSequence](./Sources/Creators/AsyncThrowingJustSequence.swift): Creates an `AsyncSequence` that emits an elements and finishes bases on a throwing closure
* [AsyncLazySequence](./Sources/Creators/AsyncLazySequence.swift): Creates an `AsyncSequence` of the elements from the base sequence
* [AsyncTimerSequence](./Sources/Creators/AsyncTimerSequence.swift): Creates an `AsyncSequence` that emits a date value periodically
* [AsyncStream Pipe](./Sources/Creators/AsyncStream+Pipe.swift): Creates an AsyncStream and returns a tuple standing for its inputs and outputs

Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -69,27 +70,24 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case finished

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -155,12 +153,12 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda

func next(onSuspend: (() -> Void)? = nil) async -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -184,7 +182,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .finished:
Expand All @@ -200,12 +204,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -80,27 +81,24 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case terminated(Termination)

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -176,12 +174,12 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS

func next(onSuspend: (() -> Void)? = nil) async throws -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -208,7 +206,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .terminated(.finished):
Expand All @@ -227,12 +231,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
71 changes: 43 additions & 28 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,54 +67,57 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.current = element
for channel in state.channels.values {
channel.send(element)
}
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
}
}

/// Finishes the async sequences with a normal ending.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
for channel in channels {
channel.finish()
}
return channels
}

for channel in channels {
channel.finish()
}
}

func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()
var consumerId: Int!
var unregister: (@Sendable () -> Void)?

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
}

if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
return state.ids
self.state.withCriticalRegion { state in
let terminalState = state.terminalState
if let terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
} else {
state.ids &+= 1
consumerId = state.ids
state.channels[consumerId] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
}
}

let unregister = { @Sendable [state] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
if let consumerId {
unregister = { @Sendable [state, consumerId] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
}
}
}

return (asyncBufferedChannel.makeAsyncIterator(), unregister)
return (asyncBufferedChannel.makeAsyncIterator(), unregister ?? {})
}

public func makeAsyncIterator() -> AsyncIterator {
Expand All @@ -124,6 +127,7 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
public struct Iterator: AsyncSubjectIterator {
var iterator: AsyncBufferedChannel<Element>.Iterator
let unregister: @Sendable () -> Void
var isFinished = false

init(asyncSubject: AsyncCurrentValueSubject) {
(self.iterator, self.unregister) = asyncSubject.handleNewConsumer()
Expand All @@ -134,11 +138,22 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
}

public mutating func next() async -> Element? {
await withTaskCancellationHandler {
// Don't proceed if we've already finished
guard !isFinished else { return nil }

let result = await withTaskCancellationHandler {
await self.iterator.next()
} onCancel: { [unregister] in
unregister()
}

// If iteration completed normally (returned nil), unregister the channel
if result == nil {
isFinished = true
unregister()
}

return result
}
}
}
Loading