Juvix imports
module arch.node.engines.executor_behaviour;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Executor Behaviour¶
Overview¶
The executor behaviour defines how it processes incoming read responses and performs state transitions to execute the transaction program.
Executor Action Flowcharts¶
processRead Flowchart¶
flowchart TD
    Start([Receive Message]) --> Msg[ShardMsgKVSRead<br/>key: KVSKey<br/>data: KVSDatum]
    subgraph Guard["processReadGuard"]
        Msg --> CheckMsg{Is message<br/>ShardMsgKVSRead?}
        CheckMsg -->|No| Reject([Reject Message])
        CheckMsg -->|Yes| ValidTS{Matching<br/>timestamp?}
        ValidTS -->|No| Reject
        ValidTS -->|Yes| ActionEntry[Enter Action Phase]
    end
    ActionEntry --> Action
    subgraph Action["processReadAction"]
        direction TB
        ComputeStale["Compute stale locks:<br/>1. Find uncompleted reads<br/>2. Find uncompleted writes<br/>3. Create cleanup messages"]
        ExecStep[Execute program step<br/>with read data]
        ExecStep --> StepResult{Step<br/>Result?}
        StepResult -->|Error| ErrBranch[Create error response<br/>with read/write history]
        StepResult -->|Success| SuccessBranch[Update program state<br/>Track completed read]
        SuccessBranch --> CheckHalt{Program<br/>Halted?}
        CheckHalt -->|Yes| FinishOk[Create success response<br/>with read/write history]
        CheckHalt -->|No| GenMsgs[Generate messages for<br/>new reads/writes]
    end
    ErrBranch --> AddStaleErr[Add stale lock<br/>cleanup messages]
    FinishOk --> AddStaleOk[Add stale lock<br/>cleanup messages]
    GenMsgs --> Parse[Parse step outputs]
    subgraph ProcessOutputs["Process Step Outputs"]
        Parse --> CheckType{Output<br/>Type?}
        CheckType -->|Read Request| ReadBranch[Create KVSReadRequest<br/>if key in read sets]
        CheckType -->|Write Request| WriteBranch[Create KVSWrite<br/>if key in write sets]
        ReadBranch --> ValidRead{Key in<br/>read sets?}
        ValidRead -->|Yes| AddRead[Add to read<br/>message list]
        ValidRead -->|No| SkipRead[Skip invalid<br/>read request]
        WriteBranch --> ValidWrite{Key in<br/>write sets?}
        ValidWrite -->|Yes| AddWrite[Add to write<br/>message list]
        ValidWrite -->|No| SkipWrite[Skip invalid<br/>write request]
    end
    AddRead --> Collect[Collect all<br/>generated messages]
    AddWrite --> Collect
    SkipRead --> Collect
    SkipWrite --> Collect
    subgraph StaleComputation["Stale Lock Processing"]
        ComputeStale --> FindReads[Find difference between<br/>lazy_read_keys and<br/>completed reads]
        ComputeStale --> FindWrites[Find difference between<br/>may_write_keys and<br/>completed writes]
        FindReads --> CreateRead[Create cleanup read<br/>messages with actual=false]
        FindWrites --> CreateWrite[Create cleanup write<br/>messages with datum=none]
        CreateRead & CreateWrite --> CombineMsgs[Combine cleanup messages]
    end
    CombineMsgs -.-> AddStaleErr
    CombineMsgs -.-> AddStaleOk
    AddStaleErr --> NotifyFail[Send ExecutorFinished<br/>with error + cleanup messages]
    AddStaleOk --> NotifySuccess[Send ExecutorFinished<br/>with success + cleanup messages]
    Collect --> SendMsgs[Send generated<br/>read/write messages]
    NotifyFail & NotifySuccess & SendMsgs --> End([Complete])
processRead flowchart showing read handling and execution steps
Explanation¶
- 
Initial Request Processing
- A client sends a 
ShardMsgKVSReadmessage containing:key: The state key that was read.data: The actual data value for that key.- A timestamp that identifies this execution context.
 
 - The message first passes through the guard phase which:
- Validates the message is a 
ShardMsgKVSRead. - Ensures the timestamp matches this executor's configured timestamp.
 - Rejects messages that fail either check.
 - Routes valid messages to the action phase.
 
 - Validates the message is a 
 
 - A client sends a 
 - 
