Juvix imports
module arch.node.engines.shard_behaviour;
import arch.node.engines.shard_messages open;
import arch.node.engines.shard_config open;
import arch.node.engines.shard_environment open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Shard Behaviour¶
Action arguments¶
ShardActionArgument¶
type ShardActionArgument := | ShardActionArgumentReplyTo EngineID;
ShardActionArguments¶
ShardActionArguments : Type := List ShardActionArgument;
Helper Functions¶
maximumBy {A : Type} (f : A -> Nat) (lst : List A) : Option A :=
  let
    maxHelper :=
      \{curr acc :=
        case acc of
          | none := some curr
          | some maxVal :=
            case f curr > f maxVal of
              | true := some curr
              | false := some maxVal};
  in foldr maxHelper none lst;
findMostRecentWrite
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option KVSDatum :=
  case Map.lookup key (DAGStructure.keyAccesses dag) of
    | none := none
    | some timestampMap :=
      let
        validEntries :=
          List.filter
            \{entry :=
              fst entry < timestamp
                && case KeyAccess.writeStatus (snd entry) of {
                     | some writeStatus :=
                       not
                         (WriteStatus.mayWrite writeStatus
                           && isNone (WriteStatus.data writeStatus))
                     | none := false
                   }}
            (Map.toList timestampMap);
      in case maximumBy \{entry := fst entry} validEntries of
           | some (mkPair _ access) :=
             case KeyAccess.writeStatus access of {
               | some writeStatus := WriteStatus.data writeStatus
               | none := none
             }
           | none := none;
-- add read without prior lock
addReadAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (readStatus : ReadStatus)
  : DAGStructure :=
  let
    keyMap :=
      case Map.lookup key (DAGStructure.keyAccesses dag) of
        | none := Map.empty
        | some m := m;
    existingAccess :=
      case Map.lookup timestamp keyMap of
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          }
        | some access := access;
    newAccess := existingAccess@KeyAccess{readStatus := some readStatus};
    newKeyMap := Map.insert timestamp newAccess keyMap;
    newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
  in dag@DAGStructure{keyAccesses := newKeyAccesses};
-- add write without prior lock
addWriteAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (writeStatus : WriteStatus)
  : DAGStructure :=
  let
    keyMap :=
      case Map.lookup key (DAGStructure.keyAccesses dag) of
        | none := Map.empty
        | some m := m;
    existingAccess :=
      case Map.lookup timestamp keyMap of
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          }
        | some access := access;
    newAccess := existingAccess@KeyAccess{writeStatus := some writeStatus};
    newKeyMap := Map.insert timestamp newAccess keyMap;
    newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
  in dag@DAGStructure{keyAccesses := newKeyAccesses};
-- Replaces if read lock exists
replaceReadAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option DAGStructure :=
  let
    keyMap :=
      case Map.lookup key (DAGStructure.keyAccesses dag) of
        | none := Map.empty
        | some m := m;
    access :=
      case Map.lookup timestamp keyMap of
        | none := none
        | some a := some a;
  in case access of
       | some a :=
         case KeyAccess.readStatus a of {
           | none := none
           | some rs :=
             let
               updatedReadStatus := rs@ReadStatus{hasBeenRead := true};
               updatedAccess :=
                 a@KeyAccess{readStatus := some updatedReadStatus};
               updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
               updatedKeyAccesses :=
                 Map.insert key updatedKeyMap (DAGStructure.keyAccesses dag);
             in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
         }
       | none := none;
-- Replaces if write lock exists
replaceWriteAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (newData : Option KVSDatum)
  : Option DAGStructure :=
  let
    keyMap :=
      case Map.lookup key (DAGStructure.keyAccesses dag) of
        | none := Map.empty
        | some m := m;
  in case Map.lookup timestamp keyMap of
       | some a :=
         case KeyAccess.writeStatus a of {
           | none := none
           | some ws :=
             case isNone newData && not (WriteStatus.mayWrite ws) of {
               | true := none
               | false :=
                 let
                   data :=
                     case newData of
                       | none := WriteStatus.data ws
                       | some dat := some dat;
                   updatedAccess :=
                     a@KeyAccess{writeStatus := some
                       ws@WriteStatus{data := data}};
                   updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
                   updatedKeyAccesses :=
                     Map.insert
                       key
                       updatedKeyMap
                       (DAGStructure.keyAccesses dag);
                 in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
             }
         }
       | none := none;
generateReadMsg
  (sender : EngineID)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (data : KVSDatum)
  (executor : EngineID)
  : EngineMsg Msg :=
  mkEngineMsg@{
    sender := sender;
    target := executor;
    mailbox := some 0;
    msg :=
      Anoma.MsgShard
        (ShardMsgKVSRead
          mkKVSReadMsg@{
            timestamp := timestamp;
            key := key;
            data := data;
          });
  };
