module arch.node.engines.shard_behaviour;

import arch.node.engines.shard_messages open;
import arch.node.engines.shard_config open;
import arch.node.engines.shard_environment open;

import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

-- Argument that a guard can hand over to a shard action. The only constructor
-- wraps an `EngineID`; judging by its name it designates an engine to reply to
-- (NOTE(review): the guards defined in this module all pass `args := []`).
type ShardActionArgument := | ShardActionArgumentReplyTo EngineID;

-- The argument list passed from a guard to an action: a list of
-- `ShardActionArgument` values.
ShardActionArguments : Type := List ShardActionArgument;

-- Return the datum of the latest relevant write to `key` made strictly before
-- `timestamp`.
--
-- Entries without a write status, and may-write entries that carry no data,
-- are not considered. From the remaining entries the one selected by
-- `maximumBy` on the timestamp is taken and its `data` field is returned.
-- That field may itself be `none` (a write lock that has no data yet), in
-- which case the result is `none`. The result is also `none` when `key` has
-- no recorded accesses or when no entry qualifies.
findMostRecentWrite
  {KVSKey KVSDatum}
  {{Ord KVSKey}}
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option KVSDatum :=
  case Map.lookup key (DAGStructure.keyAccesses dag) of
    | some accessesByTime :=
      let
        candidates :=
          List.filter
            \{candidate :=
              case KeyAccess.writeStatus (snd candidate) of {
                | none := false
                | some ws :=
                  -- Earlier than the requested timestamp, and not an empty
                  -- may-write.
                  fst candidate < timestamp
                    && not
                      (WriteStatus.mayWrite ws && isNone (WriteStatus.data ws))
              }}
            (Map.toList accessesByTime);
      in case maximumBy \{candidate := fst candidate} candidates of {
           | none := none
           | some latest :=
             case KeyAccess.writeStatus (snd latest) of {
               | none := none
               | some ws := WriteStatus.data ws
             }
         }
    | none := none;

-- Record `readStatus` for `key` at `timestamp` in the DAG.
--
-- If the key or the timestamp has no entry yet, an empty one is created
-- (both statuses `none`). An existing write status at that timestamp is kept;
-- an existing read status is overwritten.
addReadAccess
  {KVSKey KVSDatum}
  {{Ord KVSKey}}
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (readStatus : ReadStatus)
  : DAGStructure KVSKey KVSDatum :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    -- Accesses already known for this key, or an empty map.
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some known := known
        | none := Map.empty;
    -- Access already known at this timestamp, or a blank one.
    current :=
      case Map.lookup timestamp perTimestamp of
        | some found := found
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          };
    withRead := current@KeyAccess{readStatus := some readStatus};
    updatedAccesses :=
      Map.insert key (Map.insert timestamp withRead perTimestamp) allAccesses;
  in dag@DAGStructure{keyAccesses := updatedAccesses};

-- Record `writeStatus` for `key` at `timestamp` in the DAG.
--
-- If the key or the timestamp has no entry yet, an empty one is created
-- (both statuses `none`). An existing read status at that timestamp is kept;
-- an existing write status is overwritten.
addWriteAccess
  {KVSKey KVSDatum}
  {{Ord KVSKey}}
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (writeStatus : WriteStatus KVSDatum)
  : DAGStructure KVSKey KVSDatum :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    -- Accesses already known for this key, or an empty map.
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some known := known
        | none := Map.empty;
    -- Access already known at this timestamp, or a blank one.
    current :=
      case Map.lookup timestamp perTimestamp of
        | some found := found
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          };
    withWrite := current@KeyAccess{writeStatus := some writeStatus};
    updatedAccesses :=
      Map.insert key (Map.insert timestamp withWrite perTimestamp) allAccesses;
  in dag@DAGStructure{keyAccesses := updatedAccesses};

