FRP in Lean: Stateful combinators, safety, and liveness
Non-pointwise combinators have knowledge of previous timesteps
Up to now, every Signal we’ve seen has been stateless. FRP.map and
FRP.map2 from last time apply pure functions to the value at each timestep,
with no memory of what came before. Real reactive systems have evolving state:
the value at time t+1 depends on the value at time t.
In this post, we’ll build up to FRP.accumulate, which is a general-purpose
non-pointwise FRP combinator. We’ll see that the point of these combinators is
to combine a sequence of discrete Events at various points in time to
maintain a running Signal that captures those events changing the Signal
value.
A warmup exercise: a quick equivalence lemma
Last time, we saw how FRP primitives interact with temporal logic types. In case it’s been awhile since you’ve read the previous piece, let’s warm up with a little lemma that will fault in a bunch of the material from before, and that’ll become useful shortly.
Suppose I have some Signal β - that is, a time-varying value of type β.
And, suppose I have some statement inv about β values, such that at every
time step in the signal, inv holds. Then, inv is an invariant over that
Signal. A bit more formally:
Quick reminder from previous posts: StateProp β is a β -> Prop; in other
words, given a value of some type, produce a propositiona about that value. We
used this back in the intro to LTL - it’s the type that
an LTL.atom consumes.
theorem some_lemma {inv : StateProp β} (sig : □ β) (h : ∀ t, inv (sig t))
: /- TODO: what can we show? -/ := by /- TODO: how do we show it? -/
Can we say anything in temporal logic about how the invariant and the signal
interact? Hopefully you might say something informal like “it will always be
the case that inv holds for sig”. Less formally, we’d say (□ (LTL.atom inv)) sig. Let’s prove it!
theorem always_atom {inv : StateProp β} (sig : □ β) :
(∀ t, inv (sig t)) → (□ (LTL.atom inv)) sig := by
intro h t ; -- TODO
β : Sort u_1
inv : StateProp β
sig : □ β
h : ∀ (t : Time), inv (sig t)
t : Nat
⊢ LTL.atom inv (drop t sig)
This lets us say, starting from the goal and looking backwards to the
hypotheses, “I want to show that □ (atom p) holds; I’ll prove ∀ t, p (trace t) instead, which is equivalent”, or, starting from a hypothesis and looking
ahead to the goal, “I have ∀ t, p (trace t); I’ll repackage it as □ (atom p) to use other LTL operators.”
This isn’t too hard to prove, because □ is just defined under the hood just
like h. With some unfolding of LTL definitions, we end up precisely with
the hypothesis h.
Technically, for this to be an equivalence, you need to show implication in
both directions. Give that a try and prove the stronger statement
always_atom_iff, which we’ll make use of in the next post. To write it,
you’ll change → to ↔ and use the constructor tactic to split the goal
into the two implications.
theorem always_atom {inv : StateProp β} (sig : □ β) :
(∀ t, inv (sig t)) → (□ (LTL.atom inv)) sig := by
intro h t ; simp [LTL.atom, drop, now] ; exact h t
Goals accomplished!
this theorem is nice because it collates an infinite number of non-temporal
inv (sig t) proofs into a single temporal proof. If we can build Signals
whose values satisfy an invariant at every timestep, then we can use
always_atom to automatically lift that invariant into a safety property.
We’ll get the temporal logic guarantee without explicitly needing to do any
temporal reasoning.
To build signals that maintain invariants through time, we need a different kind of combinator. And to derive it from first principles, we’ll take a short detour through a small piece of category-theoretic vocabulary: catamorphisms.
Catamorphisms and recursors
You already know how to fold over a plain old List in a plain old functional
programming language:
List.foldr
takes an initial value β and a “merge” function a -> β -> β, and collapses
the list down into a single result. For a long time, I thought folds were
somehow intrinsic to Lists because I’d never seen folds in any other context,
but you can write a fold-like operation over any algebraic datatype.
That operation is called a
catamorphism, and relates to
recursors in type theory (which we’ll see an example of in a moment). A
catamorphism for some type replaces each constructor with a corresponding
computation (an “algebra component”) that produces the result type β.
That function consumes:
- the constructor’s non-recursive arguments, and
- the constructor’s recursive arguments after they’ve already been folded.
Let’s invent the catamorphism for Lists from first principles. Why does
List.foldr have the arguments it does? It stems from the datatype’s
constructors. There are two ways to construct a List, as we all know: Nil : List a produces the empty list, and Cons : a -> List a -> List a prepends an
element onto a list. This means that which replaces Nil : List a will
be typed β, and that which replaces Cons : a -> List a -> List a will be
typed a -> β -> β.
In other words, List.foldr is the catamorphic operation for List! It shows
us how to collapse one constructor layer into a β.
Recursors generalise catamorphisms
If we want more generality than just “given the recursive results of the
subdata in each constructor, produce a value”, we’ll need a different kind of
algebraic transformation on our datatype. For example, it isn’t clear how we
could write a Nat.predecessor or List.drop_last function with
catamorphisms. We’d need to choose a richer datatype to fold over, like a
(prev, curr) pair that “remembers” the previous value. A generalisation that
does let us do this without changing the type we’re folding over is a
recursor.
We could also write a catamorphism for Nats: Nat has two constructors,
Zero : Nat and Succ : Nat -> Nat. Because the Zero constructor takes no
arguments, we provide a constant β value for that case, and for Succ, a β -> β.
We said before that recursors occupy a similar purpose to catamorphisms but
have been vague about what that actually means. Let’s look at a simplified
version of Lean’s recursor for Nat:
In reality, the recursor, is a bit more
complicated
owing to β being a dependent type called the “motive”, but the idea is
exactly the same - it lets us “tear down” a value to call a function on its
enclosing elements, while still giving us access to the original, non-folded
over values.
def Nat.rec (onZero : β) (onSucc : Nat → β → β) : Nat → β
| .zero => onZero
| .succ n => onSucc n (Nat.rec onZero onSucc n)
This isn’t the catamorphism for Nats: succ also consumes the predecessor
argument (that is, the recursive argument before being folded). This is more
general than a catamorphism; it’s called a
paramorphism,
and it’s built up from a different kind of algebraic structure than
catamorphisms.
(Before continuing, pause and ponder: suppose Nat.rec also consumed some
invariant β -> Prop. How might we apply such a function inside the
recursor?)
Our first non-pointwise combinator: Signaling on the catamorphism for Time
Ok, so a catamorphism for Nat (with constructors Zero and Succ Nat) would
take a β and a β -> β. Since Time is definitionally a Nat, a
catamorphism on Time would take a β and a β -> β, similarly. But what do
those two arguments actually mean?
What it means is stepping through time from an initial state. And, that’s what
scan, our first non-pointwise combinator, does.
Should this have been a paramorphism instead? After all, we call Nat’s
recursor internally here. Maybe I’ll come to regret this design choice!
def scan: (step : β → β) (init : β) : Signal β =
fun n => Nat.rec init (fun _t s => step s) n
scan produces a function that takes a time value and steps the β -> β
function that many times from an initial state. It does it with the recursor
for the natural numbers, which produces init when t=0 and applies the given
function Nat -> β -> β when t=(n+1).
Evaluating scan step init at time n recomputes from init every time, O(n)
per evaluation, and so O(n²) to evaluate the whole signal. This isn’t
dissimilar from the problem we had last time with Event.latch, and the
solution’s the same: A real FRP runtime would do something smarter like mutate
a value over time to cache previous state(s).
def screaming : Signal String := scan (· ++ "a") ""
#eval (List.range 5).map screaming -- ["", "a", "aa", "aaa", "aaaa"]
Something interesting to note is that this is our first Signal that isn’t
ultimately driven by the tick of the clock: we never actually do any
computation based on the internal value of t like we did with the UTC
time conversion example.
This should also feel a lot like what we did with stepping state machines back
in the first post! The difference, of course, is that vmStep needed an
action at each step (and a proof it was valid, of course). scan Signals
don’t, by contrast; it’s an autonomous state machine that just evolves on its
own. We’ll need a richer combinator to start folding in Events into the
mix.
Pause and ponder: a generalisation of scan could let the step function vary
as a function of time: this means it’d consume a Signal (β -> β) instead of
just a β -> β. If you’re feeling ambitious: implement this primitive, and
then write scan in terms of it. (Hint: you’ll need FRP.const.)
Traffic lights, revisited
Here’s where we left off the traffic light example: this retains our original unfortunate property that the walk signal is on only when the button is pressed. Not only does this violate liveness (someone can tape the button down and impede traffic forever), it’s also slightly unrealistic in that pedestrians need time to also cross the crosswalk.
inductive WalkSign where
| Walk
| DontWalk
deriving Repr, DecidableEq
def walkSignal (button : ◇ Unit) : □ WalkSign :=
fun t => match button t with
| some () => .Walk
| none => .DontWalk
def carLight (button : ◇ Unit) : □ Light :=
fun t => match button t with
| some () => .Red
| none => cycling t
def pedCrossing (button : ◇ Unit) : □ (Light × WalkSign) :=
FRP.map2 Prod.mk (carLight button) (walkSignal button)
Our goal is now to introduce a stateful value that can encode how many ticks are left on the crosswalk or how many ticks are left until the button can be pressed once again. And, of course, as an inductive type, we can write a catamorphism for it:
CrossingState isn’t a recursive datatype like Nat or List so maybe it’s a
bit strange to write a “fold”, since no actual folding happens. A common
naming convention for this is elim, for type eliminator.
namespace CrossingState
inductive CrossingState where
| Idle -- Traffic light runs as usual
| Countdown : Nat → CrossingState -- N more walk ticks after this one
| Cooldown : Nat → CrossingState -- N more cooldown ticks after this one
deriving Repr
def fold (idle : β) (countdown : Nat → β) (cooldown : Nat → β)
: CrossingState → β
| .Idle => idle
| .Countdown n => countdown n
| .Cooldown n => cooldown n
end CrossingState
Using our scan combinator, we can write a Signal CrossingState that steps
itself through time according to a tick function: when a Countdown reaches
zero, we transition to a Cooldown, and when Cooldown reaches zero, we
transition to Idle.
def CrossingState.tick : CrossingState → CrossingState :=
fold
(idle := CrossingState.Idle)
(countdown := fun
| 0 => CrossingState.Cooldown 3
| n+1 => CrossingState.Countdown n)
(cooldown := fun
| 0 => .Idle
| n+1 => CrossingState.Cooldown n)
#eval List.range 8 |>.map (FRP.scan CrossingState.tick (.Cooldown 3))
-- output:
[Cooldown 3,
Cooldown 2,
Cooldown 1,
Cooldown 0,
Idle,
Idle,
Idle,
Idle]
This is great! We can step through our light example through time. Of course,
what’s missing is a way for an Event to inject a change into the fold.
That’s what accumulate will get us.
accumulate is a temporal fold over an Event
The idea of accumulate is this: we’re going to start with an Event of some
type and produce a Signal of some type. Since we said that accumulate
generalises scan, makes sense that we should at least consume an init
state - this at least nails down the type of the returned Signal.
def accumulate /- TODO: what else? -/ (init: β) (ev: ◇ a) : Signal β :=
sorry -- TODO: what to do?
Intuiting the function type for accumulate…
The goal of this section is to build up from intuition what the remaining
arguments of accumulate must be. Given that accumulate also involves a
catamorphism over Time, we probably want both a β and β -> β, but their
meaning will probably change, and so their argument names likely will as well.
Before we go any further, you should spend a moment trying to figure out what
the catamorphism for Option a is. (When you’re ready: did you choose β
(for the none case) and α → β (for the some a case)?)
…when the event is not firing…
One thing to notice is that so long as the given Event isn’t triggering (that
is, when ev t = none, this is doing exactly the same thing as our scan
combinator. So, scan’s step might as well be called onNone, since that’s
how to just produce a new β given the previous one.
-def accumulate /- TODO: what else? -/ (init: β) (ev: ◇ a) : Signal β :=
+def accumulate /- TODO: what else? -/ (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
sorry -- TODO
Notice that this onNone is not the same as the β we guessed a moment ago.
The reason is that we’re not producing βs from scratch, but rather one that
involves the previous β state. So, that value has to be threaded through that
function. Generally, we say we’ve lifted β into the pure catamorphism.
…and when it is firing
This also means we want a onSome function, of some β-producing type!
-def accumulate /- TODO: what else? -/ (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
+def accumulate (onSome: ? -> β) (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
sorry -- TODO
Using the definition of catamorphisms for Option a, as well as our observation
about lifting the current β, we can figure out the type for onSome. It’s
got to consume an a, because that falls out straight from the catamorphism;
we have to do something with some a, after all! And, just like with
onNone, we will want to also lift a β in, too.
So, ultimately, our function will have type a -> β -> β. Which, makes sense
for the situation in which we’re calling it: ev has fired, producing an a,
and so we want to combine that with the current signal value in some way. And
so, our final function will look thus:
-def accumulate (onSome: ? -> β) (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
+def accumulate (onSome: a → β → β) (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
sorry -- TODO
Before proceeding, you should spend a moment convincing yourself that the wrong
thing to do here would have been to have accumulate consume an input,
“background” Signal for when the event’s not firing, instead of the init
and onNone values. Had we done that, we’d be back to the piecewise
combinator where consequences of the Event firing can’t ripple through
subsequent timesteps.
Implementing accumulate with the recursor for Time
OK, how do we actually write this thing? Since we said earlier that
accumulate generalises scan, using the recursor for Nat seems
like a good idea. Here’s the overall shape we’ll be working with:
def accumulate (onSome: a → β → β) (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
- sorry -- TODO
+ fun n => Nat.rec
+ sorry -- TODO: what to do at t=0?
+ (fun s => sorry) -- TODO: what to do at t=(n+1)?
+ n
One helper that might be worth writing: both branches in Nat.rec need to either
call onSome or onNone: let’s factor out dispatching on whether the event has fired
into a helper function, called, I donno, switch:
(* Helper: at each time step, decide which β → β to use. *)
let switch (t: Time) : β → β := match ev t with
| none => onNone
| some a => onSome a
When n=0, we’ll return init directly. init is the value at time 0, so
there’s no event value to consult yet. For the n=(n'+1) case, we’ll pass in
the next time value, and the previous state, which the recursor will
automatically supply for us.
So in conclusion, our final accumulate is:
Notice that our use of Nat.rec looks a lot like the body of scan.
def accumulate (onSome: a → β → β) (onNone: β → β) (init : β) (ev: ◇ a) : Signal β :=
+ let switch (t: Time) : β → β := match ev t with
+ | none => onNone
+ | some a => onSome a
+
fun n => Nat.rec
- sorry -- TODO: what to do at t=0?
- (fun s => sorry) -- TODO: what to do at t=(n+1)?
+ init -- n=0
+ (fun n' s => switch (n'+1) s) -- n=(n'+1)
n
Notice that, because we actually do use n' in the recursor, in contrast to
scan, accumulate is genuinely paramorphic!
Pause and ponder: If you wrote the time-varying generalization of scan in the
previous section, and are still feeling ambitious, implement accumulate in
terms of that general combinator, FRP.map, and switch. Similarly, can you
write Event.latch in terms of accumulate?
Weaving in button presses
At last, we have enough mechanism to actually fold over both ordinary
transitions and inject events into the Signal stream: Let’s define a
button press Event that fires at t=2 and t=5:
Remember that to make an event truly an “eventually” we need to also supply a
witness that it fires at least once. by exists 5 would have been sufficient
as well.
def presses : ◇ Unit :=
let f := fun
| 2 | 5 => some ()
| _ => none
⟨ f, by exists 2 ⟩
Next, we need to define our onNone and onSome behaviour. When the button’s
not being pressed, the behaviour is just our usual tick. When it is pressed,
we’ll ignore the event unless we’re in the Idle state:
def onNone := tick
def onSome (_ev : Unit)
| .Idle => .Countdown 3
| s => tick s
Now we can wire all these up! We’ll use presses alongside onNone and onSome,
and some starting state:
def crosswalk ev := FRP.accumulate
(.Cooldown 2) -- init
CrossingState.onNone
CrossingState.onSome
ev
#eval List.range 10 |>.map (fun n => (n, (crosswalk presses) n))
[(0, CrossingState.Cooldown 2),
(1, CrossingState.Cooldown 1),
(2, CrossingState.Cooldown 0), -- t=2: Button press (ignored)
(3, CrossingState.Idle),
(4, CrossingState.Idle),
(5, CrossingState.Countdown 3), -- t=5: Button press (accepted)
(6, CrossingState.Countdown 2),
(7, CrossingState.Countdown 1),
(8, CrossingState.Countdown 0),
(9, CrossingState.Cooldown 3)]
We see precisely what we wanted to: at t=2, we’re still in the midst of a
cooldown, so that button press is ignored. However, by the time we reach
t=5, we’re in the Idle state, so the press is accepted.
An inductive invariant-aware accumulate
Way back in the first post in this series, we created the
dependent type validStep s a to ensure that a transition (in a pop machine,
or any other transition system) is valid. Since accumulate feels so much
like driving a state machine, we should come up with a similar mechanism for
“safe accumulation”.
With raw dependent types, a validStep needed to be proved at every step at
every trace (even if leaning on decidable decision procedures meant that that
proof could be, in practice, by decide). We got a great guarantee from this:
if a precondition wouldn’t hold, the whole trace would fail to typecheck, but
total static safety came with a heavy price in terms of ergonomics. And,
what’s more, the proofs we could write about a particular trace could be really
precise: ‘on the trace pay, pay, choose, take, pay, “you had dropped three
coins in total, terminating in a specific final state”’ could be something we
could express easily.
We then moved to a monadic API, which had no proof obligations on the user whatsoever. The ergonomics of that API were pretty ideal in that you could just sequence arbitrary steps, but we lost static guarantees and had to fall back to runtime failures when hitting an invalid step.
There’s a third point in the solution space that we can play with. Back in
yet another post, we introduced inductive invariants,
which are predicates that hold at init and are preserved on every step.
Rather than validity proofs operating on a step-by-step basis, necessitating every trace having their own sequence of proofs applied, we can write proofs at the granularity of the initial state and the step function itself. These are exactly what we get from an inductive invariant: proofs of initiation and concecution.
Assume-guarantee reasoning with refinement types
Let’s start writing this invariant-aware accumulate. It’ll begin with
consuming one additional implicit argument, the invariant itself. (Making the
argument implicit means the typechecker just infers it from the other
arguments.)
def accumulate
{inv : StateProp β} -- NEW: our inductive invariant, an implicit argument.
(init : ...)
(onNone : ...)
(onSome : ...)
(ev : ...)
: ... := -- TODO
For inv to be an inductive invariant, it has to hold for the β init
(that’s the initiation requirement). As the caller who supplies init,
it’ll be our job to provide a proof of this, so the internals of accumulate
can assume that it’s true. (Remember, this is an assumption in the formal
logic sense, not in the make-an-ass-out-of-you-and-me sense.)
Up to this point we probably would have passed in an additional h_init
argument of type inv init. That’d work! But, I’m going to do it in a
different way, using a particular kind of dependent type called a refinement
type.
If you’re interested in learning more about refinement types, might I be so bold as to suggest my Papers We Love talk about it?
A refinement type bundles up a plain old type with a proposition that must always hold for that type to be well-defined. I love refinement type systems because they’re in practice more restrictive than the full-blown dependent types that we’ve been using in Lean, and so, if you design your type system well, you can have something that feels like dependent types but with type inference!
You should pause and ponder about the connections between refinement types and subtypes.
Our refinement type for init, instead of just being a plain old β, will now
be a { s : β // inv s }. You can read this aloud as “a β such that inv v”. (Notice that we gave the particular β a name, so we could refer to it
in the “such that” qualifier.) When we want to operate on a value whose type
is a refinement type, just like with dependent pairs, we’ll destruct it into a
record with two
fields
⟨val, property⟩.
So that’s Initiation. The other requirement for inductive invariants is
consecution, namely, “under the assumption that we’re in a valid state,
stepping leaves us in a valid state”. Under our catamorphism we have two ways
to step (either when an event fires, onSome, or when it does not, onNone),
so we’ll have to refine those type arguments, too.
That leaves us with all our arguments to accumulate looking like this:
The standard convention is to name the before state s and the after state
s'.
def accumulate
{inv: StateProp β}
- (init : ...)
- (onNone : ...)
- (onSome : ...)
- (ev : ...)
- : ... := -- TODO
+ (init : { s: β // inv s})
+ (onNone: { s: β // inv s } → { s': β // inv s' })
+ (onSome: α → { s: β // inv s} → {s': β // inv s'})
+ (ev: ◇ α)
+ : /- TODO: return type? -/ := ... -- TODO: implementation?
Previously, accumulate returned a Signal β. Stands to reason that it’s now
going to be a refinement type { sig: (Signal β) // ... }. But what’s the
proposition that should be true on the return type?
Remember that inductive invariants encode safety properties. So, whatever that
property is, it better hold for every time step in the returned signal. That’s
an proposition in LTL! In particular, (□ (LTL.atom inv)) applied to the
Signal.
So, our final function signature for accumulate is:
def accumulate
{inv: StateProp β}
(init : { s: β // inv s})
(onNone: { s: β // inv s } → { s': β // inv s' })
(onSome: α → { s: β // inv s} → {s': β // inv s'})
(ev: ◇ α)
- : /- TODO: return type? -/ := ... -- TODO: implementation?
+ : { sig : (Signal β) // (□ (LTL.atom inv)) sig } := ... -- TODO: implementation?
Notice that this refines the entire Signal, not the β inside the signal.
“(Signal β) // ...” is saying “here’s a Signal and a proof of its safety
property”, whereas, if we’d parenthesized it differently, Signal (β // ...)
would make a pointwise claim about every timestep.
sig : (Signal β) // (□ (LTL.atom inv)) sig is what we get for proving
initiation and consecution: if we implement our refined accumulate correctly,
it’ll guarantee that inv holds at all time steps and therefore will guarantee
our safety property.
A bit more concretely, a refined value has two fields: .val and .property.
What this means for our safety proof-carrying signal is that, informally,
(s.val)[t] : β - we can extract the raw signal value at any time step t
like before. But now, we can also extract the per-tick proof of the invariant
holding; again, informally, (s.property)[t] : inv (s.val t). (This isn’t
valid Lean, but just meant to be demonstrative.)
Such “assume-guarantee” reasoning is critical for composing verified code together: the guarantee out of one function can become an assumption into the next. (In the next post, we’ll see how assume-guarantee reasoning composes.)
Coercing a subtype back into a Signal
OK, accumulate returns a refinement type-pair of a Signal plus the safety
witness, but in actual uses of accumulate we only want the Signal itself.
Let’s add a quick coercion, just like we did for Events last time, to treat
the refinement type as callable, itself:
instance {P : Signal StateProp β} :
CoeFun { sig : (Signal β) // P sig } (fun _ => Signal β) where
coe s := s.val
In general, when you have a function bundled with a proof obligation, CoeFun
lets the function be used without manual unwrapping. The safety theorem is
still tagging along, but not in the way when we just want the function.
Implementing the dependent accumulate
OK, let’s fix our accumulate implementation. Our goal is first to fix the
type errors that we’ve introduced by changing the function signatures, and then
decide the shape of the final value that gets produced.
Fixing switch and our recursor
Our switch helper is currently operating on raw βs and so doesn’t typecheck
anymore:
...
let switch (t: Time) : β → β := match ev t with
| none => onNone
| some a => onSome a
Type mismatch
onNone
has type
{ s // inv s } → { s' // inv s' }
but is expected to have type
β → β
Luckily, the hard part is going to be writing onNone and onSome, not
calling them. So, it’s enough to just change the type signature to ensure
applying those functions is consistent with their types:
...
- let switch (t: Time) : β → β := match ev t with
- | none => onNone
- | some a => onSome a
+ init
(fun n s => switch (n + 1) s) n
failed to elaborate eliminator, expected type is not available
A slightly more alarming error, that, unlike the previous one, doesn’t give us
much to go on. In essence, Nat.rec relies on a type annotation that we’ve
lost in the refactoring; factoring out let-bindings can sometimes require
adding some annotations, especially if the right-hand side of the binding has
weird dependent types like motive types. Luckily, writing down our intent here
is also simple change - just gotta follow the types:
- let step_at := fun n => Nat.rec
+ let step_at : Signal {s: β // inv s} := fun n => Nat.rec
init
(fun n s => switch (n + 1) s) n
Lifting step_at’s proofs into a safety property
Notice that step_at now returns a pointwise proof: we get back a record < val := value_at_i, property := validity_at_i>: the value after time step i,
and a proof that whatever that value is, it satisfies inv at time step i.
This is our Signal (β // ...) example from before, but we need it to look
like (Signal β) // ..." instead. We’ll do this with the theorem we wrote
at the top of this post! We’ll take the infinitely-many pointwise invariant
presevation proofs and collate them into an LTL proposition. This’ll complete
the implementation.
theorem always_atom {inv : StateProp β} (sig : Signal β) (h : ∀ t, inv (sig t))
: (□ (LTL.atom inv)) sig := by
intro t ; simp [LTL.atom, drop, now] ; exact h t
def accumulate
-- Given a property over some state ...
{inv: StateProp β}
-- an initial state,
(init : { s: β // inv s})
-- a transition function when no event fires...
(onNone: { s: β // inv s } → { s': β // inv s' })
-- a transition function when an event _does_ fire...
(onSome: α → { s: β // inv s} → {s': β // inv s'})
-- and an event...
(ev: Event α)
-- ...produce a single refined value, made up of a `Signal β`, and
-- a safety proof over all time steps.
: { sig : Signal β // (□ (LTL.atom inv)) sig } :=
-- `switch` produces the next state, depending on whether the event
-- fired at the given timestep
let switch (t: Time) : {s: β // inv s} → {s': β // inv s'} :=
match ev t with
| none => onNone
| some a => onSome a
-- `step_at` takes `t` steps through `switch`; at each time step, it
-- produces a β alongside its proof of .preserving `inv`
let step_at : □ {s: β // inv s} :=
fun t => Nat.rec
init
(fun n s => switch (t + 1) s)
t
-- Reorganize the signal of refined values into a refined signal.
let vals : □ β := fun t => (step_at t).vals
let safety : ∀ t, inv (vals t) := fun t => (step_at t).property
⟨ vals, always_atom vals safety ⟩
A safe intersection
OK, let’s tie this post off by showing how to prove a traffic light safety
property with accumulate.
This might sound trivial but an integer overflow was why the Ariane 5 blew up shortly after launch.
Suppose that the microcontroller inside the traffic light uses a three-bit counter. For safety-certified hardware, each gate is audited, and city council won’t pay for re-certifying a wider register. So, a safety property that prevents ever overflowing our counter registers would be:
We’ve seen before that abbrevs are “transparent” to the typechecker;
this’ll make writing proofs about bounded a bit easier.
abbrev bounded : CrossingState → Prop
| .Idle => True
| .Countdown n => 0 <= n ∧ n < 8
| .Cooldown n => 0 <= n ∧ n < 8
(Technically, Lean’s Nat datatype prevents us underflowing here, but I’ll
state the lower bound in the safety property, anyway.)
The design constraints for this particular intersection are well within this bound, but perhaps, say, for crossing a wider road with more car traffic, city planners would want both a larger countdown and cooldown value. Or, perhaps, those values could be dynamically-scaled up depending on live traffic data that doesn’t know anything about the underlying hardware. Our goal is to statically prevent anyone from, in the future, trying to set a value that would overflow that register.
OK, let’s lift our tick function into its refinement type version. Once we
replace all the sorrys with a valid proof of bounded, we’ll be able to wire
it all up to an accumulate call and produce a verified FRP program.
Here’s the skeleton with our modified tick function: since we now consume and
produce the refinement type value, we consume and produce tuples of the
CrossingState and proof of bounded s.
def tick : { s: CrossingState // bounded s } → { s': CrossingState // bounded s' }
| ⟨.Idle, _ ⟩ => ⟨ .Idle, by sorry ⟩
| ⟨.Cooldown 0, _ ⟩ => ⟨ .Idle, by sorry ⟩
| ⟨.Cooldown (n+1), _ ⟩ => ⟨ .Cooldown n, by sorry ⟩
| ⟨.Countdown 0, _ ⟩ => ⟨ .Cooldown 3, by sorry ⟩
| ⟨.Countdown (n+1), _ ⟩ => ⟨ .Countdown n, by sorry ⟩
Proving validity of tick’s possible arguments
All transitions to .Idle are trivial to prove, literally, since bounded .Idle = True. First two cases sorted without any difficulty.
...
| ⟨.Idle, _ ⟩ => ⟨ .Idle, by trivial ⟩ -- NEW
| ⟨.Cooldown 0, _ ⟩ => ⟨ .Idle, by trivial ⟩ -- NEW
...
The nonzero Countdown and Cooldown steps are pretty easy, too. unfolding
the definition of bounded means our goal is to prove 0 <= n and n < 8,
given a hypothesis h stating the same about n+1. As expected, lia will deal with
that inequality without any difficulties.
If we’d used def rather than abbrev when defining bounded, we would have
needed to first show that our goal was to prove 0 <= n ∧ n < 8, and then
discharge it with lia; as it stands now the tactic can see through the
abbrev to its definition`.
...
| ⟨.Cooldown (n+1), _ ⟩ => ⟨ .Cooldown n, by lia ⟩
| ⟨.Countdown 0, _ ⟩ => ⟨ .Cooldown 3, by lia ⟩
| ⟨.Countdown (n+1), _ ⟩ => ⟨ .Cooldown n, by lia ⟩
...
def tick : { s: CrossingState // bounded s } → { s': CrossingState // bounded s' }
- | ⟨.Idle, _ ⟩ => ⟨ .Idle, by sorry ⟩
+ | ⟨.Idle, _ ⟩ => ⟨ .Idle, by trivial ⟩
- | ⟨.Cooldown 0, _ ⟩ => ⟨ .Idle, by sorry ⟩
+ | ⟨.Cooldown 0, _ ⟩ => ⟨ .Idle, by trivial ⟩
- | ⟨.Cooldown (n+1), _ ⟩ => ⟨ .Cooldown n, by sorry ⟩
+ | ⟨.Cooldown (n+1), _ ⟩ => ⟨ .Cooldown n, by lia⟩
- | ⟨.Countdown 0, _ ⟩ => ⟨ .Cooldown 3, by sorry ⟩
+ | ⟨.Countdown 0, _ ⟩ => ⟨ .Cooldown 3, by lia ⟩
- | ⟨.Countdown (n+1), _ ⟩ => ⟨ .Countdown n, by sorry ⟩
+ | ⟨.Countdown (n+1), _ ⟩ => ⟨ .Countdown n, by lia ⟩
Implementing our step functions in terms of tick
Here’s how our onNone and onSome functions looked before.
def onNone := tick
def onSome (_ev : Unit)
| .Idle => .Countdown 3
| s => tick s
Our only change needs to be in onSome, when we transition from Idle to
Countdown 3. lia works like magic.
def onNone := tick
def onSome (_ev : Unit)
- | .Idle => .Countdown 3
+ | ⟨.Idle, h⟩ => ⟨ .Countdown 3, by lia ⟩
| s => tick s
Tying it all together
With one final definition using accumulate, our job here is done:
def crosswalk (ev : ◇ Unit) : { sig // □ (LTL.atom bounded) sig } :=
FRP.accumulate
⟨CrossingState.CrossingState.Idle, by trivial⟩
CrossingState.onNone
CrossingState.onSome
ev
Let’s look at the type signature of crosswalk: we can give it any sequence
of button presses - a polite button presser, an eager spammer, or some other
sequence you or I haven’t thought of yet - and out comes a Signal of states
bundled with a safety proof that the hardware never overflows its counter
register.
Last time, we proved the walkOnlyWhenRed safety property, manually, for
a given specific reactive program. Here, the safety property is encapsulated
in its return type. This means that any FRP program that makes use of crosswalk,
like, say:
#eval List.range 10 |>.map (fun n => (n, (crosswalk presses) n))
[(0, CrossingState.Idle),
(1, CrossingState.Idle),
(2, CrossingState.Idle), -- t=2: button pressed!
(3, CrossingState.Countdown 3),
(4, CrossingState.Countdown 2),
(5, CrossingState.Countdown 1), -- t=5: button pressed (ignored...)
(6, CrossingState.Countdown 0),
(7, CrossingState.Cooldown 3),
(8, CrossingState.Cooldown 2),
(9, CrossingState.Cooldown 1),
(10 CrossingState.Cooldown 0),
(11, CrossingState.Idle)]
… or, a larger reactive program that composes crosswalk with other
Signals, or possibly any number of button schedule Events, will carry
through that safety property, holding us in good stead, all held together
by Lean’s typechecker.
Next time, we’ll looking at composing Signals with different safety
properties: As an example, let’s go back to our spammer Event and see what
the trace looks like:
#eval List.range 12 |>.map (fun n => (n, (crosswalk spammer) n))
[(0, CrossingState.Idle), -- t=0: button pressed!
(1, CrossingState.Countdown 3), -- t=1: button pressed (ignored)
(2, CrossingState.Countdown 2), -- t=2: button pressed (ignored, again)
(3, CrossingState.Countdown 1), -- ...and so on, forever ...
(4, CrossingState.Countdown 0),
(5, CrossingState.Cooldown 3),
(6, CrossingState.Cooldown 2),
(7, CrossingState.Cooldown 1),
(8, CrossingState.Cooldown 0),
(9, CrossingState.Idle), -- t=9: button pressed!
(10, CrossingState.Countdown 3), -- ...and so on, forever ...
(11, CrossingState.Countdown 2)]
No matter how hard we hammer the button, we’ll at least once in awhile
eventually get an Idle state for crosswise traffic to flow again.
Notice something interesting: in Part 4, this exact event was the
counter-example to liveness; spammer could keep cars from ever proceeding.
Under the countdown-and-then-cooldown protocol, that’s no longer possible. The
system returns to Idle every 9 ticks, no matter what — which means cars get
their turn even under maximum harassment. We’ve essentially fixed the liveness
violation from Part 4 by introducing state!
This is a liveness property: “always, eventually Idle.” But because we have
an explicit bound (every 9 ticks, never longer), it’s actually expressible as a
safety property, via a technique called liveness-to-safety reduction. The
trick is to augment the state with a deadline counter and assert “either we’re
Idle now, or the deadline is still in the future.” That conjunction is an
inductive invariant; accumulate would give us □ of it for free. This post is
already too long so I won’t show you the details here, but the takeaway is that
bounded liveness is closer to within reach than the safety/liveness distinction
usually suggests.
Next time, we’ll start composing reactive components, building more interesting
dependency graphs of Signals where one’s output becomes another’s input.