Video #105: Combine Schedulers: Controlling Time
Episode: Video #105 Date: Jun 8, 2020 Access: Members Only 🔒 URL: https://www.pointfree.co/episodes/ep105-combine-schedulers-controlling-time

Description
The Scheduler protocol of Combine is a powerful abstraction that unifies many ways of executing asynchronous work, and it can even control the flow of time through our code. Unfortunately Combine doesn’t give us this ability out of the box, so let’s build it from scratch.
Video
Cloudflare Stream video ID: 951bd3d35131b70c675b877b5f8371f7 Local file: video_105_combine-schedulers-controlling-time.mp4 *(download with --video 105)*
References
- Discussions
- combine-schedulers
- 0105-combine-schedulers-pt2
- Brandon Williams
- Stephen Celis
- Mastodon
- GitHub
- CC BY-NC-SA 4.0
- source code
- MIT License
Transcript
— 0:05
This just doesn’t seem to be very dependable. I wouldn’t want to depend on this test suite where first there’s magical constants being used to determine how long to wait, and second the test suite could randomly fail for no fault of our own. This creates an environment of not trusting our test suite.
— 0:19
Luckily Combine gives us a tool for handling this. The crux of the problem is that we used DispatchQueue.main when telling our publishers what queue to receive values on and when describing how to debounce values. The Scheduler protocol
— 0:48
There is another value we can use instead of DispatchQueue.main which executes its work immediately, rather than invoking the full machinery of Grand Central Dispatch.
— 1:02
Let’s give it a spin by swapping out the DispatchQueue for an ImmediateScheduler when we invoke the register endpoint: self.register(self.email, self.password) // .receive(on: DispatchQueue.main) .receive(on: ImmediateScheduler.shared)
— 1:17
We can now remove the explicit waits in our tests that exercise the registration flow: // _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 0.01)
— 1:31
And tests still pass!
— 1:44
This is giving us our first hints of what it means to control time. By swapping out the live queue for an immediate scheduler we get to collapse all asynchrony into plain synchrony, making everything execute right away without doing any waiting.
— 2:00
Let’s try to do the same with the validation publisher, in particular when we debounce: .debounce(for: 0.3, scheduler: ImmediateScheduler.shared)
— 2:21
Running tests we get a crash: Thread 1: EXC_BAD_INSTRUCTION (code=EXC_I386_INVOP, subcode=0x0)
— 2:24
That’s not great.
— 2:30
This is happening because apparently ImmediateScheduler and debounce don’t play nicely together. This kind of makes sense, after all debouncing is an operation that exists specifically to delay actions after a period of time, whereas immediate schedulers are all about executing things right this moment.
— 2:46
So the immediate scheduler isn’t really what we want. What we want is a way to control the passage of time, so that when we are testing a debounced publisher we can instantly move time forward 300 milliseconds to see how it behaves, rather than waiting for a dispatch queue to catch up or having things executed immediately.
— 3:10
Turns out this is possible to do, but before we can do it we need to under why it is even possible to interchangeably use DispatchQueue and ImmediateScheduler for these Combine operators.
— 3:23
The receive method on publishers doesn’t take dispatch queue as its argument, but rather it takes something known as a Scheduler : .receive(on: <#Scheduler#>)
— 3:29
This is a protocol in Combine that, according to the documentation, defines when and how to execute a closure. If we jump to the definition we can see what this means: protocol Scheduler { associatedtype SchedulerTimeType: Strideable where SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible associatedtype SchedulerOptions var now: SchedulerTimeType { get } var minimumTolerance: Self.SchedulerTimeType.Stride { get } func schedule( options: SchedulerOptions?, _ action: @escaping () -> Void ) func schedule( after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?, _ action: @escaping () -> Void ) func schedule( after date: SchedulerTimeType, interval: SchedulerTimeType.Stride, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?, _ action: @escaping () -> Void ) -> Cancellable }
— 3:41
This protocol has quite a few requirements:
— 3:43
The protocol has two associated types, one represents the type used to measure time, and it must be Strideable so that you can move time forward and schedule work in the future, and another type to represent options that can configure the scheduler.
— 3:45
It also needs two properties, one that represents the current time in the scheduler, and one that represents how much leeway we allow in scheduling work.
— 3:52
And finally it has 3 methods for various ways of scheduling work. You can either:
— 3:57
Schedule work immediately
— 3:59
Or you can schedule some work to be performed after a delay
— 4:04
Or you can schedule some work to be performed on a repeating interval. Note that this one returns a Cancellable because we want to be able to stop the timer at some point.
— 4:14
Conforming to this protocol requires you to fill in all this information, and once that is done you can stick your own type into the receive(on:) method, or any method that takes a scheduler.
— 4:25
However, you will not create new types to conform to this protocol very often, if ever. Instead, you are more likely to use one of the conformances that Apple gives us. Combine ships with 4 conformances to the Scheduler protocol: DispatchQueue and ImmediateScheduler , as we’ve already seen, as well as RunLoop and OperationQueue .
— 4:44
To make use of the scheduling capabilities bestowed upon each of the types we just have to call one of these methods. For example, if we want to schedule some work to be done as soon as possible we can just do: DispatchQueue.main.schedule { print("DispatchQueue", "ASAP") }
— 5:30
This performs the work as soon as possible, but doesn’t perform it immediately. We can see this by trying to assign a variable inside the work: var value: Int? DispatchQueue.main.schedule { print("DispatchQueue", "ASAP") value = 1 } value // nil
— 5:58
So really this is no different than doing: DispatchQueue.main.async { print("DispatchQueue", "ASAP") }
— 6:19
But also we can schedule work after a delay. We have to do a little bit of work to wrap the DispatchTime in a new type, because DispatchQueue defines all new types for its associated types: DispatchQueue.main.schedule(after: .init(.now() + 1)) { print("DispatchQueue", "delayed") }
— 7:02
And this is just equivalent to: DispatchQueue.main.asyncAfter(deadline: .now() + 1) { print("DispatchQueue", "delayed") }
— 7:15
Not really that much shorter, so why bother?
— 7:20
Well things get a little more interesting when scheduling repeating intervals: DispatchQueue.main.schedule(after: .init(.now()), interval: 1) { print("DispatchQueue", "timer") }
— 7:46
Well, this doesn’t print anything, and that’s because it returns a cancellable and we aren’t holding onto it. That causes the cancellable to be deallocated immediately, which causes the interval to be cancelled. To fix we can just store it: var cancellables: Set<AnyCancellable> = [] … DispatchQueue.main.schedule(after: .init(.now()), interval: 1) { print("DispatchQueue", "timer") } .store(in: &cancellables)
— 8:11
And now we get something printing every second. Even better, this is a much nicer API than dealing with DispatchQueue ’s native timer API. To do this equivalent using vanilla Grand Central Dispatch we would need to create a DispatchSource , configure it, and then resume it: let source = DispatchSource.makeTimerSource() source.schedule(deadline: .now(), repeating: 1) source.setEventHandler { print("DispatchQueue", "source", "timer") } source.resume()
— 8:55
That’s a little bit more work than the interface that the Scheduler API gives us, so that’s nice.
— 9:04
But even better, the real power of the Scheduler protocol is that it gives us a single, unified interface for defining how and when work is performed. This interface works just as well for RunLoop s and OperationQueue s. We can even copy and paste the code we have above with just a few small changes to make it work for run loops: import Foundation RunLoop.main.schedule { print("RunLoop", "ASAP") } RunLoop.main.schedule(after: .init(Date() + 1)) { print("RunLoop", "delayed") } RunLoop.main.schedule(after: .init(Date()), interval: 1) { print("RunLoop", "interval") } .store(in: &cancellables)
— 9:57
And the same for operation queues: OperationQueue.main.schedule { print("OperationQueue", "ASAP") } OperationQueue.main.schedule(after: .init(Date() + 1)) { print("OperationQueue", "delayed") } OperationQueue.main.schedule(after: .init(Date()), interval: 1) { print("OperationQueue", "interval") } .store(in: &cancellables)
— 10:21
Interestingly, interval scheduling doesn’t seem to work for operation queues. We’re not sure if this is a bug or if operation queue aren’t intended to support timers.
— 10:35
We can even try to do something similar for the ImmediateScheduler : ImmediateScheduler.shared.schedule { print("OperationQueue", "ASAP") } ImmediateScheduler.shared.schedule(after: .init(Date() + 1)) { print("OperationQueue", "delayed") } ImmediateScheduler.shared.schedule(after: .init(Date()), interval: 1) { print("OperationQueue", "interval") } .store(in: &cancellables)
— 10:56
However, the SchedulerTimeType for ImmediateScheduler is not Date . In fact, it’s a whole new type that doesn’t even have an initializer: ImmediateScheduler.SchedulerTimeType.init
— 11:22
Whoever designed this type really didn’t want us changing the time of the scheduler! And that of course makes sense because an immediate scheduler has no sense of time, everything gets executed at once.
— 11:31
All we can do is take the scheduler’s current time and advance it so that we can schedule something in the “future”: ImmediateScheduler.shared.schedule( after: ImmediateScheduler.shared.now.advanced(by: 1) ) { print("ImmediateScheduler", "delayed") } ImmediateScheduler.shared.schedule( after: ImmediateScheduler.shared.now, interval: 1 ) { print("ImmediateScheduler", "interval") } .store(in: &cancellables)
— 11:56
And all of these execute immediately.
— 12:12
The Scheduler protocol is the way Combine abstracts away the responsibility of when and how to execute a unit of work. Any Combine operator that involves time or threading takes a scheduler as an argument, including delaying, throttling, timeouts, debouncing, and more: Just(1) .subscribe(on: <#Scheduler#>) .receive(on: <#Scheduler#>) .delay( for: <#SchedulerTimeIntervalConvertible & Comparable & SignedNumeric#>, scheduler: <#Scheduler#> ) .timeout( <#SchedulerTimeIntervalConvertible & Comparable & SignedNumeric#>, scheduler: <#Scheduler#> ) .throttle( for: <#SchedulerTimeIntervalConvertible & Comparable & SignedNumeric#>, scheduler: <#Scheduler#>, latest: <#Bool#> ) .debounce( for: <#SchedulerTimeIntervalConvertible & Comparable & SignedNumeric#>, scheduler: <#Scheduler#> )
— 13:08
So now we understand the role of the scheduler protocol in Combine, and we see that Combine ships with a few schedulers, including one that performs work immediately. Testing immediate scheduling
— 13:16
However, none of those schedulers were very good for testing. First, the live schedulers, like DispatchQueue , are difficult to test because you have to literally wait for time to pass and its not an exact science to figure out exactly how much time you need to wait. And also the immediate scheduler wasn’t great because it’s a little too naive by just squashing all of time into a single moment, and plus it didn’t even work with debouncing.
— 13:44
So, the ImmediateScheduler is just the wrong tool for this job. We’re not really sure what job ImmediateScheduler is good for, but with a little bit of work we can come up with a far more versatile tool.
— 13:59
The tool we really want is a scheduler that can be used in tests that does not execute its work until we tell it to. We should have full control over the passage of time in the scheduler. This means that if a unit of work is put on the scheduler to be executed 1 second from now, then that work will not be executed until we tell the scheduler that 1 second has passed. This will give us the maximum amount of flexibility in testing how time flows through our publishers.
— 14:24
So, let’s get started building this mythical scheduler.
— 14:28
We’ll create a new type to conform to the Scheduler protocol. The type will be a class, as are most schedulers, because it needs to manage some internal state to do its job: final class TestScheduler: Scheduler { }
— 14:52
The first requirements that must be satisfied to conform to this protocol are the associated types: typealias SchedulerTimeType = <#type#> typealias SchedulerOptions = <#type#>
— 15:03
Each scheduler we have encountered so far has their own versions of these types defined, such as DispatchQueue , RunLoop and OperationQueue . We want to be able to create a test scheduler that works in place of any kind of scheduler, and so we don’t want to hardcode a particular case here, like say the dispatch queue’s types.
— 15:20
This means we need to introduce generics so that we can have the concept of a test scheduler that works for dispatch queues, run loops, operation queues, and any other kind of scheduler out there that we can’t even imagine: class TestScheduler<SchedulerTimeType, SchedulerOptions>: Scheduler { }
— 15:41
These generics will satisfy the associated type requirement, but only if they conform to all the same protocols that the Scheduler protocol requires: class TestScheduler<SchedulerTimeType, SchedulerOptions>: Scheduler where SchedulerTimeType: Strideable, SchedulerTimeType.Stride: SchedulerTimeIntervalConvertible { }
— 16:07
Now we can use Xcode to fill in the next set of requirements: var minimumTolerance: SchedulerTimeType.Stride var now: SchedulerTimeType
— 16:17
We’re not sure how these are going to be used just yet, but this at least satisfies the requirement. We can even put a default value in for minimumTolerance : var minimumTolerance: SchedulerTimeType.Stride = 0
— 16:27
Since we still have one property with no default value and we are in a class, we need to provide an initializer: init(now: SchedulerTimeType) { self.now = now }
— 16:41
And now Xcode can fill in stubs for the remaining requirements, which is where the majority of our work will be taking place: func schedule( options: SchedulerOptions?, _ action: @escaping () -> Void ) { } func schedule( after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?, _ action: @escaping () -> Void ) { } func schedule( after date: SchedulerTimeType, interval: SchedulerTimeType.Stride, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?, _ action: @escaping () -> Void ) -> Cancellable { }
— 17:07
These 3 methods describe the 3 core pieces of functionality for any scheduler:
— 17:12
We need to be able to schedule some work to be run as soon as possible
— 17:16
We need to be able to schedule some work after a delay
— 17:19
And we need to be able to schedule some repeating work on an interval.
— 17:23
If we can implement these for our test scheduler, then Combine will take care of the rest for us. Some of these methods take extra parameters, such as options and tolerance , but those won’t factor into the test scheduler because they are only an artifact of the live scheduler. For example, DispatchQueue s options has things for QoS (which means “quality of service”), dispatch groups and more. None of that is useful for testing because the test scheduler is precise and executes work exactly when we say so.
— 17:47
Let’s start with the simplest method, which is the schedule method that is supposed to perform work immediately: func schedule( options: SchedulerOptions?, _ action: @escaping () -> Void ) { }
— 17:57
To be super explicit that we don’t plan on using options we can even add an _ : func schedule( options _: SchedulerOptions?, _ action: @escaping () -> Void ) { }
— 18:08
Now in a typical scheduler implementation we would just perform this work immediately: func schedule( options _: SchedulerOptions?, _ action: @escaping () -> Void ) { action() }
— 18:13
However, we don’t want that for the test scheduler. We only want to execute this work if we tell the scheduler that a moment of time has passed.
— 18:19
So, sounds like our TestScheduler class is missing two pieces of information so that it can do its job:
— 18:25
We need some internal state that keeps track of units of work that have been scheduled rather than actually performing that work right away.
— 18:32
And we need some kind of mechanism to tell the scheduler to advance its current time, which is the now property on the scheduler.
— 18:40
The first is straightforward to get in place, we just need an array that holds onto the unit of work that has been scheduled: private var scheduled: [() -> Void] = []
— 19:00
And now when we schedule some work for immediate execution we will just append that unit of work to this queue: func schedule( options: SchedulerOptions?, _ action: @escaping () -> Void ) { self.scheduled.append(action) }
— 19:14
Then to actually execute this work we need a method to tell the test scheduler that time has advanced, and that it should execute any work waiting in the queue. We will call this method advance : func advance() { }
— 19:27
Since right now we are only dealing with work that wants to be executed immediately, and not dealing with delays or intervals, we can just loop over the queue of work and execute all of the actions: func advance() { for action in self.scheduled { action() } self.scheduled.removeAll() }
— 19:50
And we now have a functioning test scheduler if all we care about is scheduling work without delays or intervals.
— 19:55
To see this, let’s write some tests. We can start by just writing a test for this functionality directly, without using any publishers. In particular, we can create a test scheduler, schedule some work on it, and the verify that the work is not run until advance is called: func testImmediateScheduledAction() { let testScheduler = TestScheduler< DispatchQueue.SchedulerTimeType, DispatchQueue.SchedulerOptions >( now: .init(.init(uptimeNanoseconds: 0)) ) var isExecuted = false testScheduler.schedule(options: nil) { isExecuted = true } XCTAssertEqual(isExecuted, false) testScheduler.advance() XCTAssertEqual(isExecuted, true) }
— 22:22
This passes, and so now we should have a bit of confidence that we are truly controlling the scheduler, at least for this particular situation.
— 22:37
Before moving on, let’s clean up some ergonomics, because the way we are creating a test scheduler looks a little intense. Rather than specify those generics fully, why don’t we put a static property on DispatchQueue that will simply provision a test scheduler for us: extension DispatchQueue { static var testScheduler: TestScheduler< SchedulerTimeType, SchedulerOptions > { TestScheduler(now: .init(.init(uptimeNanoseconds: 0))) } }
— 23:34
This simplifies things in our test: // let testScheduler = TestScheduler< // DispatchQueue.SchedulerTimeType, DispatchQueue.SchedulerOptions // >( // now: .init(.init(uptimeNanoseconds: 0)) //) let testScheduler = DispatchQueue.testScheduler
— 23:48
However, it turns out that instantiating DispatchTime with 0 for uptimeNanoseconds has special behavior. If we hop over to the header for DispatchTime we will see this note in the documentation, which strangely doesn’t show up in any other place for the documentation of this time: Note Note that DispatchTime(uptimeNanoseconds: 0) is equivalent to DispatchTime.now() , that is, its value represents the number of nanoseconds since boot (excluding system sleep time), not zero nanoseconds since boot.
— 24:14
So by using 0 we are secretly taking the .now() uptime in nanoseconds, which is going to change every time we call this. That will wreak havoc on tests.
— 24:26
So this is not what we want, instead we need to start at a different time, like say 1: extension DispatchQueue { static var testScheduler: TestScheduler< SchedulerTimeType, SchedulerOptions > { TestScheduler(now: .init(.init(uptimeNanoseconds: 1))) } }
— 24:34
With that bit of ergonomics done, we can write even deeper tests of our scheduler, like a test that shows what happens if we schedule multiple things at once: let scheduler = DispatchQueue.testScheduler … func testMultipleImmediateScheduledAction() { var executionCount = 0 testScheduler.schedule(options: nil) { executionCount += 1 } testScheduler.schedule(options: nil) { executionCount += 1 } XCTAssertEqual(executionCount, 0) scheduler.advance() XCTAssertEqual(executionCount, 2) }
— 25:54
Let’s write another test, but this time exercising what happens when a test scheduler is used with a publisher. To do this we will construct a publisher that emits immediately, but then make it receive its output on a test scheduler. Then it should be the case that this publisher does not emit until we advance the scheduler: func testImmediateScheduledActionWithPublisher() { var output: [Int] = [] Just(1) .receive(on: scheduler) .sink { output.append($0) } .store(in: &self.cancellables) XCTAssertEqual(output, []) scheduler.advance() XCTAssertEqual(output, [1]) }
— 27:20
We can also beef up this test by using a publisher that merges two scheduled publishers: func testImmediateScheduledActionWithMultiplePublishers() { var output: [Int] = [] Just(1) .receive(on: testScheduler), .merge(with: Just(2).receive(on: testScheduler) } .sink { output.append($0) } .store(in: &self.cancellables) XCTAssertEqual(output, []) scheduler.advance() XCTAssertEqual(output, [1, 2]) } Testing delayed scheduling
— 27:47
Nice, so we are now seeing how the test scheduler is going to allow us to control publishers too.
— 28:05
Let’s tackle the next requirement of the scheduler protocol: scheduling work that is performed after a delay. We could start by repeating what we did for immediately scheduled work: func schedule( after date: SchedulerTimeType, tolerance _: SchedulerTimeType.Stride, options _: SchedulerOptions?, _ action: @escaping () -> Void ) { self.scheduled.append(action) }
— 28:52
This of course isn’t right because we are completely ignoring the date parameter, which tells us when we want to execute this work. If we kept it this way then even if we scheduled our work to be done in the distant future it would still be executed as soon as advance() is invoked.
— 28:58
Turns out we need to beef up our scheduled queue a bit more. It isn’t enough to just hold the work that we want to execute, but we also need to hold the date that the work should be executed: private var scheduled: [ (action: () -> Void, date: SchedulerTimeType) ] = []
— 29:22
This breaks a few things. First, the advance method now needs to destructure the date in the for loop: func advance() { for (action, date) in self.scheduled { action() } self.scheduled.removeAll() }
— 29:32
And we need to fix our previously implemented schedule method because when we enqueue this work onto the scheduled array we need to tell it at what date to execute. We can use the scheduler’s now property to say that we want to execute immediately: func schedule( options: SchedulerOptions?, _ action: @escaping () -> Void ) { self.scheduled.append((action, self.now)) }
— 29:48
And we now have the proper way to schedule some work at a later date: func schedule( after date: SchedulerTimeType, tolerance: SchedulerTimeType.Stride, options: SchedulerOptions?, _ action: @escaping () -> Void ) { self.scheduled.append((action, date)) }
— 29:57
Easy enough, but of course this won’t work yet because we aren’t actually using the date in our for loop: func advance() { for (action, date) in self.scheduled { action() } self.scheduled.removeAll() }
— 30:04
First of all, it’s no longer correct to have an advance method that takes no arguments, because that assumes advance will immediately execute all the items in the scheduled array. Instead, we want to be able to tell advance how much time into the future we want to advance the scheduler’s time by, and then we need to do extra work in here to execute only the actions whose date is in the past.
— 30:29
This is a little tricky to get right, so let’s take it slow. First, the advance method should take an argument for how much time we want to advance by, and that can be represented by the ScheduleTimeType ’s Stride type, since it is Strideable : func advance(by stride: SchedulerTimeType.Stride) { … }
— 30:50
Next we need to advance the scheduler’s time so that we can see what scheduled units of work are ready to be executed. To do that we can mutate the now property directly: self.now = self.now.advanced(by: stride)
— 31:04
Even better, we can use a default value of .zero here so that when you don’t pass an argument the test schedule will act as before, simply executing any work on the queue whose date is right now: func advance(by stride: SchedulerTimeType.Stride = .zero) { … }
— 31:13
And then inside the loop we can execute the actions only if the date is in the past: for (action, date) in self.scheduled { if date <= self.now { action() } self.scheduled.removeAll() }
— 31:28
This isn’t quite enough yet, because we are removing all scheduled items at the end of advance . Instead we should only be removing the items that were actually executed: for (action, date) in self.scheduled { if date <= self.now { action() } self.scheduled.removeAll(where: { $0.date <= self.now }) }
— 31:48
Time to write some tests for scheduling work after a delay. func testScheduledAfterDelay() { let testScheduler = DispatchQueue.testScheduler var isExecuted = false testScheduler.schedule(after: testScheduler.now.advanced(by: 1)) { isExecuted = true } }
— 32:22
And then we want to test that initially the flag is false , but as soon as we advance by a second it flips to true: XCTAssertEqual(isExecuted, false) testScheduler.advance(by: 1) XCTAssertEqual(isExecuted, true)
— 32:42
This passes! And just to show that it is really doing something, let’s change it to only wait half a second: XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(500)) XCTAssertEqual(isExecuted, false)
— 33:00
And then if we wait another half second: XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(500)) testScheduler.advance(by: .millseconds(500)) XCTAssertEqual(isExecuted, true)
— 33:07
This passes! We can also push the test to the limit by being very precise: XCTAssertEqual(isExecuted, false) testScheduler.advance() XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(500)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(499)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(1)) XCTAssertEqual(isExecuted, true)
— 33:26
This passes! But our entire test suite did not . We got a failure here: _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 0.301) XCTAssertEqual( passwordValidationMessage, ["", "Password is too short"] )
— 33:35
This is a nice reminder of how imprecise testing time can be when we test things like dispatch queues directly. In addition to slowing tests down, we must always pad the time and we always run the risk of introducing a flaky test. So let’s pad this with a little more time and hope it fails less often. _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 0.31)
— 34:08
Compare this lack of precision to just how precise the test we just wrote is: XCTAssertEqual(isExecuted, false) testScheduler.advance() XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(500)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(499)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(1)) XCTAssertEqual(isExecuted, true)
— 34:12
Here we are asserting about timing down to the exact millisecond something should change.
— 34:25
We can take things further and assert at the microsecond level: XCTAssertEqual(isExecuted, false) testScheduler.advance() XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(500)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .millseconds(499)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .microseconds(999)) XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .microseconds(1)) XCTAssertEqual(isExecuted, true)
— 34:40
And this passes, too.
— 34:45
We can even has some fun by testing a delay that waits for a million seconds: func testScheduledAfterLongDelay() { let testScheduler = DispatchQueue.testScheduler var isExecuted = false testScheduler.schedule( after: testScheduler.now.advanced(by: 1_000_000) ) { isExecuted = true } XCTAssertEqual(isExecuted, false) testScheduler.advance(by: .seconds(100_000_000)) XCTAssertEqual(isExecuted, true) }
— 35:09
Notice that not only does this test pass immediately, but it’s also precise. We don’t have to fudge the numbers in order to wait for dispatch queues to catch up. We are controlling time with exact numbers, and as long as the math checks out the work will execute as we expect. Testing timers
— 35:31
We have one last requirement of Scheduler to fulfill, and it’s by far the most complicated one: func schedule( after date: SchedulerTimeType, interval: SchedulerTimeType.Stride, tolerance _: SchedulerTimeType.Stride, options _: SchedulerOptions?, _ action: @escaping () -> Void ) -> Cancellable { return AnyCancellable {} }
— 35:36
This method is responsible for performing a unit of work on a repeating interval until it is canceled.
— 35:51
One way to approach this is to simply say that we are going to generate a whole bunch of units of work, starting at date and spaced apart by interval distance, and schedule them right here. Like say a million: (1...1_000_000).forEach { index in } return AnyCancellable {}
— 36:13
Inside this forEach we need to compute a future date based off the index. We can start at the date passed in, and then advance it by the interval times the index. This is possible to do, but it’s a little tricky since we aren’t dealing with simple integers or doubles and instead these abstract protocols: (1...1_000_000).forEach { index in let nextDate = date.advanced( by: interval * (SchedulerTimeType.Stride(exactly: index) ?? .zero) ) self.schedule(after: nextDate, action) } return AnyCancellable {}
— 37:22
But this clearly isn’t right. It’s strange that we are creating and scheduling so many units of work. That isn’t very efficient to do, and what if there is some day that we want to test a timer that emits more than a million times? Should we up it to a billion? Or more? We are again falling into the trap of imprecise descriptions of the problem, which is what we want to get away from in the test scheduler.
— 37:44
There’s another approach that is a little less straightforward, but ultimately more correct. We will alter the unit of work being scheduled so that when it executes it automatically re-schedules a new unit of work in the future whose distance from now is the interval.
— 38:00
We can try to accomplish this by wrapping the action closure passed to us in an all new closure that layers on the new functionality of recursively scheduling another unit of work: self.scheduled.append( ( { [weak self] in action() self?.schedule( after: date.advanced(by: interval), interval: interval, action ) }, date.advanced(by: interval) ) )
— 38:57
This is intense, and it’s kind of weird. The most glaring indicator that this is not the correct thing to do is that we have an unused cancellable warning. It’s not clear what we should do with that cancellable since we are inside this closure, and it can’t possibly include the AnyCancellable we return below.
— 39:21
There’s another way to write this code the aims to accomplish the same thing, but gets around the weirdness we are seeing. We can create a little local function that encapsulates the recursive logic, and then we can call it from within this function.
— 39:40
This function will return a new closure that immediately executes the action and then schedules another action to be performed later: func scheduleAction(for date: SchedulerTimeType) -> () -> Void { return { [weak self] in action() let nextDate = date.advanced(by: interval) self?.scheduled.append((scheduleAction(for: nextDate), nextDate)) } }
— 40:33
And then we can kick things off by scheduling this wrapped action after interval time passes after the date passed in: self.scheduled.append((scheduleAction(for: date), date))
— 41:24
We’re still not yet sure what we should do with the cancellable we are returning, but let’s leave that for now. We can already write some basic tests for this functionality to see that some things are already working. We can schedule some work that starts now and repeats every 1 second: func testScheduledInterval() { var executionCount = 0 scheduler.schedule(after: scheduler.now, interval: 1) { executionCount += 1 } .store(in: &self.cancellables) }
— 41:58
We can assert that no work is executed initially, but if we advance we will get our first execution: XCTAssertEqual(executionCount, 0) scheduler.advance() XCTAssertEqual(executionCount, 1)
— 42:37
Further, if we advance half a second we don’t execute anything: scheduler.advance(by: .milliseconds(500)) XCTAssertEqual(executionCount, 1)
— 42:53
But if we advance another half second we do execute: scheduler.advance(by: .milliseconds(500)) XCTAssertEqual(executionCount, 2)
— 43:01
If we advance another second we get another execution: scheduler.advance(by: .seconds(1)) XCTAssertEqual(executionCount, 3)
— 43:09
And we would also hope that if we advance by a lot of time then the work would be executed multiple times: testScheduler.advance(by: .seconds(5)) XCTAssertEqual(executionCount, 8)
— 43:28
But this does not currently pass. Timer cancellation
— 43:33
The problem is that although new work is getting scheduled when the current work is executed, that new work will not be picked up in the lifetime of us calling advance a single time. We need to beef up the advance method to look for more work to execute that may have been enqueued in the middle of the loop since an executed item may schedule more items.
— 43:59
But before we do that we can fix another problem with our interval method that is easier to fix. Right now we are returning an empty AnyCancellable , and that’s probably not the correct thing to do. This closure will be executed when the timer should be stopped. An example of this would be if you create a timer publisher and call prefix on it to truncate its output. After the timer completes, its cancelled should be called to do whatever cleanup work it needs to do.
— 44:32
We can actually write a test to show why this is problematic. We’ll schedule some repeating work, and then after some time passes we will cancel it with the cancellable that was returned: func testScheduledIntervalCancellation() { var executionCount = 0 scheduler.schedule(after: scheduler.now, interval: 1) { executionCount += 1 } .store(in: &self.cancellables) XCTAssertEqual(executionCount, 0) scheduler.advance() XCTAssertEqual(executionCount, 1) scheduler.advance(by: .milliseconds(500)) XCTAssertEqual(executionCount, 1) scheduler.advance(by: .milliseconds(500)) XCTAssertEqual(executionCount, 2) self.cancellables.removeAll() scheduler.advance(by: .seconds(1)) XCTAssertEqual(executionCount, 2) }
— 45:19
This last assertion fails because the cancel method right now does nothing since we are returning an empty cancellable. Let’s fix that.
— 45:35
What we need to do is find the unit of work in the scheduled array and remove it. However, we don’t have any way of finding the work. The array holds a closure, which isn’t equatable, and a date, which is ever changing.
— 45:52
So we will further enhance the scheduled array so that it holds an identifier that we can use to find the unit of work. We can use an integer for the identifier: private var scheduled: [(id: Int, action: () -> Void, date: SchedulerTimeType)] = []
— 46:10
And then track some private state to make the id automatically increments every time we provision a new one: private var lastId = 0 private func nextId() -> Int { self.lastId += 1 return self.lastId }
— 46:41
And then to get things compiling we need to stick an id in all the places we append to this array: self.scheduled.append((self.nextId(), action, self.now)) self.scheduled.append((self.nextId(), action, date))
— 47:04
Then in our interval method we can compute an id that can be shared in the recurring work: let id = self.nextId() func scheduleAction(for date: SchedulerTimeType) -> () -> Void { return { [weak self] in action() let nextDate = date.advanced(by: interval) self?.scheduled.append( (id, scheduleAction(for: nextDate), nextDate) ) } } self.scheduled.append((id, scheduleAction(for: date), date))
— 47:24
And now when the cancellable is invoked we can remove any units of work that have that id : return AnyCancellable { self.scheduled.removeAll(where: { $0.id == id }) }
— 47:43
And just like that our cancellation test is passing! Timer complexity
— 47:49
Now let’s focus on fixing the last test. We need to properly handle the recurring work when we advance by a large amount of time. Right now the problem is that we iterate over the array of scheduled items a single time, but while executing that work some new items may be added, and we will completely miss those.
— 48:07
To catch those new items we need a more robust way of iterating over the array. We can do this by manually tracking the index, and not exiting the loop until the index goes past the end of the array. This will allow new items to be added to the end of the array and we will pick them up eventually. func advance(by stride: SchedulerTimeType.Stride = .zero) { self.now = self.now.advanced(by: stride) var index = 0 while index < self.scheduled.count { let work = self.scheduled[index] if work.date <= self.now { work.action() self.scheduled.remove(at: index) } else { index += 1 } } }
— 49:24
And with that tests are passing!
— 49:29
But sadly this still isn’t quite right. One thing that is not being accounted for here at all is the ordering of simultaneous events. We can show this by writing a test that should pass, but currently fails.
— 49:48
It will start two timers, one at a 1 second interval and the other at a 2 second interval. When the timers are initially scheduled, the first timer will be at the beginning of the scheduled array and the second timer will be at the end. However, when a second passes causing the first timer to emit and reschedule its next unit of work it will be placed behind the second time. This means the next second that passes will cause the timers to emit in the wrong order: func testScheduledTwoIntervals_Fail() { let testScheduler = DispatchQueue.testScheduler var values: [String] = [] let c1 = testScheduler.schedule( after: testScheduler.now.advanced(by: 1), interval: 1 ) { values.append("Hello") } let c2 = testScheduler.schedule( after: testScheduler.now.advanced(by: 2), interval: 2 ) { values.append("World") } XCTAssertEqual(values, []) testScheduler.advance(by: 2) XCTAssertEqual(values, ["Hello", "Hello", "World"]) } Correction The episode’s code contains a mistake here: after wasn’t advanced. It was corrected behind the scenes using some movie magic. XCTAssertEqual failed: (”[“Hello”, “World”, “Hello”]”) is not equal to (”[“Hello”, “Hello”, “World”]”)
— 51:01
It’s easy enough to fix this. We just need to sort the scheduled array of work items after we run an action, first by their execute date, and then if they have the same execute date we will sort by the ids so that work added to the queue first will be executed first.
— 51:36
But even if we do that I wouldn’t have a ton of confidence that we have fully captured the correct test scheduler logic and all of its edge cases. Each time we came across a problem we just applied a bandaid to make it work again.
— 51:48
The final nail in the coffin for this implementation is the fact that if we access the test scheduler’s current time during a test it does not accurately reflect the time at that moment. For example, consider this test: func testScheduleNow() { let testScheduler = DispatchQueue.testScheduler var times: [UInt64] = [] let c = testScheduler.schedule( after: testScheduler.now, interval: 1 ) { times.append(testScheduler.now.dispatchTime.uptimeNanoseconds) } XCTAssertEqual(times, []) testScheduler.advance(by: 3) XCTAssertEqual( times, [1, 1_000_000_001, 2_000_000_001, 3_000_000_001] ) } XCTAssertEqual failed: (”[3000000001, 3000000001, 3000000001, 3000000001]”) is not equal to (”[1, 1000000001, 2000000001, 3000000001]”)
— 53:07
Each time we accessed testScheduler.now it was the final time of the scheduler rather than the time at each moment in the interval. And it’s clear why this is happening: in the advance method we immediately advance the scheduler’s now to be the final moment in the future: self.now = self.now.advanced(by: stride)
— 53:35
We aren’t going to get any of the moments along the way because we are just jumping straight to the final time. Timer scheduling
— 53:40
So that does it. This approach to implementing advance just isn’t going to work. We got here by applying bandaid after bandaid to our previous solution, but interval schedulers are just complicated enough that we need a completely different approach.
— 53:52
Rather than continuously looping over the array of scheduled items to perform them, we are going to make advance responsible for executing all the items for just one moment in time, and then recursively call itself to step forward in time as needed. This will allow us to simultaneously execute repeating work while also taking it one step at a time so that the scheduler’s now value will probably reflect the current time.
— 54:26
So, let’s get to it. Since we only want advance to be responsible for executing the work for a single moment in time, we can start off by finding what is the next moment where this is work to perform. We can do this by sorting the array of work items to make sure they are in order of increasing date: func advance(by stride: SchedulerTimeType.Stride = .zero) { self.scheduled.sort { lhs, rhs in lhs.date < rhs.date } … }
— 54:57
And then we can pluck off the first unit of work to find out if it is within the range of time we are striding by. If it is, then that is the date for which we want to execute all work items on that date. If it is not, then there is no work to be done and we can early out: guard let nextDate = self.scheduled.first?.date, self.now.advanced(by: stride) >= nextDate else { self.now = self.now.advanced(by: stride) return }
— 55:52
Now that we have the single date for which we want to execute all work at this date, we can advance now to be this date: self.now = nextDate
— 56:10
This will make it so that any work we are about to execute will have access to the correct time in the scheduler.
— 56:28
Next we will loop over the array of scheduled items to find all the ones that want to be executed at this exact date. We will execute those items and remove them from the array. Since the array has been sorted by date we can assume that all the items we are interested in are contiguously at the beginning of the array: while let (_, action, date) = self.scheduled.first, date == nextDate { action() self.scheduled.removeFirst() }
— 56:58
Now, our job isn’t done yet, because we’ve only executed the work up to nextDate , but there may be more time in the stride left to execute. We need to recursively call advance with however much time was left over in the difference between the stride and the nextDate we computed. We can compute by before we advance now : let nextStride = stride - self.now.distance(to: nextDate) self.now = nextDate
— 57:32
And that is exactly how much time we need to advance by to finish up all the work: self.advance(by: nextStride)
— 58:04
And if we run tests we see we get one failure… oops! XCTAssertEqual failed: (”[“Hello”, “World”, “Hello”]”) is not equal to (”[“Hello”, “Hello”, “World”]”)
— 58:10
The failure is in that last test we wrote that showed that simultaneous interval work does not execute in the right order. This is because we need to take into account the order that units of work were added to the scheduler when sorting. Right now we just sort by their date, but if two items have the same date we want it so that the item that was first added to the scheduler is executed first.
— 59:03
This is straightforward to fix, we can just further sort by their ids when the dates are equal. This can be done via a nifty trick using tuple comparison: self.scheduled.sort { lhs, rhs in (lhs.date, lhs.id) < (rhs.date, rhs.id) }
— 59:49
And now all tests pass! Including the one that captured the scheduler’s current time at each interval.
— 1:00:03
Let’s have a little fun with it. Let’s check that if we create an interval for every second and run it for 1,000 seconds, then we get 1,000 emissions instantly: func testFun() { let testScheduler = DispatchQueue.testScheduler var values: [Int] = [] let c = testScheduler.schedule( after: testScheduler.now, interval: 1 ) { values.append(values.count) } XCTAssertEqual(values, []) testScheduler.advance(by: 1_000) XCTAssertEqual(values, Array(0...1_000)) }
— 1:00:57
This is super powerful. These tests finish instantly, but are testing behavior that takes place across long stretches of time.
— 1:01:05
There is just one last problem to fix in our test scheduler, and it’s a subtle one. And in fact we caught this one only just a few weeks ago. The problem creeps up when mixing in work that is scheduled on an interval with synchronous work. It turns out that interval work can tear itself down after executing its action, for example imagine you have a timer such that you only take the first 3 emissions. After that 3rd emission is executed in the scheduler, its cancellable will be immediately invoked, which will remove all the scheduled items.
— 1:01:43
There are 2 points in our code where this subtlety can cause problems. First, when we recursively schedule interval work we are performing the action and then scheduling the next unit of interval work: func scheduleAction(for date: SchedulerTimeType) -> () -> Void { return { [weak self] in action() let nextDate = date.advanced(by: interval) self?.scheduled.append( (id, nextDate, scheduleAction(for: nextDate)) ) } }
— 1:01:58
The problem here is that executing the action could cause the timer to be torn down, but then a moment later a new unit of work will be scheduled, which would lead the timer to continue executing.
— 1:02:20
The other potential problem lies in these lines: while let (_, date, action) = self.scheduled.first, date == nextDate { action() self.scheduled.removeFirst() } Here we are assuming that after we execute the action that the first scheduled item will still be the same one we plucked of the queue in the while loop. However, as we have described above, this is not necessary true because the act of execution the action can cause the cancellable to be executed which means that work will have already been removed. This means we may accidentally be removing some unrelated work.
— 1:02:48
And in fact it’s easy to cook up a simple test failure that shows both of these problems. We simply need to debounce a publisher and then receive(on:) it. Under the hood debounce is secret using interval schedulers so that it can schedule work in the future that is cancellable, and by mixing in another piece of scheduled work we force the lines above to remove the incorrect unit of work. func testFail() { let scheduler = DispatchQueue.testScheduler let subject = PassthroughSubject<Void, Never>() var count = 0 subject .debounce(for: 1, scheduler: scheduler) .receive(on: scheduler) .sink { count += 1 } .store(in: &self.cancellables) subject.send() scheduler.advance(by: 100) XCTAssertEqual(count, 1) } XCTAssertEqual failed: (“99”) is not equal to (“1”)
— 1:04:02
Somehow by advancing 100 seconds we have gotten 99 emissions. That is very strange because a debounced publisher should only emit as many times as the base publisher emits, or less, which was just once. So this is showing that things got a little mixed up. What happened is that a new unit of interval work was scheduled even after the debounce was torn down, and by removing the first unit of scheduled work we actually removed the receive(on:) synchronous work, therefore letting the interval work remain, hence why it seems like we have a timer going now.
— 1:04:50
This is very subtle, and shows just how complex scheduling can be. But luckily the fix is easy. First, we want to execute interval work after its future work was scheduled. This will give the cancellable time to truly tear down any future work: func scheduleAction(for date: SchedulerTimeType) -> () -> Void { return { [weak self] in let nextDate = date.advanced(by: interval) self?.scheduled.append( (id, nextDate, scheduleAction(for: nextDate)) ) action() } }
— 1:05:08
Next we want to update the removeFirst logic to truly remove the exact unit of work we execute by eagerly removing the unit of work before performing the action: self.scheduled.removeFirst() action()
— 1:05:26
And now tests pass! Next time: erasing time
— 1:05:31
This was a lot of work to create this test scheduler, and honestly there may even be a few more subtle bugs lurking in the shadows. Scheduling is an incredibly complex topic, and we have really jumped into the deep end on this episode.
— 1:05:47
One way to take these ideas further would be to employ some formal methods for verifying scheduling behavior and to check that our test scheduler works correctly. We’ll leave that for another time though.
— 1:06:04
Now that we’ve got a test scheduler defined and implemented, let’s try to use it to make our view model more testable. Right now we are using DispatchQueue.main in a few spots in our view model, and as we have seen this is quite a complex dependency to have in our view model, and complicates testing. References combine-schedulers Brandon Williams & Stephen Celis • Jun 14, 2020 An open source library that provides schedulers for making Combine more testable and more versatile. http://github.com/pointfreeco/combine-schedulers Downloads Sample code 0105-combine-schedulers-pt2 Point-Free A hub for advanced Swift programming. Brought to you by Brandon Williams and Stephen Celis . Content Become a member The Point-Free Way Beta previews Gifts Videos Collections Free clips Blog More About Us Community Slack Mastodon Twitter BlueSky GitHub Contact Us Privacy Policy © 2026 Point-Free, Inc. All rights are reserved for the videos and transcripts on this site. All other content is licensed under CC BY-NC-SA 4.0 , and the underlying source code to run this site is licensed under the MIT License .