Juvix imports
module arch.node.engines.executor_behaviour;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Executor Behaviour¶
Overview¶
The executor behaviour defines how the executor processes incoming read responses and performs state transitions to execute the transaction program.
Action arguments¶
ExecutorActionArguments¶
-- Executor actions receive no guard-supplied arguments, so the argument type
-- is just Unit (the guard in this module passes `unit`).
syntax alias ExecutorActionArguments := Unit;
Actions¶
Auxiliary Juvix code
ExecutorAction¶
-- The generic Action type specialised to the executor: its configuration,
-- local state, mailbox state, timer handle and action-argument types,
-- followed by the node-wide Anoma message, configuration and environment types.
ExecutorAction : Type :=
  Action ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
ExecutorActionInput¶
-- The generic ActionInput type specialised to the executor's configuration,
-- local state, mailbox state, timer handle and action-argument types, plus the
-- node-wide Anoma message type.
ExecutorActionInput : Type :=
  ActionInput ExecutorCfg ExecutorLocalState ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg;
ExecutorActionEffect¶
-- The generic ActionEffect type specialised to the executor's local state,
-- mailbox state and timer handle, plus the node-wide Anoma message,
-- configuration and environment types.
ExecutorActionEffect : Type :=
  ActionEffect ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    Anoma.Msg Anoma.Cfg Anoma.Env;
ExecutorActionExec¶
-- The generic ActionExec type specialised to the executor: its configuration,
-- local state, mailbox state, timer handle and action-argument types,
-- followed by the node-wide Anoma message, configuration and environment types.
ExecutorActionExec : Type :=
  ActionExec ExecutorCfg ExecutorLocalState ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
processReadAction¶
Process a read response and execute the next program step.
- State update
  - Updates the program state with the executed step's results and tracks completed reads/writes.
- Messages to be sent
  - Read/Write messages to shards based on program outputs.
  - Notification messages for stale locks if the program halts or the step fails.
  - ExecutorFinished message if the program halts or the step fails.
- Engines to be spawned
  - No engines are created by this action.
- Timer updates
  - No timers are set or cancelled.