-- Mark the read lock on `key` at `timestamp` as consumed
-- (`hasBeenRead := true`).
--
-- Returns `none` when no access exists for the timestamp, or when the access
-- has no read status (i.e. there is no read lock). Otherwise returns the
-- updated DAG.
replaceReadAccess
  {KVSKey KVSDatum}
  {{Ord KVSKey}}
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option (DAGStructure KVSKey KVSDatum) :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some known := known
        | none := Map.empty;
  in case Map.lookup timestamp perTimestamp of
       -- Fail if no access exists for the timestamp.
       | none := none
       | some entry :=
         case KeyAccess.readStatus entry of {
           -- Fail if no readStatus/no lock.
           | none := none
           | some lock :=
             let
               consumedLock := lock@ReadStatus{hasBeenRead := true};
               consumedEntry :=
                 entry@KeyAccess{readStatus := some consumedLock};
               updatedAccesses :=
                 Map.insert
                   key
                   (Map.insert timestamp consumedEntry perTimestamp)
                   allAccesses;
             in some dag@DAGStructure{keyAccesses := updatedAccesses}
         };

-- Store the outcome of a write to `key` at `timestamp` in an existing write
-- lock.
--
-- Returns `none` when:
--   * no access exists for the timestamp,
--   * the access has no write status (no write lock), or
--   * `newData` is `none` and the lock is not a may-write (null writes can
--     only happen on may-writes).
-- Otherwise returns the updated DAG. A `none` for `newData` on a may-write
-- leaves the stored data untouched.
replaceWriteAccess
  {KVSKey KVSDatum}
  {{Ord KVSKey}}
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (newData : Option KVSDatum)
  : Option (DAGStructure KVSKey KVSDatum) :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some known := known
        | none := Map.empty;
  in case Map.lookup timestamp perTimestamp of
       -- Fail if no access exists for the timestamp.
       | none := none
       | some entry :=
         case KeyAccess.writeStatus entry of {
           -- Fail if no writeStatus exists/no write lock.
           | none := none
           | some lock :=
             let
               -- A null write is only legal on a may-write lock.
               illegalNullWrite :=
                 isNone newData && not (WriteStatus.mayWrite lock);
               -- Do not update and do not fail if nothing needs to be
               -- written: keep whatever the lock already holds.
               storedData :=
                 case newData of
                   | some fresh := some fresh
                   | none := WriteStatus.data lock;
               rewrittenEntry :=
                 entry@KeyAccess{writeStatus := some
                   lock@WriteStatus{data := storedData}};
               updatedAccesses :=
                 Map.insert
                   key
                   (Map.insert timestamp rewrittenEntry perTimestamp)
                   allAccesses;
             in case illegalNullWrite of {
                  | true := none
                  | false :=
                    some dag@DAGStructure{keyAccesses := updatedAccesses}
                }
         };

-- Build the `ShardMsgKVSRead` engine message that carries `data` for `key` at
-- `timestamp`, sent from `sender` to `executor` (mailbox `some 0`).
generateReadMsg
  {KVSKey KVSDatum Executable}
  (sender : EngineID)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (data : KVSDatum)
  (executor : EngineID)
  : EngineMsg (PreMsg KVSKey KVSDatum Executable) :=
  let
    -- The read payload itself.
    payload :=
      mkKVSReadMsg@{
        timestamp := timestamp;
        key := key;
        data := data;
      };
  in mkEngineMsg@{
       sender := sender;
       target := executor;
       mailbox := some 0;
       msg := Anoma.MsgShard (ShardMsgKVSRead payload);
     };

