Juvix imports
module arch.node.engines.executor_behaviour;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Executor Behaviour¶
Overview¶
The executor behaviour defines how it processes incoming read responses and performs state transitions to execute the transaction program.
Action arguments¶
ExecutorActionArguments
¶
syntax alias ExecutorActionArguments := Unit;
Actions¶
Auxiliary Juvix code
ExecutorAction¶
ExecutorAction : Type :=
Action
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ExecutorActionInput¶
ExecutorActionInput : Type :=
ActionInput
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg;
ExecutorActionEffect¶
ExecutorActionEffect : Type :=
ActionEffect
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ExecutorActionExec¶
ExecutorActionExec : Type :=
ActionExec
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
processReadAction
¶
Process a read response and execute the next program step.
- State update
- Updates the program state with executed step results and tracks completed reads/writes
- Messages to be sent
-
- Read/Write messages to shards based on program outputs
-
- Notification messages for stale locks if program halts
-
- ExecutorFinished message if program halts
- Engines to be spawned
- No engines are created by this action.
- Timer updates
- No timers are set or cancelled.
processReadAction (input : ExecutorActionInput) : Option ExecutorActionEffect :=
let
cfg := EngineCfg.cfg (ActionInput.cfg input);
env := ActionInput.env input;
trigger := ActionInput.trigger input;
in case getMsgFromTimestampedTrigger trigger of
| some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
key := readKey;
data := readValue;
})) :=
let
envelope
(target : EngineID) (msg : Anoma.Msg) : EngineMsg Anoma.Msg :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := target;
mailbox := some 0;
msg := msg;
};
local := EngineEnv.localState env;
reads := ExecutorLocalState.completed_reads local;
writes := ExecutorLocalState.completed_writes local;
staleReadMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSReadRequest
mkKVSReadRequestMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
actual := false;
}));
staleWriteMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSWrite
mkKVSWriteMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
datum := none;
}));
staleReadLocations :=
Set.difference
(ExecutorCfg.lazy_read_keys cfg)
(Set.fromList (Map.keys reads));
readStaleMsgs := map staleReadMsg (Set.toList staleReadLocations);
staleWriteLocations :=
Set.difference
(ExecutorCfg.may_write_keys cfg)
(Set.fromList (Map.keys writes));
writeStaleMsgs := map staleWriteMsg (Set.toList staleWriteLocations);
staleMsgs := readStaleMsgs ++ writeStaleMsgs;
stepInput := mkPair readKey readValue;
stepResult :=
executeStep
(ExecutorCfg.executable cfg)
(ExecutorLocalState.program_state local)
stepInput;
in case stepResult of {
| error err :=
let
local := EngineEnv.localState env;
finishedMsg :=
envelope
(ExecutorCfg.issuer cfg)
(MsgExecutor
(ExecutorMsgExecutorFinished
mkExecutorFinishedMsg@{
success := false;
values_read :=
mkPair readKey readValue :: Map.toList reads;
values_written := Map.toList writes;
}));
in some
mkActionEffect@{
env := env;
msgs := finishedMsg :: staleMsgs;
timers := [];
engines := [];
}
| ok (mkPair program' outputs) :=
let
accReads
(key : KVSKey)
(msgs : List (EngineMsg Anoma.Msg))
: List (EngineMsg Anoma.Msg) :=
let
msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSReadRequest
mkKVSReadRequestMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
actual := true;
}));
in case
or
(Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
(Set.isMember key (ExecutorCfg.eager_read_keys cfg))
of
| true := msg :: msgs
| false := msgs;
accWrites
(key : KVSKey)
(value : KVSDatum)
(msgs : List (EngineMsg Anoma.Msg))
: List (EngineMsg Anoma.Msg) :=
let
msg :=
envelope
(keyToShard key)
(MsgShard
(ShardMsgKVSWrite
mkKVSWriteMsg@{
timestamp := ExecutorCfg.timestamp cfg;
key := key;
datum := some value;
}));
in case
or
(Set.isMember key (ExecutorCfg.will_write_keys cfg))
(Set.isMember key (ExecutorCfg.may_write_keys cfg))
of
| true := msg :: msgs
| false := msgs;
sendHelper
(acc : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)))
(out : Either KVSKey (Pair KVSKey KVSDatum))
: Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)) :=
let
state := fst acc;
msgs := snd acc;
in case out of
| left key := mkPair state (accReads key msgs)
| right (mkPair key value) :=
let
newState :=
state@ExecutorLocalState{completed_writes := Map.insert
key
value
(ExecutorLocalState.completed_writes state)};
in mkPair newState (accWrites key value msgs);
initial :=
mkPair
local@ExecutorLocalState{
program_state := program';
completed_reads := Map.insert
readKey
readValue
(ExecutorLocalState.completed_reads local);
}
[];
final := foldl sendHelper initial outputs;
newLocalState := fst final;
msgList := snd final;
newEnv := env@EngineEnv{localState := newLocalState};
in case ProgramState.halted program' of {
| false :=
some
mkActionEffect@{
env := newEnv;
msgs := msgList;
timers := [];
engines := [];
}
| true :=
let
finishedMsg :=
envelope
(ExecutorCfg.issuer cfg)
(MsgExecutor
(ExecutorMsgExecutorFinished
mkExecutorFinishedMsg@{
success := true;
values_read := Map.toList reads;
values_written := Map.toList writes;
}));
in some
mkActionEffect@{
env := newEnv;
msgs := msgList ++ finishedMsg :: staleMsgs;
timers := [];
engines := [];
}
}
}
| _ := none;
Action Labels¶
processReadActionLabel : ExecutorActionExec := Seq [processReadAction];
Guards¶
Auxiliary Juvix code
ExecutorGuard
¶
ExecutorGuard : Type :=
Guard
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ExecutorGuardOutput
¶
ExecutorGuardOutput : Type :=
GuardOutput
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ExecutorGuardEval
¶
ExecutorGuardEval : Type :=
GuardEval
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
processReadGuard
¶
Guard for processing read responses.
processReadGuard
(trigger : TimestampedTrigger ExecutorTimerHandle Anoma.Msg)
(cfg : EngineCfg ExecutorCfg)
(env : ExecutorEnv)
: Option ExecutorGuardOutput :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
timestamp := timestamp;
key := _;
data := _;
});
} :=
case timestamp == ExecutorCfg.timestamp (EngineCfg.cfg cfg) of {
| true :=
some
mkGuardOutput@{
action := processReadActionLabel;
args := unit;
}
| false := none
}
| _ := none;
The Executor Behaviour¶
ExecutorBehaviour
¶
ExecutorBehaviour : Type :=
EngineBehaviour
ExecutorCfg
ExecutorLocalState
ExecutorMailboxState
ExecutorTimerHandle
ExecutorActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
Instantiation¶
executorBehaviour : ExecutorBehaviour :=
mkEngineBehaviour@{
guards := First [processReadGuard];
};