module arch.node.engines.executor_behaviour;
import Stdlib.Data.Set as Set;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
import arch.system.state.resource_machine.notes.runnable open;
-- Executor actions take no extra arguments from their guards, so the
-- action-argument type is simply `Unit`.
syntax alias ExecutorActionArguments := Unit;
-- The generic `Action` type specialised to the executor engine: executor
-- local configuration, local state, mailbox state, timer handle and action
-- arguments, together with the Anoma-wide message, configuration and
-- environment types.
ExecutorAction : Type :=
  Action
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- The generic `ActionInput` type specialised to the executor engine. Values of
-- this type are what `processReadAction` receives; it exposes the engine
-- configuration (`ActionInput.cfg`), environment (`ActionInput.env`) and
-- trigger (`ActionInput.trigger`).
ExecutorActionInput : Type :=
  ActionInput
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg;
-- The generic `ActionEffect` type specialised to the executor engine. An
-- effect bundles the updated environment (`env`), outgoing messages (`msgs`),
-- timers (`timers`) and engines to spawn (`engines`).
ExecutorActionEffect : Type :=
  ActionEffect
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- The generic `ActionExec` type specialised to the executor engine; this is
-- the type of action labels such as `processReadActionLabel`.
ExecutorActionExec : Type :=
  ActionExec
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- Handles a `KVSRead` message coming from a shard.
--
-- The (key, value) pair carried by the message is fed to the executable via
-- `Runnable.executeStep`, starting from the stored program state.
--
-- * If the step fails, the environment is left untouched and an
--   `ExecutorFinished` message with `success := false` is sent to the issuer,
--   followed by stale notifications for keys that were never read/written.
-- * If the step succeeds, the read is recorded in `completed_reads`, every
--   write output is recorded in `completed_writes`, and read requests / write
--   messages are emitted for outputs whose keys are declared in the
--   configuration. If the program has halted, an `ExecutorFinished` message
--   with `success := true` plus the stale notifications are appended.
--
-- Returns `none` when the trigger is not a shard `KVSRead` message.
--
-- Arguments:
--   input : the action input (engine configuration, environment and trigger).
processReadAction
  (input : ExecutorActionInput) : Option (ExecutorActionEffect) :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
    | some (Msg.Shard (ShardMsg.KVSRead KVSReadMsg.mkKVSReadMsg@{
        key := readKey;
        data := readValue;
      })) :=
      let
        -- Wraps a message in an envelope sent from this engine to `target`.
        envelope
          (target : EngineID) (msg : Anoma.Msg) : EngineMsg Anoma.Msg :=
          EngineMsg.mk@{
            sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
            target := target;
            mailbox := some 0;
            msg := msg;
          };
        -- State as it was BEFORE this step is applied.
        local := EngineEnv.localState env;
        reads := ExecutorLocalState.completed_reads local;
        writes := ExecutorLocalState.completed_writes local;
        -- Read request with `actual := false`: the key will not be read.
        staleReadMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
          envelope
            (ExecutorLocalCfg.keyToShard cfg key)
            (Msg.Shard
              (ShardMsg.KVSReadRequest
                KVSReadRequestMsg.mkKVSReadRequestMsg@{
                  timestamp := ExecutorLocalCfg.timestamp cfg;
                  key := key;
                  actual := false;
                }));
        -- Write with `datum := none`: the key will not be written.
        staleWriteMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
          envelope
            (ExecutorLocalCfg.keyToShard cfg key)
            (Msg.Shard
              (ShardMsg.KVSWrite
                KVSWriteMsg.mkKVSWriteMsg@{
                  timestamp := ExecutorLocalCfg.timestamp cfg;
                  key := key;
                  datum := none;
                }));
        -- Stale notifications for every lazy-read key not in `readDone` and
        -- every may-write key not in `writeDone`. Parameterised so that each
        -- branch can pass the key sets that are accurate at its point of
        -- termination.
        staleMsgsGiven
          (readDone : List KVSKey)
          (writeDone : List KVSKey)
          : List (EngineMsg Anoma.Msg) :=
          let
            unreadLazyKeys :=
              Set.difference
                (ExecutorLocalCfg.lazy_read_keys cfg)
                (Set.fromList readDone);
            unwrittenMayKeys :=
              Set.difference
                (ExecutorLocalCfg.may_write_keys cfg)
                (Set.fromList writeDone);
            readStaleMsgs := map staleReadMsg (Set.toList unreadLazyKeys);
            writeStaleMsgs := map staleWriteMsg (Set.toList unwrittenMayKeys);
          in readStaleMsgs ++ writeStaleMsgs;
        stepInput := mkPair readKey readValue;
        stepResult :=
          Runnable.executeStep
            (ExecutorLocalCfg.executable cfg)
            (ExecutorLocalState.program_state local)
            stepInput;
      in case stepResult of {
        | error err :=
          let
            finishedMsg :=
              envelope
                (ExecutorLocalCfg.issuer cfg)
                (Msg.Executor
                  (ExecutorMsg.ExecutorFinished
                    ExecutorFinishedMsg.mkExecutorFinishedMsg@{
                      success := false;
                      values_read :=
                        mkPair readKey readValue :: Map.toList reads;
                      values_written := Map.toList writes;
                    }));
            -- The current read did happen (it is reported in `values_read`),
            -- so it must not also be announced as a read that will not occur.
            staleMsgs :=
              staleMsgsGiven (readKey :: Map.keys reads) (Map.keys writes);
          in some
            ActionEffect.mk@{
              env := env;
              msgs := finishedMsg :: staleMsgs;
              timers := [];
              engines := [];
            }
        | ok (mkPair program' outputs) :=
          let
            -- Prepends a real read request for `key` if the key is a declared
            -- lazy or eager read key; otherwise leaves `msgs` unchanged.
            accReads
              (key : KVSKey)
              (msgs : List (EngineMsg Anoma.Msg))
              : List (EngineMsg Anoma.Msg) :=
              let
                msg :=
                  envelope
                    (ExecutorLocalCfg.keyToShard cfg key)
                    (Msg.Shard
                      (ShardMsg.KVSReadRequest
                        KVSReadRequestMsg.mkKVSReadRequestMsg@{
                          timestamp := ExecutorLocalCfg.timestamp cfg;
                          key := key;
                          actual := true;
                        }));
              in case
                or
                  (Set.isMember
                    key
                    (ExecutorLocalCfg.lazy_read_keys cfg))
                  (Set.isMember
                    key
                    (ExecutorLocalCfg.eager_read_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Prepends a write message for `key` if the key is a declared
            -- will-write or may-write key; otherwise leaves `msgs` unchanged.
            accWrites
              (key : KVSKey)
              (value : KVSDatum)
              (msgs : List (EngineMsg Anoma.Msg))
              : List (EngineMsg Anoma.Msg) :=
              let
                msg :=
                  envelope
                    (ExecutorLocalCfg.keyToShard cfg key)
                    (Msg.Shard
                      (ShardMsg.KVSWrite
                        KVSWriteMsg.mkKVSWriteMsg@{
                          timestamp := ExecutorLocalCfg.timestamp cfg;
                          key := key;
                          datum := some value;
                        }));
              in case
                or
                  (Set.isMember
                    key
                    (ExecutorLocalCfg.will_write_keys cfg))
                  (Set.isMember
                    key
                    (ExecutorLocalCfg.may_write_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Folds one program output into the (state, messages) pair:
            -- `left key` is a read request, `right (key, value)` is a write
            -- that is also recorded in `completed_writes`.
            sendHelper
              (acc : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)))
              (out : Either KVSKey (Pair KVSKey KVSDatum))
              : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)) :=
              let
                state := fst acc;
                msgs := snd acc;
              in case out of
                | left key := mkPair state (accReads key msgs)
                | right (mkPair key value) :=
                  let
                    newState :=
                      state@ExecutorLocalState{completed_writes := Map.insert
                        key
                        value
                        (ExecutorLocalState.completed_writes state)};
                  in mkPair newState (accWrites key value msgs);
            -- Start from the old state with the new program state and the
            -- read that triggered this step recorded.
            initial :=
              mkPair
                local@ExecutorLocalState{
                  program_state := program';
                  completed_reads := Map.insert
                    readKey
                    readValue
                    (ExecutorLocalState.completed_reads local);
                }
                [];
            final := foldl sendHelper initial outputs;
            newLocalState := fst final;
            msgList := snd final;
            newEnv := env@EngineEnv{localState := newLocalState};
          in case Runnable.halted program' of {
            | false :=
              some
                ActionEffect.mk@{
                  env := newEnv;
                  msgs := msgList;
                  timers := [];
                  engines := [];
                }
            | true :=
              let
                -- Report and compute staleness from the state AFTER this
                -- step, so that the read just consumed and the writes made in
                -- the final step are included in the report and are not
                -- announced as stale.
                finalReads := ExecutorLocalState.completed_reads newLocalState;
                finalWrites :=
                  ExecutorLocalState.completed_writes newLocalState;
                finishedMsg :=
                  envelope
                    (ExecutorLocalCfg.issuer cfg)
                    (Msg.Executor
                      (ExecutorMsg.ExecutorFinished
                        ExecutorFinishedMsg.mkExecutorFinishedMsg@{
                          success := true;
                          values_read := Map.toList finalReads;
                          values_written := Map.toList finalWrites;
                        }));
                staleMsgs :=
                  staleMsgsGiven (Map.keys finalReads) (Map.keys finalWrites);
              in some
                ActionEffect.mk@{
                  env := newEnv;
                  msgs := msgList ++ finishedMsg :: staleMsgs;
                  timers := [];
                  engines := [];
                }
          }
      }
    | _ := none;
