Video #198: Async Composable Architecture: Streams
Episode: Video #198 Date: Jul 25, 2022 Access: Members Only 🔒 URL: https://www.pointfree.co/episodes/ep198-async-composable-architecture-streams

Description
We introduce another helper to the Effect type that can use an asynchronous context to send multiple actions back into the system. By leveraging Swift’s structured concurrency we can create complex effects in a natural way, all without sacrificing testability.
Video
Cloudflare Stream video ID: 15ca4df29e3369735c7b9caddb83ea6f Local file: video_198_async-composable-architecture-streams.mp4 *(download with --video 198)*
References
- Discussions
- to do this
- the 200th prime
- discussions of this
- 0198-tca-concurrency-pt4
- Brandon Williams
- Stephen Celis
- Mastodon
- GitHub
- CC BY-NC-SA 4.0
- source code
- MIT License
Transcript
— 0:05
So things are starting to look pretty good. Not only can we use Effect.task to open up an asynchronous context for our effects, which allows us to perform as many awaits as we want and use a familiar coding style to compute the data that we want to feed back into the system, but we can now also leverage async sleeping in order to introduce delays into our effects.
— 0:24
And best of all, it’s still all testable. Sure we had to insert a little bit of hackiness into our test scheduler, but hopefully Swift will someday introduce tools that allow us to have more control over how work is scheduled in the concurrency runtime.
— 0:38
But there is still a pretty big blind spot in our asynchronous effects story for the Composable Architecture. And that’s effects that need to send multiple actions back into the system. Effect.task works great for when we have a single action, but what if we need to send many?
— 0:55
It’s totally possible to support this, and in doing so we will be able to greatly simplify many complex effects, and further reduce our reliance on Combine. Timer counter
— 1:07
Let’s explore this topic by introducing yet another feature to our increasingly bizarre counter demo application. We are going to add a button that when pressed starts a timer, and for each tick of the timer the count is incremented. And while the timer is running we will display a different button that allows us to stop the timer.
— 1:32
Let’s start by getting the view portion of this functionality in place, and then let that guide how we update the domain of the feature. In the view we need to be able to determine if the time is currently running or not so that we know which button to show: Section { if <#???#> { Button("Stop timer") { viewStore.send(<#???#>) } .frame(maxWidth: .infinity) } else { Button("Start timer") { viewStore.send(<#???#>) } .frame(maxWidth: .infinity) } } And in each of the buttons we need to send some action.
— 1:55
So, sounds like our state needs to hold a boolean that determines if the timer is running or not: if viewStore.isTimerRunning { … }
— 2:03
And the action enum needs two more cases for starting and stopping the timer: Button("Stop timer") { viewStore.send(.stopTimerButtonTapped) } .frame(maxWidth: .infinity) … Button("Start timer") { viewStore.send(.startTimerButtonTapped) } .frame(maxWidth: .infinity)
— 2:27
None of this is compiling, so let’s update the domain to accommodate for this new state and actions.
— 2:35
We can update the EffectsBasicsState to hold onto the boolean: struct EffectsBasicsState: Equatable { var isTimerRunning = false … }
— 2:43
And we can add two cases to the EffectsBasicsAction enum: enum EffectsBasicsAction: Equatable { … case startTimerButtonTapped case stopTimerButtonTapped }
— 2:51
But we will also have an action for when the timer effect emits: enum EffectsBasicsAction: Equatable { … case timerTick }
— 3:01
This is the action we can react to in order to increment the count for each timer tick.
— 3:07
We don’t need to update the environment at all because it already everything we need to implement this new functionality. In particular, we just need the main queue scheduler in order to construct a timer.
— 3:16
Next we have the reducer, which is the actual business logic of the feature. Things aren’t compiling right now because we have 3 new actions to implement in the switch. case .startTimerButtonTapped: case .stopTimerButtonTapped: case .timerTick:
— 3:35
We can start with the .startTimerButtonTapped action. First thing we can do is flip the isTimerRunning boolean to true since the timer will now be running: case .startTimerButtonTapped: state.isTimerRunning = true
— 3:46
Next we need to start a timer. There is a helper in the library that makes it easy to schedule a timer, and give it an explicit identifier so that it can be easily cancelled later: enum TimerID {} … case .startTimerButtonTapped: state.isTimerRunning = true return Effect.timer( id: TimerID.self, every: .seconds(1), on: environment.mainQueue ) .map { _ in .timerTick }
— 4:53
Then, when the stop button is tapped we can flip the isTimerRunning boolean back to false and cancel the timer: case .stopTimerButtonTapped: state.isTimerRunning = false return .cancel(id: TimerID.self)
— 5:09
And the final piece of logic in the reducer is what happens when we receive a timerTick action: case .timerTick: state.count += 1 return .none }
— 5:16
That’s all it takes to add a timer to our demo, and we can give it a spin in the preview to see that everything works as expected.
— 5:31
The best part of implementing this new functionality using the Composable Architecture is that it instantly becomes testable. We already have all the tools at our disposal with no additional work.
— 5:41
Let’s first get a stub of a test into place: func testTimer() { let store = TestStore( initialState: EffectsBasicsState(), reducer: effectsBasicsReducer, environment: .unimplemented ) } Before sending actions into the test store so that we can assert on what happens, we should first decide what dependencies we think are going to be used in the test. We will definitely need to supply a main queue, but it doesn’t seem like the fact client is going to be used, so we can leave it unimplemented.
— 6:11
So, we will introduce a test scheduler and set it inside the test store’s environment: func testTimer() { let mainQueue = DispatchQueue.test let store = TestStore( initialState: EffectsBasicsState(), reducer: effectsBasicsReducer, environment: .unimplemented ) store.environment.mainQueue = mainQueue .eraseToAnyScheduler() }
— 6:29
Now we can start sending actions. Let’s first simulate what happens when the start timer button is tapped. We expect the isTimeRunning boolean to flip to true: store.send(.startTimerButtonTapped) { $0.isTimerRunning = true }
— 6:48
If we leave things as-is then we will of course have a test failure because this action kicks off a long-living effect that is still alive: testTimer(): An effect returned for this action is still running. It must complete before the end of the test. …
— 7:01
The way to fix this is to send the action that is responsible for tearing down that long-living effect, which happens to be the .stopTimerButtonTapped action, and when that action is sent we expect the isTimerRunning boolean to flip back to false: store.send(.stopTimerButtonTapped) { $0.isTimerRunning = false }
— 7:16
The test is now passing, but also we aren’t really exercising much of the timer functionality. We just start the timer and then immediately stop it before it can even tick a single time.
— 7:28
In order get some real test coverage on the timer functionality we need to tell the test clock to move time forward so that the timer effect can feed an event back into the system: mainQueue.advance(by: .seconds(1)) store.receive(.timerTick) { $0.count = 1 }
— 7:52
And now we are getting some good coverage on the actual logic of the feature.
— 7:59
We could also advance it a couple of seconds at once and then assert on each action that was received: scheduler.advance(by: .seconds(4)) store.receive(.timerTick) { $0.count = 2 } store.receive(.timerTick) { $0.count = 3 } store.receive(.timerTick) { $0.count = 4 } store.receive(.timerTick) { $0.count = 5 }
— 8:16
So, we now have the feature implemented, including a test that exercises its logic. It’s pretty incredible to see how easy it can be to layer on complex functionality while not losing out on any testability. A better timer
— 8:27
But things could be better. Right now we are using the Effect.timer helper the library comes with, which is a Combine publisher. This means we have to leverage Combine operators if we need to perform transformations or customizations, and sometimes that can be quite unwieldy. For example, suppose we didn’t want the timer to simply tick once every second, but rather have it slowly speed up as time went on. Doing that with Combine publishers can be quite tricky.
— 8:53
Such customizations are a lot easier to accomplish in the world of structured concurrency. We’ve already see than the simple Effect.task is capable of cleaning up complex, asynchronous code, but unfortunately that effect can only emit a single time. We need to come up with a new effect helper that provides an asynchronous context like Effect.task does, but also allows sending as many actions as you want.
— 9:18
Let’s sketch out what such an API would look like, and then see how it can be implemented.
— 9:25
What we’d like to do is return an effect that opens up a new asynchronous context, and currently Effect.task is the only such tool so let’s just use that for a moment: case .startTimerButtonTapped: state.isTimerRunning = true return .task { }
— 9:36
Then in here we’d like to start up a timer. Thanks to the asynchronous context provide it is very easy to provide timer-like behavior by just doing an infinite loop with a sleep on the inside: return .task { while true { try? await environment.mainQueue.sleep(for: .seconds(1)) // send timerTick somehow??? } }
— 10:03
And then we could mark the whole effect as cancellable so that we could cancel it later: return .task { … } .cancellable(id: TimerID.self)
— 10:17
Now, we of course can’t use Effect.task for this because it’s an effect that can only emit a single time. We need a different name, and in fact the library actually comes with an effect that is quite similar to what we are trying to theorize here.
— 10:32
It’s called Effect.run , and it allows you to create an effect that can output any number of values. It’s created with a closure that is handed what is called a “subscriber”, which is just something we can send data to: Effect.run { subscriber in … }
— 10:45
Interestingly we also must return a cancellable from the closure: Effect.run { subscriber in … return AnyCancellable { … } }
— 10:54
The cancellable serves two purposes:
— 10:56
First, if the effect is cancelled, this closure will be invoked allowing us to perform clean up, such as releasing resources or canceling other systems that are running.
— 11:05
Second, it acts as a mechanism to keep resources alive while the effect is doing its work. This is needed because the closure provided to Effect.run does not actually do the real work of producing the data that is sent to the subscriber. In fact, it’s lifetime ends pretty much immediately, because the real work of the effect is done by handing off control to some other system using delegates or callbacks. Because of this we need a mechanism for keeping resources alive while the effect is doing its work, and so we can use the cancellable to capture those resources, and then release them when the effect finishes.
— 11:37
We have an example of this in the speech recognition demo application that’s included in the repo, where we use Effect.run to interface with the SFSpeechRecognizer class for listening to audio and transcribing it to text. The SpeechClient interface that encapsulates this dependency uses Effect.run in order to perform this work, and uses the cancellable to keep alive the audio session and speech recognization objects while the effect is doing its work: recognitionTask: { request in Effect.run { subscriber in … let cancellable = AnyCancellable { audioEngine?.stop() inputNode?.removeTap(onBus: 0) recognitionTask?.cancel() _ = speechRecognizer _ = speechRecognizerDelegate } … return cancellable } }
— 12:10
We essentially want an async/await version of this helper. But even better, the async/await version won’t even need the cancellable. The asynchronous closure provided for the effect lives for the entire lifetime of the effect. There’s no need to capture resources to keep them alive because the effect is only alive for as long as the closure is alive. This will greatly simplify things.
— 12:31
So, what we are seeing here is that we want a new Effect.run that takes an asynchronous closure which is passed some object that can send actions back into the system: return .run { send in while true { try? await environment.mainQueue.sleep(for: .seconds(1)) send(.timerTick) } } .cancellable(id: TimerID.self)
— 12:57
With such an effect it would be incredibly easy to add logic or customizations to this timer in a very familiar style.
— 13:03
For example, if we wanted to speed up the timer a little bit with each tick we could simply do: return .run { send in var count = 0 while true { defer { count += 1 } try? await environment.mainQueue.sleep( for: .milliseconds(max(10, 1000 - count * 50)) ) send(.timerTick) } } .cancellable(id: TimerID.self) Now with every tick the timer will wait 50 milliseconds less than the previous tick.
— 13:48
So, what does it take to implement this new async/await-friendly Effect.run ? Well, it’s not so bad actually because we can make use of the old, Combine-based Effect.run under the hood. Let’s start by getting a signature in place.
— 14:00
It’s a static function on the Effect type that takes an operator as an argument, which is an async closure that itself takes a closure as an argument: extension Effect { static func run( operation: (_ send: (Output) -> Void) async -> Void ) -> Self { } }
— 14:23
This seems complex, but remember that at the call site we open up an Effect.run and we are handed a “send” function that takes actions so that we can send actions back into the system: Effect.run { (send: (Action) -> Void) in send(…) }
— 14:38
So, it looks quite complex, but it really is what we need.
— 14:41
The effect we return on the inside can just call down to the Combine-based API that hands us a subscriber and we must return a cancellable: extension Effect { static func run( operation: (_ send: (Output) -> Void) async -> Void ) -> Self { .run { subscriber in return AnyCancellable { } } } }
— 14:52
Technically this gets everything compiling right now, but also this effect isn’t actually doing anything. What we want to do is invoke the async operation , which requires handing over a closure, and whenever that closure is invoked with some output we pass it along to the subscriber: await operation { output in subscriber.send(output) }
— 15:14
However, we can’t do this because we don’t have an asynchronous context available right now. We need to create one, and as we explored deeply in our previous series on concurrency , the only way to do this is by spinning up a new unstructured task. extension Effect { public static func run( operation: (_ send: (Output) -> Void) async -> Void ) -> Self { .run { subscriber in Task { await operation { output in subscriber.send(output) } } return AnyCancellable { } } } }
— 15:35
But in order for that to work we need to make the operation escaping: operation: @escaping ((Output) -> Void) async -> Void
— 15:40
Also, thanks to what we’ve learned in previous episodes on concurrency, we also know that the operation should be marked as @Sendable since Task ’s initializer takes a sendable closure: operation: @escaping @Sendable ((Output) -> Void) async -> Void
— 15:50
We are getting close, but there are still a few things not right about this.
— 15:53
First of all, nothing is completing the effect right now. The Combine-based Effect.run creates an effect that is long living until the “completion” event is sent to the subscriber: subscriber.send(completion: .finished) If we invoke that right after the operation is done suspending then we can ensure that the effect is completed as soon as the asynchronous work completes: Task { await operation { output in subscriber.send(output) } subscriber.send(completion: .finished) }
— 16:14
Getting closer to the final form, but still something not quite right.
— 16:17
If something cancels the effect then we want that to trickle down and cancel the asynchronous work being performed. If the async work happening inside operation is regularly checking Task.isCancelled or using the throwing Task.checkCancellation() function, then it will be able to short-circuit future work as soon as it detects it is cancelled.
— 16:35
To accomplish this we can get a handle on the task we create, and then cancel it whenever the returned cancellable is invoked: extension Effect { public static func _run( operation: @escaping (_ send: (Output) -> Void) async -> Void ) -> Self { .run { subscriber in let task = Task { await operation { output in subscriber.send(output) } subscriber.send(completion: .finished) } return AnyCancellable { task.cancel() } } } }
— 16:45
Effect.run is almost fully complete, there’s just two more small changes we need to make.
— 16:50
The asynchronous context our effect runs in can execute on any thread, sometimes even changing threads multiple times during the lifetime of the effect, but for the purposes of the Composable Architecture we need all effect emissions to happen on the main thread.
— 17:03
We can force this at the level of Effect.run by marking the send closure as @MainActor : operation: @escaping ( _ send: @MainActor (Output) -> Void ) async -> Void
— 17:10
This will force anyone who tries to send an output to the closure to do so on the main actor. If they are already on the main actor, then they can do so without any additional ceremony. However, if they are not, as is the case in our reducer, then we get a compiler error letting us know that we must await.
— 17:26
We also need to make sure the effect completes on the main thread, which is where the subscriber is sent a completion. We can ensure that line executes on the main actor by annotating the task’s closure from where it’s invoked: let task = Task { @MainActor in … subscriber.send(completion: .finished) }
— 17:37
Note that this does not force the operation to execute on the main thread. That work is still free to operate on any thread in the cooperative pool. Adding the @MainActor annotation on makes it so that sending the finished event happens on the main thread.
— 17:51
We have a compiler error back where we’re using this new run helper, where we feed an action to the send closure. Expression is ‘async’ but is not marked with ’await’
— 17:59
This is necessary because we need to be able to suspend while we wait for the main actor to be ready to accept the work we want to enqueue on it, in particularly calling the send closure with .timerTick : await send(.timerTick)
— 18:09
With those few changes things are compiling again, and we can run the demo again in the simulator and now it runs with no warnings. Stopping the timer also works, which means effect cancellation is also properly propagating into the asynchronous context.
— 18:35
However, there’s a subtly to consider. Currently our effect sleeps using try? : try? await environment.mainQueue.sleep( for: .milliseconds(max(10, 1000 - count * 50)) )
— 18:41
This means once the task is cancelled the sleep function will throw, but we will never exit the while loop since we are using try? . To confirm this we can put a print statement at the beginning of the loop: while true { print("Hello!") … }
— 18:53
And when we run the app, start and stop the timer, we will see the while loop is still running. Now since the effect was cancelled it can no longer feed data back into the system, but technically the effect code is still running.
— 19:12
One easy fix for this is to check if the task is cancelled with every iteration of the loop: while !Task.isCancelled { … }
— 19:20
Or we could wrap everything in a do / catch so that we could use the non-optional version of try , which would allow cancellation of the sleep to propagate upwards: return .run { send in do { … } catch {} }
— 19:33
Either of these will work fine, but also in the actual final release of the library that includes all of these concurrency tools, we will actually allow Effect.run to throw cancellation errors without wrapping everything in a do / catch , which will greatly increase the safety and ergonomics of Effect.run , but we aren’t going to discuss that right now.
— 19:51
This is all looking pretty incredible. We have now constructed an extremely complex, long-living effect. A timer that counts up faster and faster as time goes on, and the whole thing is cancellable.
— 20:02
And even better, it’s still 100% testable. We can hop over to our tests, and upgrade it to run in an asynchronous context: func testTimer() async { … await mainQueue.advance(by: .seconds(1)) await store.receive(.timerTick) { … } await mainQueue.advance(by: .seconds(4)) await store.receive(.timerTick) { … } await store.receive(.timerTick) { … } await store.receive(.timerTick) { … } await store.receive(.timerTick) { … } … }
— 20:02
We can run our tests and they already pass! Which may seem a little surprising, because we changed the timer to speed up over time. Well it turns out that the first few updates are still measurable at second intervals, but we can strengthen our test coverage by asserting against the exact delays we expect: await scheduler.advance(by: .seconds(1)) await store.receive(.timerTick) { $0.count = 1 } await scheduler.advance(by: .milliseconds(950)) await store.receive(.timerTick) { $0.count = 2 } await scheduler.advance(by: .milliseconds(900)) await store.receive(.timerTick) { $0.count = 3 } await scheduler.advance(by: .milliseconds(850)) await store.receive(.timerTick) { $0.count = 4 } await scheduler.advance(by: .milliseconds(800)) await store.receive(.timerTick) { $0.count = 5 }
— 21:09
And this passes too. Prime computation
— 21:16
It’s absolutely amazing that we can test such complex asynchronous code and so easily. We now have the perfect tool for expressing a long-living effect that can send multiple actions back into the system.
— 21:27
Let’s pile on yet another feature onto our increasingly complex demo counter application. We are going to add a button that allows you to compute the “nth prime” based on the current count of the screen. So, if the counter is set to 10, then tapping the button will compute the 10th prime. And if the counter is on 50,000, then tapping the button will compute the 50,000th prime.
— 21:48
To make things a little more interesting, we are going to show a progress bar in the UI to show the progress in computing that prime number. This will be handy because it can take a few seconds to compute really large prime numbers.
— 22:01
This will give us an opportunity to not only show off our new Effect.run helper again, but will also showcase Swift’s cooperative concurrency features. We will allow our intense prime computation to regularly suspend itself so that other tasks can get time on the CPU, which is very difficult to accomplish with the old concurrency tools in the Apple ecosystem.
— 22:26
Let’s start with the view layer. We will create a new section to house a button for computing the “nth prime”: Section { Button("Compute \(viewStore.count)th prime") { viewStore.send(.nthPrimeButtonTapped) } .frame(maxWidth: .infinity) }
— 22:55
And then just below this button we want to check if the nth prime calculation is in progress, and if so show a progress view: if let nthPrimeProgress = viewStore.nthPrimeProgress { ProgressView(value: nthPrimeProgress) .progressViewStyle(.linear) }
— 23:32
We just need to figure out what to put in these placeholder spots. This is a domain modeling exercise where we update our state and actions so that we have access to this information from the view.
— 23:43
Next we can add an optional double to the state to signify whether or not the computation is in progress, and how far along the computation is: struct EffectsBasicsState: Equatable { … var nthPrimeProgress: Double? }
— 23:50
And then, we can add an action that signifies the button being tapped: enum EffectsBasicsAction: Equatable { … case nthPrimeButtonTapped … }
— 23:55
So, we’ve added information to our domain in order to accommodate the view. Now let’s see what it takes to update the reducer so that we can actually implement the new logic.
— 24:01
Currently the reducer is not compiling because we need to handle the new action: case .nthPrimeButtonTapped: return .none
— 24:10
In here we want to spin up a new long-living effect that performs the “nth prime” computation, and periodically reports its progress back to the system.
— 24:19
We actually made use of an “nth prime” function in our concurrency series of episodes in order to explain how an intense CPU computation and play nicely in the cooperative pool by yielding its execution context every once in awhile. Let’s copy and paste that function real quick so that we can take some inspiration from it: private func isPrime(_ p: Int) -> Bool { if p <= 1 { return false } if p <= 3 { return true } for i in 2...Int(sqrtf(Float(p))) { if p % i == 0 { return false } } return true } private func asyncNthPrime(_ n: Int) async { let start = Date() var primeCount = 0 var prime = 2 while primeCount < n { defer { prime += 1 } if isPrime(prime) { primeCount += 1 } else if prime.isMultiple(of: 1_000) { await Task.yield() } } print( "\(n)th prime", prime-1, "time", Date().timeIntervalSince(start) ) }
— 24:42
The asyncNthPrime simply loops through every number up to n and counts how many of those numbers were prime. It also sprinkles in some Task.yield s every once in awhile so that other tasks can get a chance to use the thread. Now, in the previous episodes we didn’t have any concept of progress. We just let the function do its thing, and then printed the prime number at the end along with how long it took.
— 25:03
We can adapt this function to do what we want. Let’s start by opening up a new .run effect so that we can do some work and send actions along the way: case .nthPrimeButtonTapped: return .run { send in }
— 25:15
We can copy-and-paste most of the asyncNthPrime function inside here with a few small modifications to do the intense computation: case .nthPrimeButtonTapped: return .run { send in var primeCount = 0 var prime = 2 while primeCount < state.count { defer { prime += 1 } if isPrime(prime) { primeCount += 1 } else if prime.isMultiple(of: 1_000) { await Task.yield() } } }
— 25:24
This doesn’t currently work because we are capturing the mutable state value a @Sendable closure. But, we don’t actually need state as a mutable value, we just need the count as an immutable value, so let’s explicitly capture: return .run { [count = state.count] send in … while primeCount < count { … } }
— 25:36
Now, once this while loop is done it means we have computed the nth prime, and so we want to report back to the system by sending an action: while primeCount < count { … } await send(<#???#>)
— 25:49
So, looks like we need a new action to represent the response from this computation: enum EffectsBasicsAction: Equatable { … case nthPrimeResponse(Int) … }
— 26:01
Then we can send the action from the effect: await send(.nthPrimeResponse(prime - 1))
— 26:18
And respond to that action in the reducer by filling in fact state and clearing out the progress state since the computation is no longer inflight: case let .nthPrimeResponse(prime): state.numberFact = "The \(state.count)th prime is \(prime)." state.nthPrimeProgress = nil return .none
— 26:54
We can actually run this in the preview and it works. We can count up to, say 10, tap the “Compute the 10th prime” button, and the fact field will populate with: Fact The 10th prime is 29.
— 27:06
However, we don’t yet have the progress aspect in place yet. And this is definitely an issue, because if we start the screen off with a very high count, such as 50,000: initialState: EffectsBasicsState(count: 50_000),
— 27:17
And then press the “Compute the 50000th prime” we will see that nothing seems to happen for a few seconds, and then all the sudden the fact pops in: Fact The 50000th prime is 611953
— 27:28
This is not a pleasant user experience, so we really do want to get the progress aspect in place.
— 27:34
It’s easy enough to compute the percentage progress of this computation because we are just linearly counting from 2 up to count , and so it’s just a matter of dividing those numbers as doubles: Double(primeCount) / Double(count)
— 27:50
In order to send this information into the system we need yet another action to receive the double as progress: enum EffectsBasicsAction: Equatable { … case nthPrimeProgress(Double) … }
— 28:08
Then we could send this action at the end of the while loop: while primeCount < count { … await send( .nthPrimeProgress( Double(primeCount) / Double(count) ) ) }
— 28:11
And we can respond to it in the reducer: case let .nthPrimeProgress(progress): state.nthPrimeProgress = progress return .none
— 28:27
However, our placement of send in the Effect.run is probably a bit too heavy-handed in reporting the progress of this computation. This will send an action for every single number we check, which in our current preview would mean sending 50,000 actions. Now, the progress percentage does not change significantly between checking one number and its successor.
— 28:54
We already have a mechanism in this function for making it cooperate with other CPU tasks by yielding every 1,000 numbers we check. Perhaps we could report the progress at these branch points too: } else if prime.isMultiple(of: 1_000) { await send( .nthPrimeProgress( Double(primeCount) / Double(count) ) ) await Task.yield() } This will greatly reduce the number of times we send actions to update the progress.
— 29:08
And with that we have finished implementing the feature, and it works as we expect. If we try to compute the 50,000th prime we will see a progress bar appear, slowly fill up, and then report the final answer.
— 29:23
It’s honestly pretty incredible how easy it was to create such a complex effect, and do so right inline in our reducer. Thanks to the asynchronous context provided to us and the send function, we can do whatever asynchronous work we want and send as many actions back into the system we want.
— 29:40
Let’s make things a little bit nicer. Right now the UI is kinda just popping into place when things change, such as the progress bar and fact. It would be nicer to apply some animations so that things slide in and out or fade in and out.
— 29:58
One thing we can do is just make use of withAnimation directly in the effect. However, if we try to do this naively: withAnimation { await send( .nthPrimeProgress( Double(primeCount) / Double(count) ) ) } Cannot pass function of type ‘() async -> ()’ to parameter expecting synchronous function type
— 30:15
…we get a compiler error. The work done inside withAnimation must be synchronous. We are not allowed to perform asynchronous work. This is because withAnimation needs to look at the state of the UI before and after mutating state in order to know what to animate.
— 30:31
Now, the only reason send needs to be awaited is because it’s marked as @MainActor and the effect is running in an arbitrary task. So, we must await so that we can serialize our work to the main thread.
— 30:40
Now technically we could mark the entire effect as @MainActor : .run { @MainActor send in … }
— 30:57
But this is not ideal because now all the intense CPU work is being done on the main thread. Previously it was working on threads in the cooperative thread pool managed by Swift’s concurrency runtime. So, let’s not do that.
— 31:24
We want to allow this effect to execute in the cooperative thread pool, and just at the moment of sending the action into the store we want switch to the main thread. We can do this by using the run static method on MainActor : await MainActor.run { }
— 31:42
The closure handed to run is synchronous, that is, it is not marked as async , and so we can only do synchronous work in here, and it will be performed on the main thread.
— 31:51
This means we can call send without awaiting it from inside withAnimation : await MainActor.run { withAnimation { send(.nthPrimeResponse(prime - 1)) } } Reference to captured var ‘prime’ in concurrently-executing code; this is an error in Swift 6
— 31:57
Though we do need to immutably capture any mutable variables first: await MainActor.run { [prime] in … }
— 32:08
So, this does work, but it’s also very verbose. Wouldn’t it be nicer to just specify the animation when sending the animation?
— 32:44
This is already what we do when sending an action to a view store: viewStore.send(…, animation: .default) So, what if we could this when sending actions from Effect.run : await send( .nthPrimeProgress( Double(primeCount) / Double(count) ), animation: .default ) … await send( .nthPrimeResponse(prime - 1), animation: .default )
— 32:54
In order to support this we need to upgrade the send closure to a full on type so that it can optionally accept an animation.
— 33:03
The type can just be a simple struct wrapper around a closure for sending actions: struct Send<Action> { let send: (Action) -> Void }
— 33:17
And we can mark the type as @MainActor since we want all of its interactions to happen on the main thread: @MainActor struct Send<Action> { let send: (Action) -> Void }
— 33:22
Then we can swap out the closure in Effect.run with this type: extension Effect { public static func run( operation: @escaping @Sendable ( Send<Output> ) async throws -> Void ) -> Self { .run { subscriber in let task = Task { try await operation( Send(send: { subscriber.send($0) }) ) subscriber.send(completion: .finished) } return AnyCancellable { task.cancel() } } } }
— 33:43
We now have some compiler errors because technically we now need to use send.send anywhere we want to send an action from the effect: await send.send(.timerTick)
— 33:53
That is of course strange, but we can leverage a feature of Swift that allows us to treat the send value as if it is a function that can be called directly: @MainActor struct Send<Action> { let send: @Sendable (Action) -> Void func callAsFunction(_ action: Action) { self.send(action) } } Now we are able to go back and treat send as a function, even though secretly it’s its own type: await send(.timerTick)
— 34:12
And even better, we can define an overload of callAsFunction that takes an animation so that we can wrap the send in withAnimation : public func callAsFunction( _ action: Action, animation: Animation? ) { withAnimation(animation) { self.send(action) } }
— 34:51
Now everything is compiling, and our demo application uses animations when showing and hiding various UI components.
— 35:19
Now if we run in the simulator we will see that we can fetch a prime and see a nice animation.
— 35:30
So, this is all looking pretty incredible, but things get even better. Although we have introduced an incredibly complex effect to our reducer, everything is still 100% testable. To prove this, let’s write a test.
— 35:43
We can start by getting a stub of a test in place, but unlike our other tests, let’s have the initial state already be at a high number, like say 200, so that we don’t have to send a bunch of actions to increment: func testNthPrime() async throws { let store = TestStore( initialState: EffectsBasicsState(count: 200), reducer: effectsBasicsReducer, environment: .unimplemented ) }
— 36:20
Now let’s simulate what happens when we tap the “nth prime” button: store.send(.nthPrimeButtonTapped)
— 36:25
We of course don’t expect this test to pass yet because the effect is going to cause actions to be fed back into the system, and we need to account for those.
— 36:33
We expect a progress action to be sent back into the system, but in order to do so we need to know what progress value is going to be fed back in: await store.receive(.nthPrimeProgress(<#???#>))
— 36:46
Now, we technically could compute this from first principles. It’s a complex computation, but it is technically possible.
— 36:53
Alternatively, we could just stub something in temporarily: await store.receive(.nthPrimeProgress(0))
— 37:01
And then run the tests and we will get a nicely formatted failure letting us know that the reported progress was 0.84: testNthPrime(): Received unexpected action: … EffectsBasicsAction.nthPrime( − EffectsBasicsAction.NthPrimeAction.progress(0.0) + EffectsBasicsAction.NthPrimeAction.progress(0.84) ) (Expected: −, Received: +) Failed: testNthPrime(): State was not expected to change, but a change occurred: … EffectsBasicsState( count: 200, isNumberFactRequestInFlight: false, isTimerRunning: false, − nthPrimeProgress: nil, + nthPrimeProgress: 0.84, numberFact: nil ) (Expected: −, Actual: +)
— 37:05
And so we can hard code that percentage in the action, and assert how the state changes: await store.receive(.nthPrimeProgress(0.84)) { $0.nthPrimeProgress = 0.84 }
— 37:15
So, this is an easy way to get the data we need for this test.
— 37:17
It’s also worth noting that this value could have also been computed by us from first principles. We know that progress is reported every 1,000th iteration of the while loop, which means we need to figure out how many primes there are less than 1,000. We can use Wolfram alpha to do this , which, which is a powerful computing platform that allows you to ask questions to get answers.
— 37:44
So there are 168 primes less than 1,000, and we are counting up to 200, and it just so happens that 168/200 is exactly 0.84.
— 37:54
Now that we’ve correctly received that action we need to receive the next action. We can ask Wolfram for the 200th prime .
— 38:17
…to find that it is 1,223.
— 38:20
Since that is less than 2,000 we don’t expect to get another progress action, and instead just the final response action, at which point the progress goes back to nil and the numberFact field updates: await store.receive(.nthPrimeResponse(1_223)) { $0.nthPrimeProgress = nil $0.numberFact = "The 200th prime is 1223." }
— 38:42
If we run tests they very nearly pass, but there is just the one single failure: testNthPrime(): An effect returned for this action is still running. It must complete before the end of the test. …
— 38:52
It’s only complaining that the effect is technically still inflight. Our assertions are correct, which means we did receive the nthPrimeResponse action with a value of 1223, and the state did update exactly as we specified.
— 39:02
The problem is that even though the Effect.run we construct in the reducer completes immediately after it sends the last action, it’s still running on a background thread which means it may need a tiny bit of time to actually fully complete.
— 39:20
In fact, if we just insert a little Task.yield at the end of the test: func testNthPrime() async throws { … await Task.yield() }
— 39:25
…we’ll see that the test now passes. By suspending for just a brief moment we give the Effect.run effect enough time to completely finish so that the test store can see that everything was cleaned up.
— 39:31
We didn’t run into this problem before with our timer code because we had an explicit action that was responsible for cancelling the timer, and that immediately tears down the effect and so there is no need to wait around for a little bit of time. Next time: effect lifetimes
— 39:43
It’s a bit of a bummer that we need to insert this yield. We don’t even know if it’s enough to get things to always pass. Maybe if we run this enough times it will eventually fail, leading us to insert a few additional yields to push things along.
— 39:57
One thing that could potentially fix this is if Swift supported async deinit for objects, because then we could suspend for a bit when the test store is torn down to see if all effects finished, and if not then we could error. There has been some discussions of this in the evolution forums, but no movement on a final design yet.
— 40:16
But, even before we get that feature, there’s still something we can do to make this work deterministically without sprinkling yields into the test. It is possible to send an action to the store and get back a task that represents the lifecycle of the effect that was kicked off from that action. That would give you something concrete to await on so that you could suspend until the exact moment that the effect finishes. This would give us a 100% deterministic way to make our test pass.
— 40:44
It also turns out that by getting a handle on an effect’s lifecycle we can improve other parts of the library too. For example, SwiftUI has an interesting view modifier called .task that allows you to spin up some asynchronous work when the view appears, and that work is automatically cancelled when the view disappears.
— 41:02
Wouldn’t it be cool if we could send an action in that task modifier so that when the view disappears it cancels the inflight effect that was kicked off from the action? This makes it possible for a feature to seamlessly tear down its effects when the view disappears.
— 41:17
Let’s see how this is possible…next time! References Collection: Concurrency Brandon Williams & Stephen Celis Note Swift has many tools for concurrency, including threads, operation queues, dispatch queues, Combine and now first class tools built directly into the language. We start from the beginning to understand what the past tools excelled at and where they faultered in order to see why the new tools are so incredible. http://pointfree.co/collections/concurrency Downloads Sample code 0198-tca-concurrency-pt4 Point-Free A hub for advanced Swift programming. Brought to you by Brandon Williams and Stephen Celis . Content Become a member The Point-Free Way Beta previews Gifts Videos Collections Free clips Blog More About Us Community Slack Mastodon Twitter BlueSky GitHub Contact Us Privacy Policy © 2026 Point-Free, Inc. All rights are reserved for the videos and transcripts on this site. All other content is licensed under CC BY-NC-SA 4.0 , and the underlying source code to run this site is licensed under the MIT License .