-- Try to serve one pending eager read lock.
--
-- Succeeds only if all of the following hold for `access` at `timestamp`:
--   * it has a read status that is eager and has not been read yet,
--   * `timestamp < heardAllWrites` and `timestamp >= heardAllReads`,
--   * `findMostRecentWrite` yields a datum for `key`.
-- On success returns the DAG with the lock marked as read, paired with the
-- read message addressed to the lock's executor. Otherwise returns `none`.
execEagerReadsAtTime
  {KVSKey KVSDatum Executable}
  {{Ord KVSKey}}
  (sender : EngineID)
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (access : KeyAccess KVSDatum)
  : Option
    (Pair
      (DAGStructure KVSKey KVSDatum)
      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
  case KeyAccess.readStatus access of
    | none := none
    | some lock :=
      let
        stillPending :=
          ReadStatus.isEager lock && not (ReadStatus.hasBeenRead lock);
        insideWindow :=
          timestamp < DAGStructure.heardAllWrites dag
            && timestamp >= DAGStructure.heardAllReads dag;
      in case stillPending && insideWindow of {
           | false := none
           | true :=
             case findMostRecentWrite dag key timestamp of {
               | none := none
               | some value :=
                 let
                   servedLock := lock@ReadStatus{hasBeenRead := true};
                   dagAfterRead := addReadAccess dag key timestamp servedLock;
                   reply :=
                     generateReadMsg
                       sender
                       key
                       timestamp
                       value
                       (ReadStatus.executor lock);
                 in some (mkPair dagAfterRead reply)
             }
         };

-- Try to send read messages for all valid, pending eager read locks of one
-- key.
--
-- Folds over `timestampMap`, threading the DAG through every successful
-- `execEagerReadsAtTime` call and collecting the produced messages.
-- Timestamps for which no read can be served leave the accumulator unchanged.
execEagerReadsAtKey
  {KVSKey KVSDatum Executable}
  {{Ord KVSKey}}
  (sender : EngineID)
  (dag : DAGStructure KVSKey KVSDatum)
  (key : KVSKey)
  (timestampMap : Map TxFingerprint (KeyAccess KVSDatum))
  : Pair
    (DAGStructure KVSKey KVSDatum)
    (List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
  let
    -- `soFar` is the pair (current DAG, messages collected so far).
    step :=
      \{ts access soFar :=
        case execEagerReadsAtTime sender (fst soFar) key ts access of
          | none := soFar
          | some served := mkPair (fst served) (snd served :: snd soFar)};
  in Map.foldr step (mkPair dag []) timestampMap;

-- Try to send all read messages for valid, pending eager read locks.
--
-- Folds over every key of the DAG's `keyAccesses`, delegating to
-- `execEagerReadsAtKey` and threading the updated DAG from key to key. The
-- messages of each key are appended to those gathered so far.
execEagerReads
  {KVSKey KVSDatum Executable}
  {{Ord KVSKey}}
  (sender : EngineID)
  (dag : DAGStructure KVSKey KVSDatum)
  : Pair
    (DAGStructure KVSKey KVSDatum)
    (List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
  let
    -- `soFar` is the pair (current DAG, messages collected so far).
    absorbKey :=
      \{k perTimestamp soFar :=
        let
          outcome := execEagerReadsAtKey sender (fst soFar) k perTimestamp;
        in mkPair (fst outcome) (snd soFar ++ snd outcome)};
  in Map.foldr absorbKey (mkPair dag []) (DAGStructure.keyAccesses dag);

-- The generic `Action` type instantiated for the shard engine: shard
-- configuration, local state, mailbox state, timer handle and action
-- arguments, together with the Anoma-wide message, configuration and
-- environment types.
ShardAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `ActionInput` type instantiated for the shard engine. The
-- actions in this module read its `cfg`, `env` and `trigger` fields.
ShardActionInput (KVSKey KVSDatum Executable : Type) : Type :=
  ActionInput
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);

-- The generic `ActionEffect` type instantiated for the shard engine. The
-- actions in this module fill its `env`, `msgs`, `timers` and `engines`
-- fields.
ShardActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `ActionExec` type instantiated for the shard engine. The action
-- labels in this module are values of this type, built with `Seq`.
ShardActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Handle a `ShardMsgKVSAcquireLock` message.
--
-- Registers, at the message's timestamp, one access per requested key:
-- eager reads, then lazy reads, then will-writes, then may-writes (a later
-- registration overwrites an earlier status of the same kind for the same
-- key). Read locks start unread and carry the message's executor; write locks
-- start with no data. Afterwards every eager read that can already be served
-- is executed.
--
-- The effect contains the updated environment and, as messages, a
-- `ShardMsgKVSLockAcquired` acknowledgement to the message's `worker`
-- followed by the generated read messages. Returns `none` for any other
-- trigger.
acquireLockAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (input : ShardActionInput KVSKey KVSDatum Executable)
  : Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    shardId := getEngineIDFromEngineCfg (ActionInput.cfg input);
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
       | some mkEngineMsg@{
                msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockMsg);
              } :=
         let
           lockTime := KVSAcquireLockMsg.timestamp lockMsg;
           lockExecutor := KVSAcquireLockMsg.executor lockMsg;
           -- The four kinds of lock status that can be registered.
           eagerLock :=
             mkReadStatus@{
               hasBeenRead := false;
               isEager := true;
               executor := lockExecutor;
             };
           lazyLock :=
             mkReadStatus@{
               hasBeenRead := false;
               isEager := false;
               executor := lockExecutor;
             };
           willWriteLock :=
             mkWriteStatus@{
               data := none;
               mayWrite := false;
             };
           mayWriteLock :=
             mkWriteStatus@{
               data := none;
               mayWrite := true;
             };
           -- Fold steps: register one key with the given status.
           registerEagerRead :=
             \{key dag := addReadAccess dag key lockTime eagerLock};
           registerLazyRead :=
             \{key dag := addReadAccess dag key lockTime lazyLock};
           registerWillWrite :=
             \{key dag := addWriteAccess dag key lockTime willWriteLock};
           registerMayWrite :=
             \{key dag := addWriteAccess dag key lockTime mayWriteLock};
           withEagerReads :=
             Set.foldr
               registerEagerRead
               (ShardLocalState.dagStructure local)
               (KVSAcquireLockMsg.eager_read_keys lockMsg);
           withAllReads :=
             Set.foldr
               registerLazyRead
               withEagerReads
               (KVSAcquireLockMsg.lazy_read_keys lockMsg);
           withWillWrites :=
             Set.foldr
               registerWillWrite
               withAllReads
               (KVSAcquireLockMsg.will_write_keys lockMsg);
           withAllWrites :=
             Set.foldr
               registerMayWrite
               withWillWrites
               (KVSAcquireLockMsg.may_write_keys lockMsg);
           -- Serve whichever eager reads are already possible.
           propagated := execEagerReads shardId withAllWrites;
           newLocal := local@ShardLocalState{dagStructure := fst propagated};
           newEnv := env@EngineEnv{localState := newLocal};
           -- Acknowledgement for the worker that requested the locks.
           lockAcquired :=
             mkEngineMsg@{
               sender := shardId;
               target := KVSAcquireLockMsg.worker lockMsg;
               mailbox := some 0;
               msg :=
                 Anoma.MsgShard
                   (ShardMsgKVSLockAcquired
                     mkKVSLockAcquiredMsg@{
                       timestamp := lockTime;
                     });
             };
         in some
           mkActionEffect@{
             env := newEnv;
             msgs := lockAcquired :: snd propagated;
             timers := [];
             engines := [];
           }
       | _ := none;

-- Handle a `ShardMsgKVSWrite` message.
--
-- Stores the written datum in the matching write lock via
-- `replaceWriteAccess`, then executes every eager read that has become
-- servable. The effect carries the updated environment and the generated
-- read messages. Returns `none` for any other trigger, or when
-- `replaceWriteAccess` rejects the write.
processWriteAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (input : ShardActionInput KVSKey KVSDatum Executable)
  : Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
       | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite writeMsg)} :=
         let
           recorded :=
             replaceWriteAccess
               (ShardLocalState.dagStructure local)
               (KVSWriteMsg.key writeMsg)
               (KVSWriteMsg.timestamp writeMsg)
               (KVSWriteMsg.datum writeMsg);
         in case recorded of {
              | none := none
              | some dagAfterWrite :=
                let
                  propagated :=
                    execEagerReads
                      (getEngineIDFromEngineCfg (ActionInput.cfg input))
                      dagAfterWrite;
                  newLocal :=
                    local@ShardLocalState{dagStructure := fst propagated};
                  newEnv := env@EngineEnv{localState := newLocal};
                in some
                  mkActionEffect@{
                    env := newEnv;
                    msgs := snd propagated;
                    timers := [];
                    engines := [];
                  }
            }
       | _ := none;