-- Action label that runs `processReadAction` as a single-step sequence.
processReadActionLabel : ExecutorActionExec :=
  ActionExec.Seq
    [processReadAction];
-- The generic `Guard` type specialised to the executor engine's local
-- configuration, local state, mailbox state, timer handle and action
-- arguments, plus the Anoma-wide message, configuration and environment types.
ExecutorGuard : Type :=
  Guard
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- The generic `GuardOutput` type specialised to the executor engine. A guard
-- output names the action to run (`action`) and the arguments to pass to it
-- (`args`).
ExecutorGuardOutput : Type :=
  GuardOutput
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- The generic `GuardEval` type specialised to the executor engine; this is
-- the type of the `guards` field of the executor behaviour.
ExecutorGuardEval : Type :=
  GuardEval
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- Guard for `processReadActionLabel`.
--
-- Fires (with `unit` arguments) when the trigger carries a shard `KVSRead`
-- message whose timestamp equals the timestamp in the executor's local
-- configuration. Any other trigger, or a `KVSRead` with a different
-- timestamp, yields `none`.
--
-- Arguments:
--   trigger : the timestamped trigger being examined.
--   cfg     : the executor engine configuration (source of the timestamp).
--   env     : the executor environment (not inspected by this guard).
processReadGuard
  (trigger : TimestampedTrigger ExecutorTimerHandle Anoma.Msg)
  (cfg : ExecutorCfg)
  (env : ExecutorEnv)
  : Option ExecutorGuardOutput :=
  let
    expectedStamp := ExecutorLocalCfg.timestamp (EngineCfg.cfg cfg);
    fire : Option ExecutorGuardOutput :=
      some
        GuardOutput.mk@{
          action := processReadActionLabel;
          args := unit;
        };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some EngineMsg.mk@{
        msg := Msg.Shard (ShardMsg.KVSRead KVSReadMsg.mkKVSReadMsg@{
          timestamp := msgStamp;
        });
      } :=
      case msgStamp == expectedStamp of {
        | true := fire
        | false := none
      }
    | _ := none;
-- The generic `EngineBehaviour` type specialised to the executor engine's
-- local configuration, local state, mailbox state, timer handle and action
-- arguments, plus the Anoma-wide message, configuration and environment types.
ExecutorBehaviour : Type :=
  EngineBehaviour
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;
-- The executor's behaviour: a single guard, `processReadGuard`, evaluated
-- under the `GuardEval.First` strategy.
executorBehaviour : ExecutorBehaviour :=
  EngineBehaviour.mk@{
    guards :=
      GuardEval.First
        [processReadGuard];
  };