execEagerReadsAtTime
  (sender : EngineID)
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (access : KeyAccess)
  : Option (Pair DAGStructure (EngineMsg Msg)) :=
  case KeyAccess.readStatus access of
    | some readStatus :=
      case
        ReadStatus.isEager readStatus && not (ReadStatus.hasBeenRead readStatus)
      of {
        | true :=
          case
            timestamp < DAGStructure.heardAllWrites dag
              && timestamp >= DAGStructure.heardAllReads dag
          of {
            | true :=
              case findMostRecentWrite dag key timestamp of {
                | some data :=
                  let
                    newReadStatus := readStatus@ReadStatus{hasBeenRead := true};
                    newDag := addReadAccess dag key timestamp newReadStatus;
                    msg :=
                      generateReadMsg
                        sender
                        key
                        timestamp
                        data
                        (ReadStatus.executor readStatus);
                  in some (mkPair newDag msg)
                | none := none
              }
            | false := none
          }
        | false := none
      }
    | none := none;
execEagerReadsAtKey
  (sender : EngineID)
  (dag : DAGStructure)
  (key : KVSKey)
  (timestampMap : Map TxFingerprint KeyAccess)
  : Pair DAGStructure (List (EngineMsg Msg)) :=
  let
    processTimestamp :=
      \{k v acc :=
        case acc of
          | mkPair currDag msgs :=
            case execEagerReadsAtTime sender currDag key k v of
              | some processed := mkPair (fst processed) (snd processed :: msgs)
              | none := acc};
  in Map.foldr processTimestamp (mkPair dag []) timestampMap;
execEagerReads
  (sender : EngineID)
  (dag : DAGStructure)
  : Pair DAGStructure (List (EngineMsg Msg)) :=
  let
    processKey :=
      \{k v acc :=
        case acc of
          | mkPair currDag msgs :=
            let
              processed := execEagerReadsAtKey sender currDag k v;
            in mkPair (fst processed) (msgs ++ snd processed)};
  in Map.foldr processKey (mkPair dag []) (DAGStructure.keyAccesses dag);
Actions¶
Auxiliary Juvix code
ShardAction¶
ShardAction : Type :=
  Action
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
ShardActionInput¶
ShardActionInput : Type :=
  ActionInput
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg;
ShardActionEffect¶
ShardActionEffect : Type :=
  ActionEffect
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
ShardActionExec¶
ShardActionExec : Type :=
  ActionExec
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
acquireLockAction¶
Process lock acquisition request and send confirmation.
- State update
- Update DAG with new read/write accesses.
- Messages to be sent
- KVSLockAcquired message to worker.
acquireLockAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockMsg);
              } :=
         let
           addEagerReadAccesses :=
             \{key dag :=
               let
                 readStatus :=
                   mkReadStatus@{
                     hasBeenRead := false;
                     isEager := true;
                     executor := KVSAcquireLockMsg.executor lockMsg;
                   };
               in addReadAccess
                 dag
                 key
                 (KVSAcquireLockMsg.timestamp lockMsg)
                 readStatus};
           addLazyReadAccesses :=
             \{key dag :=
               let
                 readStatus :=
                   mkReadStatus@{
                     hasBeenRead := false;
                     isEager := false;
                     executor := KVSAcquireLockMsg.executor lockMsg;
                   };
               in addReadAccess
                 dag
                 key
                 (KVSAcquireLockMsg.timestamp lockMsg)
                 readStatus};
           addWillWriteAccesses :=
             \{key dag :=
               let
                 writeStatus :=
                   mkWriteStatus@{
                     data := none;
                     mayWrite := false;
                   };
               in addWriteAccess
                 dag
                 key
                 (KVSAcquireLockMsg.timestamp lockMsg)
                 writeStatus};
           addMayWriteAccesses :=
             \{key dag :=
               let
                 writeStatus :=
                   mkWriteStatus@{
                     data := none;
                     mayWrite := true;
                   };
               in addWriteAccess
                 dag
                 key
                 (KVSAcquireLockMsg.timestamp lockMsg)
                 writeStatus};
           dagWithEagerReads :=
             Set.foldr
               addEagerReadAccesses
               (ShardLocalState.dagStructure local)
               (KVSAcquireLockMsg.eager_read_keys lockMsg);
           dagWithAllReads :=
             Set.foldr
               addLazyReadAccesses
               dagWithEagerReads
               (KVSAcquireLockMsg.lazy_read_keys lockMsg);
           dagWithWillWrites :=
             Set.foldr
               addWillWriteAccesses
               dagWithAllReads
               (KVSAcquireLockMsg.will_write_keys lockMsg);
           dagWithAllWrites :=
             Set.foldr
               addMayWriteAccesses
               dagWithWillWrites
               (KVSAcquireLockMsg.may_write_keys lockMsg);
           propagationResult :=
             execEagerReads (getEngineIDFromEngineCfg cfg) dagWithAllWrites;
           newLocal :=
             local@ShardLocalState{dagStructure := fst propagationResult};
           newEnv := env@EngineEnv{localState := newLocal};
         in some
           mkActionEffect@{
             env := newEnv;
             msgs :=
               mkEngineMsg@{
                 sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
                 target := KVSAcquireLockMsg.worker lockMsg;
                 mailbox := some 0;
                 msg :=
                   Anoma.MsgShard
                     (ShardMsgKVSLockAcquired
                       mkKVSLockAcquiredMsg@{
                         timestamp := KVSAcquireLockMsg.timestamp lockMsg;
                       });
               }
                 :: snd propagationResult;
             timers := [];
             engines := [];
           }
       | _ := none;
