Skip to content
Juvix imports

module arch.node.engines.executor_behaviour;

import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

Executor Behaviour

Overview

The executor behaviour defines how the executor engine processes incoming read responses and performs the state transitions that execute the transaction program.

Action arguments

ExecutorActionArguments

-- The executor's actions take no extra arguments from their guards, so the
-- action-argument type is an alias for `Unit`.
syntax alias ExecutorActionArguments := Unit;

Actions

Auxiliary Juvix code

ExecutorAction

-- `Action` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, followed by
-- the global Anoma message, config and environment types.
ExecutorAction : Type :=
  Action
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

ExecutorActionInput

-- `ActionInput` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, plus the
-- global Anoma message type.
ExecutorActionInput : Type :=
  ActionInput
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg;

ExecutorActionEffect

-- `ActionEffect` specialised to the executor engine: executor-specific local
-- state, mailbox state and timer handle, followed by the global Anoma
-- message, config and environment types.
ExecutorActionEffect : Type :=
  ActionEffect
    ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    Anoma.Msg Anoma.Cfg Anoma.Env;

ExecutorActionExec

-- `ActionExec` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, followed by
-- the global Anoma message, config and environment types.
ExecutorActionExec : Type :=
  ActionExec
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

processReadAction

Process a read response and execute the next program step.

State update
Updates the program state with executed step results and tracks completed reads/writes
Messages to be sent
  • Read/Write messages to shards based on program outputs
  • Notification messages for stale locks if program halts
  • ExecutorFinished message if program halts
Engines to be spawned
No engines are created by this action.
Timer updates
No timers are set or cancelled.
-- Process a KVS read response and execute the next program step.
--
-- Returns `none` unless the trigger carries a `ShardMsgKVSRead` message.
-- Otherwise the received (key, value) pair is fed to `executeStep` together
-- with the current program state:
--
--   * `error`: the environment is left unchanged and an `ExecutorFinished`
--     message with `success := false` is sent to the issuer, followed by the
--     stale-lock notifications.
--   * `ok`, program not halted: the local state is updated (program state,
--     completed reads/writes) and the read requests / writes produced by the
--     step are sent to the responsible shards.
--   * `ok`, program halted: as above, plus an `ExecutorFinished` message with
--     `success := true` and the stale-lock notifications.
--
-- The `ExecutorFinished` payload and the stale-lock notifications are always
-- derived from the state that already accounts for the current read (and, on
-- halt, for the writes of the final step). Deriving them from the pre-step
-- state would omit the last read/writes from the report and would send
-- "stale" notifications for keys that were in fact just read or written.
--
-- No engines are spawned and no timers are set or cancelled.
processReadAction (input : ExecutorActionInput) : Option ExecutorActionEffect :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
    | some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
        key := readKey;
        data := readValue;
      })) :=
      let
        -- Wrap a message addressed to `target`, sent from this engine.
        envelope
          (target : EngineID) (msg : Anoma.Msg) : EngineMsg Anoma.Msg :=
          mkEngineMsg@{
            sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
            target := target;
            mailbox := some 0;
            msg := msg;
          };
        local := EngineEnv.localState env;
        -- Local state with the read that triggered this action recorded.
        localWithRead :=
          local@ExecutorLocalState{completed_reads := Map.insert
            readKey
            readValue
            (ExecutorLocalState.completed_reads local)};
        -- Read request with `actual := false` for a key that was not read.
        staleReadMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
          envelope
            (keyToShard key)
            (MsgShard
              (ShardMsgKVSReadRequest
                mkKVSReadRequestMsg@{
                  timestamp := ExecutorCfg.timestamp cfg;
                  key := key;
                  actual := false;
                }));
        -- Write with `datum := none` for a key that was not written.
        staleWriteMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
          envelope
            (keyToShard key)
            (MsgShard
              (ShardMsgKVSWrite
                mkKVSWriteMsg@{
                  timestamp := ExecutorCfg.timestamp cfg;
                  key := key;
                  datum := none;
                }));
        -- Stale-lock notifications relative to state `st`: lazy-read keys
        -- without a completed read, then may-write keys without a completed
        -- write.
        staleMsgsFor (st : ExecutorLocalState) : List (EngineMsg Anoma.Msg) :=
          let
            staleReadLocations :=
              Set.difference
                (ExecutorCfg.lazy_read_keys cfg)
                (Set.fromList
                  (Map.keys (ExecutorLocalState.completed_reads st)));
            staleWriteLocations :=
              Set.difference
                (ExecutorCfg.may_write_keys cfg)
                (Set.fromList
                  (Map.keys (ExecutorLocalState.completed_writes st)));
          in map staleReadMsg (Set.toList staleReadLocations)
            ++ map staleWriteMsg (Set.toList staleWriteLocations);
        -- `ExecutorFinished` message for the issuer, reporting the reads and
        -- writes recorded in state `st`.
        finishedMsgFor
          (succeeded : Bool) (st : ExecutorLocalState) : EngineMsg Anoma.Msg :=
          envelope
            (ExecutorCfg.issuer cfg)
            (MsgExecutor
              (ExecutorMsgExecutorFinished
                mkExecutorFinishedMsg@{
                  success := succeeded;
                  values_read :=
                    Map.toList (ExecutorLocalState.completed_reads st);
                  values_written :=
                    Map.toList (ExecutorLocalState.completed_writes st);
                }));
        stepInput := mkPair readKey readValue;
        stepResult :=
          executeStep
            (ExecutorCfg.executable cfg)
            (ExecutorLocalState.program_state local)
            stepInput;
      in case stepResult of {
        | error _ :=
          -- The step failed: report failure. The environment is not updated,
          -- but the current read is still counted as completed in the report
          -- and excluded from the stale-read notifications.
          some
            mkActionEffect@{
              env := env;
              msgs :=
                finishedMsgFor false localWithRead
                  :: staleMsgsFor localWithRead;
              timers := [];
              engines := [];
            }
        | ok (mkPair program' outputs) :=
          let
            -- Prepend an actual read request for `key` if it is a declared
            -- (lazy or eager) read key; otherwise leave `msgs` unchanged.
            accReads
              (key : KVSKey)
              (msgs : List (EngineMsg Anoma.Msg))
              : List (EngineMsg Anoma.Msg) :=
              let
                msg :=
                  envelope
                    (keyToShard key)
                    (MsgShard
                      (ShardMsgKVSReadRequest
                        mkKVSReadRequestMsg@{
                          timestamp := ExecutorCfg.timestamp cfg;
                          key := key;
                          actual := true;
                        }));
              in case
                or
                  (Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
                  (Set.isMember key (ExecutorCfg.eager_read_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Prepend a write of `value` to `key` if it is a declared
            -- (will-write or may-write) key; otherwise leave `msgs` unchanged.
            accWrites
              (key : KVSKey)
              (value : KVSDatum)
              (msgs : List (EngineMsg Anoma.Msg))
              : List (EngineMsg Anoma.Msg) :=
              let
                msg :=
                  envelope
                    (keyToShard key)
                    (MsgShard
                      (ShardMsgKVSWrite
                        mkKVSWriteMsg@{
                          timestamp := ExecutorCfg.timestamp cfg;
                          key := key;
                          datum := some value;
                        }));
              in case
                or
                  (Set.isMember key (ExecutorCfg.will_write_keys cfg))
                  (Set.isMember key (ExecutorCfg.may_write_keys cfg))
              of
                | true := msg :: msgs
                | false := msgs;
            -- Fold step over the program outputs: `left key` requests a read,
            -- `right (key, value)` records the write in the state and
            -- (if permitted) emits the write message.
            sendHelper
              (acc : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)))
              (out : Either KVSKey (Pair KVSKey KVSDatum))
              : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)) :=
              let
                state := fst acc;
                msgs := snd acc;
              in case out of
                | left key := mkPair state (accReads key msgs)
                | right (mkPair key value) :=
                  let
                    newState :=
                      state@ExecutorLocalState{completed_writes := Map.insert
                        key
                        value
                        (ExecutorLocalState.completed_writes state)};
                  in mkPair newState (accWrites key value msgs);
            initial :=
              mkPair
                localWithRead@ExecutorLocalState{program_state := program'}
                [];
            final := foldl sendHelper initial outputs;
            newLocalState := fst final;
            msgList := snd final;
            newEnv := env@EngineEnv{localState := newLocalState};
          in case ProgramState.halted program' of {
            | false :=
              some
                mkActionEffect@{
                  env := newEnv;
                  msgs := msgList;
                  timers := [];
                  engines := [];
                }
            | true :=
              -- Halted: report success and release unused locks, both
              -- computed from the post-step state so that the final read and
              -- the final step's writes are included.
              some
                mkActionEffect@{
                  env := newEnv;
                  msgs :=
                    msgList
                      ++ (finishedMsgFor true newLocalState
                        :: staleMsgsFor newLocalState);
                  timers := [];
                  engines := [];
                }
          }
      }
    | _ := none;

Action Labels

-- Action label selected by `processReadGuard`: a `Seq` containing only
-- `processReadAction`.
processReadActionLabel : ExecutorActionExec := Seq [processReadAction];

Guards

Auxiliary Juvix code

ExecutorGuard

-- `Guard` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, followed by
-- the global Anoma message, config and environment types.
ExecutorGuard : Type :=
  Guard
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

ExecutorGuardOutput

-- `GuardOutput` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, followed by
-- the global Anoma message, config and environment types.
ExecutorGuardOutput : Type :=
  GuardOutput
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

ExecutorGuardEval

-- `GuardEval` specialised to the executor engine: executor-specific config,
-- local state, mailbox state, timer handle and action arguments, followed by
-- the global Anoma message, config and environment types.
ExecutorGuardEval : Type :=
  GuardEval
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

processReadGuard

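Guard for processing read responses. It matches a `ShardMsgKVSRead` message whose timestamp equals the executor's configured timestamp and, if so, selects `processReadActionLabel`.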

-- Guard for KVS read responses.
--
-- Yields `processReadActionLabel` (with `unit` arguments) when the trigger is
-- an engine message carrying a `ShardMsgKVSRead` whose timestamp equals the
-- timestamp in the executor's configuration; yields `none` for any other
-- trigger or for a mismatching timestamp. The `env` argument is not inspected.
processReadGuard
  (trigger : TimestampedTrigger ExecutorTimerHandle Anoma.Msg)
  (cfg : EngineCfg ExecutorCfg)
  (env : ExecutorEnv)
  : Option ExecutorGuardOutput :=
  let
    -- Timestamp this executor was configured with.
    expected := ExecutorCfg.timestamp (EngineCfg.cfg cfg);
    -- Guard result used when the response belongs to this executor.
    accept : Option ExecutorGuardOutput :=
      some
        mkGuardOutput@{
          action := processReadActionLabel;
          args := unit;
        };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
        msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
          timestamp := received;
        });
      } :=
      case received == expected of {
        | true := accept
        | false := none
      }
    | _ := none;

The Executor Behaviour

ExecutorBehaviour

-- `EngineBehaviour` specialised to the executor engine: executor-specific
-- config, local state, mailbox state, timer handle and action arguments,
-- followed by the global Anoma message, config and environment types.
ExecutorBehaviour : Type :=
  EngineBehaviour
    ExecutorCfg ExecutorLocalState ExecutorMailboxState ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg Anoma.Cfg Anoma.Env;

Instantiation

-- The executor's behaviour: its guard collection is `First` applied to the
-- single guard `processReadGuard`.
executorBehaviour : ExecutorBehaviour :=
mkEngineBehaviour@{
guards := First [processReadGuard];
};

Executor Action Flowcharts

processRead Flowchart

flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSRead]
  end

  G(processReadGuard)
  A(processReadAction)

  C --> G -- *processReadActionLabel* --> A --> E

  subgraph E[Effects]
    EEnv[(Update program state and tracked reads/writes)]
    EStep[(Execute step)]
    EMsg>Read and write messages to shards]
    EFin>ExecutorFinished if halted]
  end
processRead flowchart showing read handling and execution steps