-- Handle a `ShardMsgKVSReadRequest` message.
--
-- Returns `none` (the action fails) when:
--   * the trigger is not a read request,
--   * the requested timestamp is below the DAG's `heardAllReads`,
--   * `replaceReadAccess` finds no read lock for the key and timestamp, or
--   * `actual` is true but `findMostRecentWrite` yields no datum.
-- Otherwise the read lock is marked as read. If `actual` is false the effect
-- only carries the updated environment; if it is true the effect additionally
-- carries one read message addressed to the sender of the request.
--
-- The reply is built with `generateReadMsg`, the same helper the eager-read
-- path uses, so both paths produce messages of exactly the same shape.
processReadRequestAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (input : ShardActionInput KVSKey KVSDatum Executable)
  : Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                sender := sender;
                msg := Anoma.MsgShard (ShardMsgKVSReadRequest readReqMsg);
              } :=
         let
           dag := ShardLocalState.dagStructure local;
           key := KVSReadRequestMsg.key readReqMsg;
           timestamp := KVSReadRequestMsg.timestamp readReqMsg;
           actual := KVSReadRequestMsg.actual readReqMsg;
         in case timestamp >= DAGStructure.heardAllReads dag of {
              | false := none
              | true :=
                case replaceReadAccess dag key timestamp of {
                  -- Fail if no valid read lock exists.
                  | none := none
                  | some updatedDag :=
                    let
                      -- Both outcomes below install the same environment.
                      newLocal :=
                        local@ShardLocalState{dagStructure := updatedDag};
                      newEnv := env@EngineEnv{localState := newLocal};
                    in case actual of {
                         | false :=
                           -- If `actual` is false, just update the DAG and
                           -- return.
                           some
                             mkActionEffect@{
                               env := newEnv;
                               msgs := [];
                               timers := [];
                               engines := [];
                             }
                         | true :=
                           case
                             findMostRecentWrite updatedDag key timestamp
                           of {
                             | none := none
                             | some data :=
                               let
                                 -- Reply goes back to whoever sent the
                                 -- request.
                                 readMsg :=
                                   generateReadMsg
                                     (getEngineIDFromEngineCfg cfg)
                                     key
                                     timestamp
                                     data
                                     sender;
                               in some
                                 mkActionEffect@{
                                   env := newEnv;
                                   msgs := [readMsg];
                                   timers := [];
                                   engines := [];
                                 }
                           }
                       }
                }
            }
       | _ := none;