processWriteAction¶
Process write request and potentially trigger eager reads.
- State update
- Update DAG with write data and trigger eager reads.
- Messages to be sent
- KVSRead messages if eligible eager reads are found.
processWriteAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite writeMsg)} :=
         let
           dag := ShardLocalState.dagStructure local;
           key := KVSWriteMsg.key writeMsg;
           timestamp := KVSWriteMsg.timestamp writeMsg;
         in case
              replaceWriteAccess dag key timestamp (KVSWriteMsg.datum writeMsg)
            of {
              | some updatedDag :=
                let
                  propagationResult :=
                    execEagerReads (getEngineIDFromEngineCfg cfg) updatedDag;
                  newLocal :=
                    local@ShardLocalState{dagStructure := fst
                      propagationResult};
                  newEnv := env@EngineEnv{localState := newLocal};
                  readMsgs := snd propagationResult;
                in some
                  mkActionEffect@{
                    env := newEnv;
                    msgs := readMsgs;
                    timers := [];
                    engines := [];
                  }
              | none := none
            }
       | _ := none;
processReadRequestAction¶
Process read request and potentially send read response.
- State update
- Update DAG with read request status.
- Messages to be sent
- KVSRead message if read data is available.
processReadRequestAction
  (input : ShardActionInput) : Option ShardActionEffect :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                sender := sender;
                msg := Anoma.MsgShard (ShardMsgKVSReadRequest readReqMsg);
              } :=
         let
           dag := ShardLocalState.dagStructure local;
           key := KVSReadRequestMsg.key readReqMsg;
           timestamp := KVSReadRequestMsg.timestamp readReqMsg;
           actual := KVSReadRequestMsg.actual readReqMsg;
         in case timestamp >= DAGStructure.heardAllReads dag of {
              | false := none
              | true :=
                case replaceReadAccess dag key timestamp of {
                  | none := none
                  | some updatedDag :=
                    case actual of {
                      | false :=
                        let
                          newLocal :=
                            local@ShardLocalState{dagStructure := updatedDag};
                          newEnv := env@EngineEnv{localState := newLocal};
                        in some
                          mkActionEffect@{
                            env := newEnv;
                            msgs := [];
                            timers := [];
                            engines := [];
                          }
                      | true :=
                        case findMostRecentWrite updatedDag key timestamp of {
                          | none := none
                          | some data :=
                            let
                              readMsg :=
                                mkEngineMsg@{
                                  sender := getEngineIDFromEngineCfg cfg;
                                  target := sender;
                                  mailbox := some 0;
                                  msg :=
                                    Anoma.MsgShard
                                      (ShardMsgKVSRead
                                        mkKVSReadMsg@{
                                          timestamp := timestamp;
                                          key := key;
                                          data := data;
                                        });
                                };
                              newLocal :=
                                local@ShardLocalState{dagStructure := updatedDag};
                              newEnv := env@EngineEnv{localState := newLocal};
                            in some
                              mkActionEffect@{
                                env := newEnv;
                                msgs := [readMsg];
                                timers := [];
                                engines := [];
                              }
                        }
                    }
                }
            }
       | _ := none;
