Juvix imports
module arch.node.engines.executor_behaviour;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Executor Behaviour¶
Overview¶
The executor behaviour defines how it processes incoming read responses and performs state transitions to execute the transaction program.
Executor Action Flowcharts¶
processRead
Flowchart¶
flowchart TD
Start([Receive Message]) --> Msg[ShardMsgKVSRead<br/>key: KVSKey<br/>data: KVSDatum]
subgraph Guard["processReadGuard"]
Msg --> CheckMsg{Is message<br/>ShardMsgKVSRead?}
CheckMsg -->|No| Reject([Reject Message])
CheckMsg -->|Yes| ValidTS{Matching<br/>timestamp?}
ValidTS -->|No| Reject
ValidTS -->|Yes| ActionEntry[Enter Action Phase]
end
ActionEntry --> Action
subgraph Action["processReadAction"]
direction TB
ComputeStale["Compute stale locks:<br/>1. Find uncompleted reads<br/>2. Find uncompleted writes<br/>3. Create cleanup messages"]
ExecStep[Execute program step<br/>with read data]
ExecStep --> StepResult{Step<br/>Result?}
StepResult -->|Error| ErrBranch[Create error response<br/>with read/write history]
StepResult -->|Success| SuccessBranch[Update program state<br/>Track completed read]
SuccessBranch --> CheckHalt{Program<br/>Halted?}
CheckHalt -->|Yes| FinishOk[Create success response<br/>with read/write history]
CheckHalt -->|No| GenMsgs[Generate messages for<br/>new reads/writes]
end
ErrBranch --> AddStaleErr[Add stale lock<br/>cleanup messages]
FinishOk --> AddStaleOk[Add stale lock<br/>cleanup messages]
GenMsgs --> Parse[Parse step outputs]
subgraph ProcessOutputs["Process Step Outputs"]
Parse --> CheckType{Output<br/>Type?}
CheckType -->|Read Request| ReadBranch[Create KVSReadRequest<br/>if key in read sets]
CheckType -->|Write Request| WriteBranch[Create KVSWrite<br/>if key in write sets]
ReadBranch --> ValidRead{Key in<br/>read sets?}
ValidRead -->|Yes| AddRead[Add to read<br/>message list]
ValidRead -->|No| SkipRead[Skip invalid<br/>read request]
WriteBranch --> ValidWrite{Key in<br/>write sets?}
ValidWrite -->|Yes| AddWrite[Add to write<br/>message list]
ValidWrite -->|No| SkipWrite[Skip invalid<br/>write request]
end
AddRead --> Collect[Collect all<br/>generated messages]
AddWrite --> Collect
SkipRead --> Collect
SkipWrite --> Collect
subgraph StaleComputation["Stale Lock Processing"]
ComputeStale --> FindReads[Find difference between<br/>lazy_read_keys and<br/>completed reads]
ComputeStale --> FindWrites[Find difference between<br/>may_write_keys and<br/>completed writes]
FindReads --> CreateRead[Create cleanup read<br/>messages with actual=false]
FindWrites --> CreateWrite[Create cleanup write<br/>messages with datum=none]
CreateRead & CreateWrite --> CombineMsgs[Combine cleanup messages]
end
CombineMsgs -.-> AddStaleErr
CombineMsgs -.-> AddStaleOk
AddStaleErr --> NotifyFail[Send ExecutorFinished<br/>with error + cleanup messages]
AddStaleOk --> NotifySuccess[Send ExecutorFinished<br/>with success + cleanup messages]
Collect --> SendMsgs[Send generated<br/>read/write messages]
NotifyFail & NotifySuccess & SendMsgs --> End([Complete])
processRead
flowchart showing read handling and execution steps
Explanation¶
-
Initial Request Processing
- A client sends a
ShardMsgKVSRead
message containing:key
: The state key that was read.data
: The actual data value for that key.- A timestamp that identifies this execution context.
- The message first passes through the guard phase which:
- Validates the message is a
ShardMsgKVSRead
. - Ensures the timestamp matches this executor's configured timestamp.
- Rejects messages that fail either check.
- Routes valid messages to the action phase.
- Validates the message is a
- A client sends a
-
Program Execution
- The action phase begins by executing the next program step:
- Takes the current program state as context.
- Provides the newly read key-value pair as input.
- Produces either an error or a new program state with outputs.
- On error:
- Creates response detailing why execution failed.
- Includes lists of all completed reads and writes.
- Triggers stale lock cleanup before responding.
- On success:
- Updates internal program state with execution results.
- Records the completed read in its tracking.
- Determines if program has halted or continues.
- The action phase begins by executing the next program step:
-
Continuation Flow
- If program hasn't halted:
- Processes program outputs to generate new messages.
- For read requests:
- Validates key is in allowed read sets (lazy or eager).
- Creates
KVSReadRequest
messages for valid reads.
- For write operations:
- Validates key is in allowed write sets (will or may).
- Creates
KVSWrite
messages for valid writes
- Sends all generated messages to appropriate shards.
- Awaits next read response to continue execution.
- If program hasn't halted:
-
Completion Flow
- When program halts (either naturally or from error):
- Computes stale lock information:
- Finds difference between lazy_read_keys and actual reads.
- Finds difference between may_write_keys and actual writes.
- Generates cleanup messages:
KVSReadRequest
with actual=false for unused reads.KVSWrite
with datum=none for unused writes.
- Creates
ExecutorFinished
message containing:- Success/failure status
- Complete list of values read
- Complete list of values written
- Sends cleanup messages and finished notification.
- Terminates executor instance.
- Computes stale lock information:
- When program halts (either naturally or from error):
-
Reply Delivery
- All responses are sent back using:
- Executor's ID as sender.
- Original requester as target.
- Mailbox 0 (default response mailbox).
- Three possible response patterns:
- Error case: ExecutorFinished (success=false) + stale cleanup.
- Success case: ExecutorFinished (success=true) + stale cleanup.
- Continuation case: New read/write messages.
- All responses are sent back using:
Action arguments¶
ExecutorActionArguments
¶
syntax alias ExecutorActionArguments := Unit;
Actions¶
Auxiliary Juvix code
ExecutorAction
¶
ExecutorAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Action
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorActionInput
¶
ExecutorActionInput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionInput
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable);
ExecutorActionEffect
¶
ExecutorActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionEffect
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorActionExec
¶
ExecutorActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionExec
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
processReadAction
¶
Process a read response and execute the next program step.
- State update
- Updates the program state with executed step results and tracks completed reads/writes
- Messages to be sent
-
- Read/Write messages to shards based on program outputs
-
- Notification messages for stale locks if program halts
-
- ExecutorFinished message if program halts
- Engines to be spawned
- No engines are created by this action.
- Timer updates
- No timers are set or cancelled.
processReadAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
(input : ExecutorActionInput KVSKey KVSDatum Executable ProgramState)
: Option (ExecutorActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
cfg := EngineCfg.cfg (ActionInput.cfg input);
env := ActionInput.env input;
trigger := ActionInput.trigger input;
in case getMsgFromTimestampedTrigger trigger of
| some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
key := readKey;
data := readValue;
})) :=
let
envelope
(target : EngineID)
(msg : Anoma.PreMsg KVSKey KVSDatum Executable)
: EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := target;
mailbox := some 0;
msg := msg;
};
local := EngineEnv.localState env;
reads := ExecutorLocalState.completed_reads local;
writes := ExecutorLocalState.completed_writes local;
staleReadMsg
(key : KVSKey)
: EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSReadRequest
mkKVSReadRequestMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
actual := false;
}));
staleWriteMsg
(key : KVSKey)
: EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSWrite
mkKVSWriteMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
datum := none;
}));
staleReadLocations :=
Set.difference
(ExecutorCfg.lazy_read_keys cfg)
(Set.fromList (Map.keys reads));
readStaleMsgs := map staleReadMsg (Set.toList staleReadLocations);
staleWriteLocations :=
Set.difference
(ExecutorCfg.may_write_keys cfg)
(Set.fromList (Map.keys writes));
writeStaleMsgs := map staleWriteMsg (Set.toList staleWriteLocations);
staleMsgs := readStaleMsgs ++ writeStaleMsgs;
stepInput := mkPair readKey readValue;
stepResult :=
Runnable.executeStep
(ExecutorCfg.executable cfg)
(ExecutorLocalState.program_state local)
stepInput;
in case stepResult of {
| error err :=
let
local := EngineEnv.localState env;
finishedMsg :=
envelope
(ExecutorCfg.issuer cfg)
(MsgExecutor
(ExecutorMsgExecutorFinished
mkExecutorFinishedMsg@{
success := false;
values_read :=
mkPair readKey readValue :: Map.toList reads;
values_written := Map.toList writes;
}));
in some
mkActionEffect@{
env := env;
msgs := finishedMsg :: staleMsgs;
timers := [];
engines := [];
}
| ok (mkPair program' outputs) :=
let
accReads
(key : KVSKey)
(msgs : List
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
: List
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
let
msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSReadRequest
mkKVSReadRequestMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
actual := true;
}));
in case
or
(Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
(Set.isMember key (ExecutorCfg.eager_read_keys cfg))
of
| true := msg :: msgs
| false := msgs;
accWrites
(key : KVSKey)
(value : KVSDatum)
(msgs : List
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
: List
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
let
msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSWrite
mkKVSWriteMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
datum := some value;
}));
in case
or
(Set.isMember key (ExecutorCfg.will_write_keys cfg))
(Set.isMember key (ExecutorCfg.may_write_keys cfg))
of
| true := msg :: msgs
| false := msgs;
sendHelper
(acc : Pair
(ExecutorLocalState KVSKey KVSDatum ProgramState)
(List
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))))
(out : Either KVSKey (Pair KVSKey KVSDatum))
: Pair
(ExecutorLocalState KVSKey KVSDatum ProgramState)
(List
(EngineMsg
(Anoma.PreMsg KVSKey KVSDatum Executable))) :=
let
state := fst acc;
msgs := snd acc;
in case out of
| left key := mkPair state (accReads key msgs)
| right (mkPair key value) :=
let
newState :=
state@ExecutorLocalState{completed_writes := Map.insert
key
value
(ExecutorLocalState.completed_writes state)};
in mkPair newState (accWrites key value msgs);
initial :=
mkPair
local@ExecutorLocalState{
program_state := program';
completed_reads := Map.insert
readKey
readValue
(ExecutorLocalState.completed_reads local);
}
[];
final := foldl sendHelper initial outputs;
newLocalState := fst final;
msgList := snd final;
newEnv := env@EngineEnv{localState := newLocalState};
in case Runnable.halted {{rinst}} program' of {
| false :=
some
mkActionEffect@{
env := newEnv;
msgs := msgList;
timers := [];
engines := [];
}
| true :=
let
finishedMsg :=
envelope
(ExecutorCfg.issuer cfg)
(MsgExecutor
(ExecutorMsgExecutorFinished
mkExecutorFinishedMsg@{
success := true;
values_read := Map.toList reads;
values_written := Map.toList writes;
}));
in some
mkActionEffect@{
env := newEnv;
msgs := msgList ++ finishedMsg :: staleMsgs;
timers := [];
engines := [];
}
}
}
| _ := none;
Action Labels¶
processReadActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
: ExecutorActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [processReadAction];
Guards¶
Auxiliary Juvix code
ExecutorGuard
¶
ExecutorGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Guard
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorGuardOutput
¶
ExecutorGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardOutput
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ExecutorGuardEval
¶
ExecutorGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardEval
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
processReadGuard
¶
Guard for processing read responses.
processReadGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
(trigger : TimestampedTrigger
ExecutorTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg (ExecutorCfg KVSKey Executable))
(env : ExecutorEnv KVSKey KVSDatum ProgramState)
: Option (ExecutorGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
timestamp := timestamp;
key := _;
data := _;
});
} :=
case timestamp == ExecutorCfg.timestamp (EngineCfg.cfg cfg) of {
| true :=
some
mkGuardOutput@{
action := processReadActionLabel;
args := unit;
}
| false := none
}
| _ := none;
The Executor Behaviour¶
ExecutorBehaviour
¶
ExecutorBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
EngineBehaviour
(ExecutorCfg KVSKey Executable)
(ExecutorLocalState KVSKey KVSDatum ProgramState)
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
Instantiation¶
instance
dummyRunnable : Runnable String String ByteString String :=
mkRunnable@{
executeStep := \{_ _ _ := error "Not implemented"};
halted := \{_ := false};
startingState := "";
};
executorBehaviour : ExecutorBehaviour String String ByteString String :=
mkEngineBehaviour@{
guards := First [processReadGuard];
};