-- Process a KVS read response and run one step of the transaction program.
--
-- Input: the action input, whose trigger is expected to carry a
-- `ShardMsgKVSRead` message. Any other trigger yields `none`.
--
-- Result (always `some` for a read response):
--   * step fails: the environment is left unchanged. An ExecutorFinished
--     message with `success := false` is sent to the issuer, followed by
--     stale-lock notifications.
--   * step succeeds, program not halted: the local state is updated (new
--     program state, the read recorded, every produced write recorded) and
--     the read requests / writes produced by the step are sent to shards.
--   * step succeeds, program halted: as above, and additionally an
--     ExecutorFinished message with `success := true` plus stale-lock
--     notifications are sent.
--
-- No timers are set and no engines are spawned in any case.
processReadAction (input : ExecutorActionInput) : Option ExecutorActionEffect :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
       | some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                           key := readKey;
                                           data := readValue;
                                         })) :=
         let
           -- Wrap a message for `target`, sent from this executor.
           envelope
             (target : EngineID) (msg : Anoma.Msg) : EngineMsg Anoma.Msg :=
             mkEngineMsg@{
               sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
               target := target;
               mailbox := some 0;
               msg := msg;
             };
           local := EngineEnv.localState env;
           -- Reads/writes completed BEFORE this step.
           reads := ExecutorLocalState.completed_reads local;
           writes := ExecutorLocalState.completed_writes local;
           -- Read request with `actual := false`: used as the stale-lock
           -- notice for a lazy read key that was never read.
           staleReadMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSReadRequest
                   mkKVSReadRequestMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     actual := false;
                   }));
           -- Write with `datum := none`: used as the stale-lock notice for a
           -- may-write key that was never written.
           staleWriteMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSWrite
                   mkKVSWriteMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     datum := none;
                   }));
           -- Stale-lock notifications for every lazy-read key not in
           -- `readDone` and every may-write key not in `writeDone`.
           -- Callers must pass the key sets as they stand AFTER the current
           -- step, so that keys read or written by this very step are not
           -- reported as stale.
           staleMsgsFor
             (readDone : List KVSKey)
             (writeDone : List KVSKey)
             : List (EngineMsg Anoma.Msg) :=
             let
               staleReadLocations :=
                 Set.difference
                   (ExecutorCfg.lazy_read_keys cfg)
                   (Set.fromList readDone);
               readStaleMsgs :=
                 map staleReadMsg (Set.toList staleReadLocations);
               staleWriteLocations :=
                 Set.difference
                   (ExecutorCfg.may_write_keys cfg)
                   (Set.fromList writeDone);
               writeStaleMsgs :=
                 map staleWriteMsg (Set.toList staleWriteLocations);
             in readStaleMsgs ++ writeStaleMsgs;
           stepInput := mkPair readKey readValue;
           stepResult :=
             executeStep
               (ExecutorCfg.executable cfg)
               (ExecutorLocalState.program_state local)
               stepInput;
         in case stepResult of {
              | error err :=
                let
                  finishedMsg :=
                    envelope
                      (ExecutorCfg.issuer cfg)
                      (MsgExecutor
                        (ExecutorMsgExecutorFinished
                          mkExecutorFinishedMsg@{
                            success := false;
                            values_read :=
                              mkPair readKey readValue :: Map.toList reads;
                            values_written := Map.toList writes;
                          }));
                  -- The read that triggered this step has been served, so
                  -- its key must not be reported as a stale read lock. The
                  -- failed step produced no writes.
                  staleMsgs :=
                    staleMsgsFor (readKey :: Map.keys reads) (Map.keys writes);
                in some
                  mkActionEffect@{
                    env := env;
                    msgs := finishedMsg :: staleMsgs;
                    timers := [];
                    engines := [];
                  }
              | ok (mkPair program' outputs) :=
                let
                  -- Prepend a real read request for `key` if the executor
                  -- holds a (lazy or eager) read lock on it.
                  accReads
                    (key : KVSKey)
                    (msgs : List (EngineMsg Anoma.Msg))
                    : List (EngineMsg Anoma.Msg) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSReadRequest
                              mkKVSReadRequestMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                actual := true;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
                           (Set.isMember key (ExecutorCfg.eager_read_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  -- Prepend a write of `value` to `key` if the executor
                  -- holds a (will or may) write lock on it.
                  accWrites
                    (key : KVSKey)
                    (value : KVSDatum)
                    (msgs : List (EngineMsg Anoma.Msg))
                    : List (EngineMsg Anoma.Msg) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSWrite
                              mkKVSWriteMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                datum := some value;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.will_write_keys cfg))
                           (Set.isMember key (ExecutorCfg.may_write_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  -- Fold step: a `left key` output is a read request, a
                  -- `right (key, value)` output is a write, which is also
                  -- recorded in `completed_writes`.
                  sendHelper
                    (acc : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)))
                    (out : Either KVSKey (Pair KVSKey KVSDatum))
                    : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)) :=
                    let
                      state := fst acc;
                      msgs := snd acc;
                    in case out of
                         | left key := mkPair state (accReads key msgs)
                         | right (mkPair key value) :=
                           let
                             newState :=
                               state@ExecutorLocalState{completed_writes := Map.insert
                                 key
                                 value
                                 (ExecutorLocalState.completed_writes state)};
                           in mkPair newState (accWrites key value msgs);
                  initial :=
                    mkPair
                      local@ExecutorLocalState{
                        program_state := program';
                        completed_reads := Map.insert
                          readKey
                          readValue
                          (ExecutorLocalState.completed_reads local);
                      }
                      [];
                  final := foldl sendHelper initial outputs;
                  newLocalState := fst final;
                  msgList := snd final;
                  newEnv := env@EngineEnv{localState := newLocalState};
                in case ProgramState.halted program' of {
                     | false :=
                       some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList;
                           timers := [];
                           engines := [];
                         }
                     | true :=
                       let
                         -- Report and diff against the state AFTER this
                         -- step: it contains the read just processed and
                         -- the writes just produced.
                         finalReads :=
                           ExecutorLocalState.completed_reads newLocalState;
                         finalWrites :=
                           ExecutorLocalState.completed_writes newLocalState;
                         finishedMsg :=
                           envelope
                             (ExecutorCfg.issuer cfg)
                             (MsgExecutor
                               (ExecutorMsgExecutorFinished
                                 mkExecutorFinishedMsg@{
                                   success := true;
                                   values_read := Map.toList finalReads;
                                   values_written := Map.toList finalWrites;
                                 }));
                         -- NOTE(review): a lazy read requested by this same
                         -- halting step is not yet in `finalReads`, so it
                         -- still gets a stale notice after its real request
                         -- - confirm this is the intended shard protocol.
                         staleMsgs :=
                           staleMsgsFor
                             (Map.keys finalReads)
                             (Map.keys finalWrites);
                       in some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList ++ finishedMsg :: staleMsgs;
                           timers := [];
                           engines := [];
                         }
                   }
            }
       | _ := none;