Program Execution
- The action phase begins by executing the next program step:
- Takes the current program state as context.
 - Provides the newly read key-value pair as input.
 - Produces either an error or a new program state with outputs.
 
 - On error:
- Creates response detailing why execution failed.
 - Includes lists of all completed reads and writes.
 - Triggers stale lock cleanup before responding.
 
 - On success:
- Updates internal program state with execution results.
 - Records the completed read in its tracking.
 - Determines if program has halted or continues.
 
 
 - The action phase begins by executing the next program step:
 - 
Continuation Flow
- If program hasn't halted:
- Processes program outputs to generate new messages.
 - For read requests:
- Validates key is in allowed read sets (lazy or eager).
 - Creates 
KVSReadRequestmessages for valid reads. 
 - For write operations:
- Validates key is in allowed write sets (will or may).
 - Creates 
KVSWritemessages for valid writes 
 - Sends all generated messages to appropriate shards.
 - Awaits next read response to continue execution.
 
 
 - If program hasn't halted:
 - 
Completion Flow
- When program halts (either naturally or from error):
- Computes stale lock information:
- Finds difference between lazy_read_keys and actual reads.
 - Finds difference between may_write_keys and actual writes.
 
 - Generates cleanup messages:
KVSReadRequestwith actual=false for unused reads.KVSWritewith datum=none for unused writes.
 - Creates 
ExecutorFinishedmessage containing:- Success/failure status
 - Complete list of values read
 - Complete list of values written
 
 - Sends cleanup messages and finished notification.
 - Terminates executor instance.
 
 - Computes stale lock information:
 
 - When program halts (either naturally or from error):
 - 
Reply Delivery
- All responses are sent back using:
- Executor's ID as sender.
 - Original requester as target.
 - Mailbox 0 (default response mailbox).
 
 - Three possible response patterns:
- Error case: ExecutorFinished (success=false) + stale cleanup.
 - Success case: ExecutorFinished (success=true) + stale cleanup.
 - Continuation case: New read/write messages.
 
 
 - All responses are sent back using:
 
Action arguments¶
ExecutorActionArguments¶
syntax alias ExecutorActionArguments := Unit;
Actions¶
Auxiliary Juvix code
ExecutorAction¶
ExecutorAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorActionInput¶
ExecutorActionInput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionInput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);
ExecutorActionEffect¶
ExecutorActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorActionExec¶
ExecutorActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
processReadAction¶
Process a read response and execute the next program step.
- State update
 - Updates the program state with executed step results and tracks completed reads/writes
 - Messages to be sent
 - 
- Read/Write messages to shards based on program outputs
 
 - 
- Notification messages for stale locks if program halts
 
 - 
- ExecutorFinished message if program halts
 
 - Engines to be spawned
 - No engines are created by this action.
 - Timer updates
 - No timers are set or cancelled.
 
processReadAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : ExecutorActionInput KVSKey KVSDatum Executable ProgramState)
  : Option (ExecutorActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
       | some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                           key := readKey;
                                           data := readValue;
                                         })) :=
         let
           envelope
             (target : EngineID)
             (msg : Anoma.PreMsg KVSKey KVSDatum Executable)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             mkEngineMsg@{
               sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
               target := target;
               mailbox := some 0;
               msg := msg;
             };
           local := EngineEnv.localState env;
           reads := ExecutorLocalState.completed_reads local;
           writes := ExecutorLocalState.completed_writes local;
           staleReadMsg
             (key : KVSKey)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSReadRequest
                   mkKVSReadRequestMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     actual := false;
                   }));
           staleWriteMsg
             (key : KVSKey)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSWrite
                   mkKVSWriteMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     datum := none;
                   }));
           staleReadLocations :=
             Set.difference
               (ExecutorCfg.lazy_read_keys cfg)
               (Set.fromList (Map.keys reads));
           readStaleMsgs := map staleReadMsg (Set.toList staleReadLocations);
           staleWriteLocations :=
             Set.difference
               (ExecutorCfg.may_write_keys cfg)
               (Set.fromList (Map.keys writes));
           writeStaleMsgs := map staleWriteMsg (Set.toList staleWriteLocations);
           staleMsgs := readStaleMsgs ++ writeStaleMsgs;
           stepInput := mkPair readKey readValue;
           stepResult :=
             Runnable.executeStep
               (ExecutorCfg.executable cfg)
               (ExecutorLocalState.program_state local)
               stepInput;
         in case stepResult of {
              | error err :=
                let
                  local := EngineEnv.localState env;
                  finishedMsg :=
                    envelope
                      (ExecutorCfg.issuer cfg)
                      (MsgExecutor
                        (ExecutorMsgExecutorFinished
                          mkExecutorFinishedMsg@{
                            success := false;
                            values_read :=
                              mkPair readKey readValue :: Map.toList reads;
                            values_written := Map.toList writes;
                          }));
                in some
                  mkActionEffect@{
                    env := env;
                    msgs := finishedMsg :: staleMsgs;
                    timers := [];
                    engines := [];
                  }
              | ok (mkPair program' outputs) :=
                let
                  accReads
                    (key : KVSKey)
                    (msgs : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
                    : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSReadRequest
                              mkKVSReadRequestMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                actual := true;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
                           (Set.isMember key (ExecutorCfg.eager_read_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  accWrites
                    (key : KVSKey)
                    (value : KVSDatum)
                    (msgs : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
                    : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSWrite
                              mkKVSWriteMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                datum := some value;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.will_write_keys cfg))
                           (Set.isMember key (ExecutorCfg.may_write_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  sendHelper
                    (acc : Pair
                      (ExecutorLocalState KVSKey KVSDatum ProgramState)
                      (List
                        (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))))
                    (out : Either KVSKey (Pair KVSKey KVSDatum))
                    : Pair
                      (ExecutorLocalState KVSKey KVSDatum ProgramState)
                      (List
                        (EngineMsg
                          (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
                    let
                      state := fst acc;
                      msgs := snd acc;
                    in case out of
                         | left key := mkPair state (accReads key msgs)
                         | right (mkPair key value) :=
                           let
                             newState :=
                               state@ExecutorLocalState{completed_writes := Map.insert
                                 key
                                 value
                                 (ExecutorLocalState.completed_writes state)};
                           in mkPair newState (accWrites key value msgs);
                  initial :=
                    mkPair
                      local@ExecutorLocalState{
                        program_state := program';
                        completed_reads := Map.insert
                          readKey
                          readValue
                          (ExecutorLocalState.completed_reads local);
                      }
                      [];
                  final := foldl sendHelper initial outputs;
                  newLocalState := fst final;
                  msgList := snd final;
                  newEnv := env@EngineEnv{localState := newLocalState};
                in case Runnable.halted {{rinst}} program' of {
                     | false :=
                       some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList;
                           timers := [];
                           engines := [];
                         }
                     | true :=
                       let
                         finishedMsg :=
                           envelope
                             (ExecutorCfg.issuer cfg)
                             (MsgExecutor
                               (ExecutorMsgExecutorFinished
                                 mkExecutorFinishedMsg@{
                                   success := true;
                                   values_read := Map.toList reads;
                                   values_written := Map.toList writes;
                                 }));
                       in some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList ++ finishedMsg :: staleMsgs;
                           timers := [];
                           engines := [];
                         }
                   }
            }
       | _ := none;
Action Labels¶
processReadActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : ExecutorActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [processReadAction];
Guards¶
Auxiliary Juvix code
ExecutorGuard¶
ExecutorGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorGuardOutput¶
ExecutorGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorGuardEval¶
ExecutorGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
processReadGuard¶
Guard for processing read responses.
processReadGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg (ExecutorCfg KVSKey Executable))
  (env : ExecutorEnv KVSKey KVSDatum ProgramState)
  : Option (ExecutorGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
             msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                                timestamp := timestamp;
                                                key := _;
                                                data := _;
                                              });
           } :=
      case timestamp == ExecutorCfg.timestamp (EngineCfg.cfg cfg) of {
        | true :=
          some
            mkGuardOutput@{
              action := processReadActionLabel;
              args := unit;
            }
        | false := none
      }
    | _ := none;
The Executor Behaviour¶
ExecutorBehaviour¶
ExecutorBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
Instantiation¶
instance
dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    executeStep := \{_ _ _ := error "Not implemented"};
    halted := \{_ := false};
    startingState := "";
  };
executorBehaviour : ExecutorBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards := First [processReadGuard];
  };