-- Handle a `ShardMsgUpdateSeenAll` message.
--
-- When the message's `write` flag is true, `heardAllWrites` is set to the
-- message's timestamp and every eager read that has become servable is
-- executed. When it is false, `heardAllReads` is set to the timestamp and no
-- reads are executed. The effect carries the updated environment and any
-- generated read messages. Returns `none` for any other trigger.
updateSeenAllAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (input : ShardActionInput KVSKey KVSDatum Executable)
  : Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
       | some mkEngineMsg@{
                msg := Anoma.MsgShard (ShardMsgUpdateSeenAll updateMsg);
              } :=
         let
           oldDag := ShardLocalState.dagStructure local;
           stamp := UpdateSeenAllMsg.timestamp updateMsg;
           -- The two possible updates; only one of them is used below.
           dagWithWrites := oldDag@DAGStructure{heardAllWrites := stamp};
           dagWithReads := oldDag@DAGStructure{heardAllReads := stamp};
           outcome :=
             case UpdateSeenAllMsg.write updateMsg of
               | false := mkPair dagWithReads []
               | true :=
                 execEagerReads
                   (getEngineIDFromEngineCfg (ActionInput.cfg input))
                   dagWithWrites;
           newLocal := local@ShardLocalState{dagStructure := fst outcome};
           newEnv := env@EngineEnv{localState := newLocal};
         in some
           mkActionEffect@{
             env := newEnv;
             msgs := snd outcome;
             timers := [];
             engines := [];
           }
       | _ := none;

