module arch.node.engines.executor_behaviour;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
-- Argument type handed from guards to executor actions. It is `Unit`, i.e.
-- executor actions receive no extra arguments from the guard that selects them.
syntax alias ExecutorActionArguments := Unit;
-- The generic `Action` type specialised to the executor engine: executor
-- configuration, local state, mailbox state, timer handle and action
-- arguments, together with the Anoma-wide message, configuration and
-- environment types.
ExecutorAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- The generic `ActionInput` type specialised to the executor engine. This is
-- the value an executor action receives; `processReadAction` reads its `cfg`,
-- `env` and `trigger` fields.
ExecutorActionInput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionInput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);
-- The generic `ActionEffect` type specialised to the executor engine. This is
-- the value an executor action returns; `processReadAction` fills in its
-- `env`, `msgs`, `timers` and `engines` fields.
ExecutorActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- The generic `ActionExec` type specialised to the executor engine. Values of
-- this type (such as `processReadActionLabel`) are what a guard output names
-- in its `action` field.
ExecutorActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- Handles a `ShardMsgKVSRead` delivered to the executor.
--
-- The key/value pair carried by the message is fed to `Runnable.executeStep`
-- together with the stored program state. Depending on the outcome:
--
--   * step fails: an `ExecutorFinished` message with `success := false` is
--     sent to the issuer, followed by the stale-lock notifications. The
--     environment is returned unchanged.
--   * step succeeds, program not halted: the local state is updated (program
--     state, completed reads and writes) and the read requests / writes
--     produced by the step are sent to the shards.
--   * step succeeds, program halted: as above, plus an `ExecutorFinished`
--     message with `success := true` and the stale-lock notifications.
--
-- Returns `none` when the trigger is not a `ShardMsgKVSRead`.
--
-- Every report and stale-lock computation is based on the state that already
-- contains the read that triggered this step (and, on success, the writes of
-- this step). Otherwise the final report would omit them and the keys touched
-- in this very step would be announced to the shards as unused.
processReadAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : ExecutorActionInput KVSKey KVSDatum Executable ProgramState)
  : Option (ExecutorActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
    | some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
        key := readKey;
        data := readValue;
      })) :=
      let
        -- Wraps a message in an engine envelope sent from this executor.
        envelope
          (target : EngineID)
          (msg : Anoma.PreMsg KVSKey KVSDatum Executable)
          : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
          mkEngineMsg@{
            sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
            target := target;
            mailbox := some 0;
            msg := msg;
          };
        local := EngineEnv.localState env;
        reads := ExecutorLocalState.completed_reads local;
        writes := ExecutorLocalState.completed_writes local;
        -- Local state with the read that triggered this action recorded.
        localWithRead :=
          local@ExecutorLocalState{
            completed_reads := Map.insert readKey readValue reads;
          };
        -- Read request with `actual := false` for a key that was never read.
        -- NOTE(review): presumably this lets the shard drop the read lock;
        -- confirm against the shard behaviour.
        staleReadMsg
          (key : KVSKey)
          : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
          envelope
            (keyToShard key)
            (MsgShard
              (ShardMsgKVSReadRequest
                mkKVSReadRequestMsg@{
                  timestamp := ExecutorCfg.timestamp cfg;
                  key := key;
                  actual := false;
                }));
        -- Write with `datum := none` for a key that was never written.
        -- NOTE(review): presumably this lets the shard drop the write lock;
        -- confirm against the shard behaviour.
        staleWriteMsg
          (key : KVSKey)
          : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
          envelope
            (keyToShard key)
            (MsgShard
              (ShardMsgKVSWrite
                mkKVSWriteMsg@{
                  timestamp := ExecutorCfg.timestamp cfg;
                  key := key;
                  datum := none;
                }));
        -- Stale-lock notifications relative to a given local state: one per
        -- lazy-read key absent from its completed reads, followed by one per
        -- may-write key absent from its completed writes.
        staleMsgsFor
          (state : ExecutorLocalState KVSKey KVSDatum ProgramState)
          : List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
          let
            staleReadLocations :=
              Set.difference
                (ExecutorCfg.lazy_read_keys cfg)
                (Set.fromList
                  (Map.keys (ExecutorLocalState.completed_reads state)));
            readStaleMsgs := map staleReadMsg (Set.toList staleReadLocations);
            staleWriteLocations :=
              Set.difference
                (ExecutorCfg.may_write_keys cfg)
                (Set.fromList
                  (Map.keys (ExecutorLocalState.completed_writes state)));
            writeStaleMsgs :=
              map staleWriteMsg (Set.toList staleWriteLocations);
          in readStaleMsgs ++ writeStaleMsgs;
        stepInput := mkPair readKey readValue;
        stepResult :=
          Runnable.executeStep
            (ExecutorCfg.executable cfg)
            (ExecutorLocalState.program_state local)
            stepInput;
      in case stepResult of {
        | error _ :=
          let
            finishedMsg :=
              envelope
                (ExecutorCfg.issuer cfg)
                (MsgExecutor
                  (ExecutorMsgExecutorFinished
                    mkExecutorFinishedMsg@{
                      success := false;
                      values_read :=
                        mkPair readKey readValue :: Map.toList reads;
                      values_written := Map.toList writes;
                    }));
          in some
            mkActionEffect@{
              env := env;
              -- The current read was served, so its key is not stale.
              msgs := finishedMsg :: staleMsgsFor localWithRead;
              timers := [];
              engines := [];
            }
        | ok (mkPair program' outputs) :=
          let
            -- Prepends a real read request for `key` when it is a declared
            -- (lazy or eager) read key; otherwise leaves `msgs` untouched.
            accReads
              (key : KVSKey)
              (msgs : List
                (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
              : List
                (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
              let
                msg :=
                  envelope
                    (keyToShard key)
                    (MsgShard
                      (ShardMsgKVSReadRequest
                        mkKVSReadRequestMsg@{
                          timestamp := ExecutorCfg.timestamp cfg;
                          key := key;
                          actual := true;
                        }));
              in case
                or
                  (Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
                  (Set.isMember key (ExecutorCfg.eager_read_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Prepends a real write for `key` when it is a declared
            -- (will- or may-) write key; otherwise leaves `msgs` untouched.
            accWrites
              (key : KVSKey)
              (value : KVSDatum)
              (msgs : List
                (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
              : List
                (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
              let
                msg :=
                  envelope
                    (keyToShard key)
                    (MsgShard
                      (ShardMsgKVSWrite
                        mkKVSWriteMsg@{
                          timestamp := ExecutorCfg.timestamp cfg;
                          key := key;
                          datum := some value;
                        }));
              in case
                or
                  (Set.isMember key (ExecutorCfg.will_write_keys cfg))
                  (Set.isMember key (ExecutorCfg.may_write_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Folds one program output into the (state, outgoing messages)
            -- accumulator: `left key` is a read request, `right (key, value)`
            -- is a write that is also recorded in `completed_writes`.
            sendHelper
              (acc : Pair
                (ExecutorLocalState KVSKey KVSDatum ProgramState)
                (List
                  (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))))
              (out : Either KVSKey (Pair KVSKey KVSDatum))
              : Pair
                (ExecutorLocalState KVSKey KVSDatum ProgramState)
                (List
                  (EngineMsg
                    (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
              let
                state := fst acc;
                msgs := snd acc;
              in case out of
                | left key := mkPair state (accReads key msgs)
                | right (mkPair key value) :=
                  let
                    newState :=
                      state@ExecutorLocalState{completed_writes := Map.insert
                        key
                        value
                        (ExecutorLocalState.completed_writes state)};
                  in mkPair newState (accWrites key value msgs);
            initial :=
              mkPair
                localWithRead@ExecutorLocalState{
                  program_state := program';
                }
                [];
            final := foldl sendHelper initial outputs;
            newLocalState := fst final;
            msgList := snd final;
            newEnv := env@EngineEnv{localState := newLocalState};
          in case Runnable.halted {{rinst}} program' of {
            | false :=
              some
                mkActionEffect@{
                  env := newEnv;
                  msgs := msgList;
                  timers := [];
                  engines := [];
                }
            | true :=
              let
                -- Report the final state so that the read and writes of this
                -- last step are part of the result.
                finishedMsg :=
                  envelope
                    (ExecutorCfg.issuer cfg)
                    (MsgExecutor
                      (ExecutorMsgExecutorFinished
                        mkExecutorFinishedMsg@{
                          success := true;
                          values_read :=
                            Map.toList
                              (ExecutorLocalState.completed_reads
                                newLocalState);
                          values_written :=
                            Map.toList
                              (ExecutorLocalState.completed_writes
                                newLocalState);
                        }));
              in some
                mkActionEffect@{
                  env := newEnv;
                  msgs :=
                    msgList
                      ++ (finishedMsg :: staleMsgsFor newLocalState);
                  timers := [];
                  engines := [];
                }
          }
      }
    | _ := none;
-- Action label referenced by `processReadGuard`: a `Seq` whose only step is
-- `processReadAction`.
processReadActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : ExecutorActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [processReadAction];
-- The generic `Guard` type specialised to the executor engine's
-- configuration, local state, mailbox state, timer handle and action
-- arguments, together with the Anoma-wide message, configuration and
-- environment types.
ExecutorGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- The generic `GuardOutput` type specialised to the executor engine. This is
-- what a matching guard returns; `processReadGuard` fills in its `action` and
-- `args` fields.
ExecutorGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- The generic `GuardEval` type specialised to the executor engine; the
-- `guards` field of `executorBehaviour` is built with its `First` constructor.
ExecutorGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- Guard selecting `processReadActionLabel`.
--
-- Fires only when the trigger carries an engine message whose payload is a
-- shard `ShardMsgKVSRead` with a timestamp equal to the timestamp in this
-- executor's configuration. Every other trigger, and a read with a different
-- timestamp, yields `none`. The environment argument is not inspected.
processReadGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg (ExecutorCfg KVSKey Executable))
  (env : ExecutorEnv KVSKey KVSDatum ProgramState)
  : Option (ExecutorGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  let
    -- Timestamp this executor was configured with.
    ownTimestamp := ExecutorCfg.timestamp (EngineCfg.cfg cfg);
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
        msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
          timestamp := msgTimestamp;
        });
      } :=
      case msgTimestamp == ownTimestamp of {
        | true :=
          some
            mkGuardOutput@{
              action := processReadActionLabel;
              args := unit;
            }
        | false := none
      }
    | _ := none;
-- The generic `EngineBehaviour` type specialised to the executor engine's
-- configuration, local state, mailbox state, timer handle and action
-- arguments, together with the Anoma-wide message, configuration and
-- environment types.
ExecutorBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
-- Placeholder `Runnable` instance over `String` keys, `String` data,
-- `ByteString` executables and `String` program states. It starts from the
-- empty string, never reports the program as halted, and every execution
-- step fails with "Not implemented".
instance
dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    startingState := "";
    halted := \{_ := false};
    executeStep := \{_ _ _ := error "Not implemented"};
  };
-- Executor behaviour at the concrete `String` / `String` / `ByteString` /
-- `String` instantiation. Its only guard is `processReadGuard`, wrapped in
-- `First`.
executorBehaviour : ExecutorBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards := First [processReadGuard];
  };