Action Labels¶
-- Action label returned by the read guard: a sequence whose only element is
-- processReadAction.
processReadActionLabel : ExecutorActionExec := Seq [processReadAction];
Guards¶
Auxiliary Juvix code
ExecutorGuard¶
-- The generic Guard type specialised to the executor: its configuration,
-- local state, mailbox state, timer handle and action-argument types,
-- followed by the node-wide Anoma message, configuration and environment types.
ExecutorGuard : Type :=
  Guard ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
ExecutorGuardOutput¶
-- The generic GuardOutput type specialised to the executor: its configuration,
-- local state, mailbox state, timer handle and action-argument types,
-- followed by the node-wide Anoma message, configuration and environment types.
ExecutorGuardOutput : Type :=
  GuardOutput ExecutorCfg ExecutorLocalState ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
ExecutorGuardEval¶
-- The generic GuardEval type specialised to the executor: its configuration,
-- local state, mailbox state, timer handle and action-argument types,
-- followed by the node-wide Anoma message, configuration and environment types.
ExecutorGuardEval : Type :=
  GuardEval ExecutorCfg ExecutorLocalState ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
processReadGuard¶
Guard for processing read responses.
-- Guard for read responses.
--
-- Fires only when the trigger carries a `ShardMsgKVSRead` whose timestamp
-- equals the timestamp in this executor's configuration. In that case it
-- selects `processReadActionLabel` with `unit` arguments. Every other
-- trigger, and any read response with a different timestamp, yields `none`.
processReadGuard
  (trigger : TimestampedTrigger ExecutorTimerHandle Anoma.Msg)
  (cfg : EngineCfg ExecutorCfg)
  (env : ExecutorEnv)
  : Option ExecutorGuardOutput :=
  let
    -- Timestamp this executor is running at.
    expectedTimestamp := ExecutorCfg.timestamp (EngineCfg.cfg cfg);
    -- Guard output used when the read response is accepted.
    accepted : Option ExecutorGuardOutput :=
      some
        mkGuardOutput@{
          action := processReadActionLabel;
          args := unit;
        };
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                                   timestamp := msgTimestamp;
                                                   key := _;
                                                   data := _;
                                                 });
              } :=
         case msgTimestamp == expectedTimestamp of {
           | true := accepted
           | false := none
         }
       | _ := none;
The Executor Behaviour¶
ExecutorBehaviour¶
-- The generic EngineBehaviour type specialised to the executor: its
-- configuration, local state, mailbox state, timer handle and action-argument
-- types, followed by the node-wide Anoma message, configuration and
-- environment types.
ExecutorBehaviour : Type :=
  EngineBehaviour ExecutorCfg ExecutorLocalState ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;
Instantiation¶
-- The executor's behaviour: a single guard, processReadGuard, wrapped in
-- `First`.
executorBehaviour : ExecutorBehaviour :=
  mkEngineBehaviour@{
    guards := First [processReadGuard];
  };
Executor Action Flowcharts¶
processRead Flowchart¶
flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSRead]
  end
  G(processReadGuard)
  A(processReadAction)
  C --> G -- *processReadActionLabel* --> A --> E
  subgraph E[Effects]
    EEnv[(Update program state and tracked reads/writes)]
    EStep[(Execute step)]
    EMsg>Read/Write messages to shards]
    EFin>ExecutorFinished if halted]
  end
processRead flowchart showing read handling and execution steps