-- Action label used by `acquireLockGuard`: a `Seq` containing only
-- `acquireLockAction`.
acquireLockActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  : ShardActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [acquireLockAction];

-- Action label used by `processWriteGuard`: a `Seq` containing only
-- `processWriteAction`.
processWriteActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  : ShardActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [processWriteAction];

-- Action label used by `processReadRequestGuard`: a `Seq` containing only
-- `processReadRequestAction`.
processReadRequestActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  : ShardActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [processReadRequestAction];

-- Action label used by `updateSeenAllGuard`: a `Seq` containing only
-- `updateSeenAllAction`.
updateSeenAllActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  : ShardActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [updateSeenAllAction];

-- The generic `Guard` type instantiated for the shard engine.
ShardGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `GuardOutput` type instantiated for the shard engine. The
-- guards in this module fill its `action` and `args` fields.
ShardGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `GuardEval` type instantiated for the shard engine.
ShardGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Guard for lock acquisition.
--
-- Fires when the trigger carries a shard message built with
-- `ShardMsgKVSAcquireLock`, selecting `acquireLockActionLabel` with no
-- arguments. Any other trigger yields `none`. `cfg` and `env` are not
-- inspected.
acquireLockGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (trigger : TimestampedTrigger
    ShardTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv KVSKey KVSDatum)
  : Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard shardMsg} :=
      case shardMsg of {
        | ShardMsgKVSAcquireLock _ :=
          some
            mkGuardOutput@{
              action := acquireLockActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;

-- Guard for write processing.
--
-- Fires when the trigger carries a shard message built with
-- `ShardMsgKVSWrite`, selecting `processWriteActionLabel` with no arguments.
-- Any other trigger yields `none`. `cfg` and `env` are not inspected.
processWriteGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (trigger : TimestampedTrigger
    ShardTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv KVSKey KVSDatum)
  : Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard shardMsg} :=
      case shardMsg of {
        | ShardMsgKVSWrite _ :=
          some
            mkGuardOutput@{
              action := processWriteActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;

-- Guard for read requests.
--
-- Fires when the trigger carries a shard message built with
-- `ShardMsgKVSReadRequest`, selecting `processReadRequestActionLabel` with no
-- arguments. Any other trigger yields `none`. `cfg` and `env` are not
-- inspected.
processReadRequestGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (trigger : TimestampedTrigger
    ShardTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv KVSKey KVSDatum)
  : Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard shardMsg} :=
      case shardMsg of {
        | ShardMsgKVSReadRequest _ :=
          some
            mkGuardOutput@{
              action := processReadRequestActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;

-- Guard for seen-all updates.
--
-- Fires when the trigger carries a shard message built with
-- `ShardMsgUpdateSeenAll`, selecting `updateSeenAllActionLabel` with no
-- arguments. Any other trigger yields `none`. `cfg` and `env` are not
-- inspected.
updateSeenAllGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  (trigger : TimestampedTrigger
    ShardTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv KVSKey KVSDatum)
  : Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard shardMsg} :=
      case shardMsg of {
        | ShardMsgUpdateSeenAll _ :=
          some
            mkGuardOutput@{
              action := updateSeenAllActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;

-- The generic `EngineBehaviour` type instantiated for the shard engine.
ShardBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    ShardCfg
    (ShardLocalState KVSKey KVSDatum)
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The shard engine's behaviour, instantiated with `String` keys, `String`
-- data, `ByteString` executables and `String` program state.
--
-- The four guards are combined with `First`, listed in the order: lock
-- acquisition, write, read request, seen-all update.
-- NOTE(review): presumably `First` evaluates them in list order and uses the
-- first one that fires; each guard matches a distinct message constructor, so
-- at most one of them can fire for a given trigger.
shardBehaviour : ShardBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards :=
      First
        [
          acquireLockGuard;
          processWriteGuard;
          processReadRequestGuard;
          updateSeenAllGuard;
        ];
  };