updateSeenAllAction¶
Process seen-all update and potentially trigger eager reads.
- State update
- Update DAG barriers and trigger eager reads.
- Messages to be sent
- KVSRead messages if eligible eager reads are found.
updateSeenAllAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                msg := Anoma.MsgShard (ShardMsgUpdateSeenAll updateMsg);
              } :=
         let
           oldDag := ShardLocalState.dagStructure local;
           newDag :=
             case UpdateSeenAllMsg.write updateMsg of
               | true :=
                 oldDag@DAGStructure{heardAllWrites := UpdateSeenAllMsg.timestamp
                   updateMsg}
               | false :=
                 oldDag@DAGStructure{heardAllReads := UpdateSeenAllMsg.timestamp
                   updateMsg};
           propagationResult :=
             case UpdateSeenAllMsg.write updateMsg of
               | true := execEagerReads (getEngineIDFromEngineCfg cfg) newDag
               | false := mkPair newDag [];
           newLocal :=
             local@ShardLocalState{dagStructure := fst propagationResult};
           newEnv := env@EngineEnv{localState := newLocal};
           readMsgs := snd propagationResult;
         in some
           mkActionEffect@{
             env := newEnv;
             msgs := readMsgs;
             timers := [];
             engines := [];
           }
       | _ := none;
Action Labels¶
acquireLockActionLabel¶
acquireLockActionLabel : ShardActionExec := Seq [acquireLockAction];
processWriteActionLabel¶
processWriteActionLabel : ShardActionExec := Seq [processWriteAction];
processReadRequestActionLabel¶
processReadRequestActionLabel : ShardActionExec :=
  Seq [processReadRequestAction];
updateSeenAllActionLabel¶
updateSeenAllActionLabel : ShardActionExec := Seq [updateSeenAllAction];
Guards¶
Auxiliary Juvix code
ShardGuard¶
ShardGuard : Type :=
  Guard
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
ShardGuardOutput¶
ShardGuardOutput : Type :=
  GuardOutput
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
ShardGuardEval¶
ShardGuardEval : Type :=
  GuardEval
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
acquireLockGuard¶
- Condition
- Message type is ShardMsgKVSAcquireLock.
acquireLockGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSAcquireLock _)} :=
      some
        mkGuardOutput@{
          action := acquireLockActionLabel;
          args := [];
        }
    | _ := none;
processWriteGuard¶
- Condition
- Message type is ShardMsgKVSWrite.
processWriteGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite _)} :=
      some
        mkGuardOutput@{
          action := processWriteActionLabel;
          args := [];
        }
    | _ := none;
processReadRequestGuard¶
- Condition
- Message type is ShardMsgKVSReadRequest.
processReadRequestGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSReadRequest _)} :=
      some
        mkGuardOutput@{
          action := processReadRequestActionLabel;
          args := [];
        }
    | _ := none;
updateSeenAllGuard¶
- Condition
- Message type is ShardMsgUpdateSeenAll.
updateSeenAllGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgUpdateSeenAll _)} :=
      some
        mkGuardOutput@{
          action := updateSeenAllActionLabel;
          args := [];
        }
    | _ := none;
The Shard Behaviour¶
ShardBehaviour¶
ShardBehaviour : Type :=
  EngineBehaviour
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
Instantiation¶
shardBehaviour : ShardBehaviour :=
  mkEngineBehaviour@{
    guards :=
      First
        [
          acquireLockGuard;
          processWriteGuard;
          processReadRequestGuard;
          updateSeenAllGuard;
        ];
  };
Shard Action Flowchart¶
acquireLock Flowchart¶
flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSAcquireLock]
  end
  G(acquireLockGuard)
  A(acquireLockAction)
  C --> G -- *acquireLockActionLabel* --> A --> E
  subgraph E[Effects]
    EEnv[(Update DAG with locks)]
    EMsg>KVSLockAcquired]
  endacquireLock flowchart
processWrite Flowchart¶
flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSWrite]
  end
  G(processWriteGuard)
  A(processWriteAction)
  C --> G -- *processWriteActionLabel* --> A --> E
  subgraph E[Effects]
    EEnv[(Update DAG)]
    EMsg>KVSRead messages]
  endprocessWrite flowchart
processReadRequest Flowchart¶
flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSReadRequest]
  end
  G(processReadRequestGuard)
  A(processReadRequestAction)
  C --> G -- *processReadRequestActionLabel* --> A --> E
  subgraph E[Effects]
    EEnv[(Update read status)]
    EMsg>KVSRead message]
  endprocessReadRequest flowchart
updateSeenAll Flowchart¶
flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgUpdateSeenAll]
  end
  G(updateSeenAllGuard)
  A(updateSeenAllAction)
  C --> G -- *updateSeenAllActionLabel* --> A --> E
  subgraph E[Effects]
    EEnv[(Update barriers)]
    EMsg>KVSRead messages]
  endupdateSeenAll flowchart