module arch.node.engines.executor_behaviour;

import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;

import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

-- Arguments handed from a guard to an executor action. Executor actions take
-- no extra arguments, so this is just an alias for `Unit`.
syntax alias ExecutorActionArguments := Unit;

-- The generic `Action` type instantiated for the executor engine:
-- configuration `ExecutorCfg`, local state `ExecutorLocalState`, mailbox state
-- `ExecutorMailboxState`, timer handle `ExecutorTimerHandle`, action arguments
-- `ExecutorActionArguments`, and the Anoma-wide message, configuration and
-- environment types (`Anoma.PreMsg`, `Anoma.PreCfg`, `Anoma.PreEnv`).
ExecutorAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `ActionInput` type instantiated for the executor engine.
-- `processReadAction` reads its `cfg`, `env` and `trigger` fields.
ExecutorActionInput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionInput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);

-- The generic `ActionEffect` type instantiated for the executor engine.
-- Values of this type are built in this module with the fields `env`, `msgs`,
-- `timers` and `engines`.
ExecutorActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `ActionExec` type instantiated for the executor engine.
-- `processReadActionLabel` is the value of this type defined in this module.
ExecutorActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Handle a `ShardMsgKVSRead` reply by running one step of the executable.
--
-- The (key, value) pair carried by the reply is fed to `Runnable.executeStep`
-- together with the current program state. Three outcomes are possible:
--
-- * The step fails: the environment is left untouched, an `ExecutorFinished`
--   message with `success := false` is sent to the issuer, and stale-lock
--   notifications are sent to the shards.
-- * The step succeeds and the program has not halted: the local state is
--   updated (new program state, the read recorded, any writes recorded) and
--   the read requests / writes emitted by the step are forwarded to the
--   shards.
-- * The step succeeds and the program has halted: as above, plus an
--   `ExecutorFinished` message with `success := true` and stale-lock
--   notifications.
--
-- Stale-lock notifications and the success report are computed from the
-- state *after* the step, so the read that triggered this action and the
-- writes emitted by the final step are reported as completed and are never
-- announced as stale.
--
-- Returns `none` when the trigger is not a `ShardMsgKVSRead` message.
processReadAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : ExecutorActionInput KVSKey KVSDatum Executable ProgramState)
  : Option (ExecutorActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
       | some (MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                           key := readKey;
                                           data := readValue;
                                         })) :=
         let
           -- Wrap a payload in an engine message sent by this executor.
           envelope
             (target : EngineID)
             (msg : Anoma.PreMsg KVSKey KVSDatum Executable)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             mkEngineMsg@{
               sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
               target := target;
               mailbox := some 0;
               msg := msg;
             };
           local := EngineEnv.localState env;
           reads := ExecutorLocalState.completed_reads local;
           writes := ExecutorLocalState.completed_writes local;

           -- Messages that notify shards of stale locks. They inform the
           -- shards that pending locks can be released once the executor
           -- stops.
           -- A read request with `actual := false` for `key`.
           staleReadMsg
             (key : KVSKey)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSReadRequest
                   mkKVSReadRequestMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     actual := false;
                   }));
           -- A write with `datum := none` for `key`.
           staleWriteMsg
             (key : KVSKey)
             : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
             envelope
               (keyToShard key)
               (MsgShard
                 (ShardMsgKVSWrite
                   mkKVSWriteMsg@{
                     timestamp := ExecutorCfg.timestamp cfg;
                     key := key;
                     datum := none;
                   }));
           -- Stale notifications for every lazy-read key not in `readKeys`
           -- and every may-write key not in `writtenKeys`; read notifications
           -- come first.
           staleMsgsFor
             (readKeys : List KVSKey)
             (writtenKeys : List KVSKey)
             : List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
             let
               staleReadLocations :=
                 Set.difference
                   (ExecutorCfg.lazy_read_keys cfg)
                   (Set.fromList readKeys);
               staleWriteLocations :=
                 Set.difference
                   (ExecutorCfg.may_write_keys cfg)
                   (Set.fromList writtenKeys);
             in map staleReadMsg (Set.toList staleReadLocations)
               ++ map staleWriteMsg (Set.toList staleWriteLocations);

           stepInput := mkPair readKey readValue;
           stepResult :=
             Runnable.executeStep
               (ExecutorCfg.executable cfg)
               (ExecutorLocalState.program_state local)
               stepInput;
         in case stepResult of {
              | error _ :=
                let
                  -- The failed step performs no writes, but the read that
                  -- triggered it did complete: report it, and do not announce
                  -- its key as stale.
                  finishedMsg :=
                    envelope
                      (ExecutorCfg.issuer cfg)
                      (MsgExecutor
                        (ExecutorMsgExecutorFinished
                          mkExecutorFinishedMsg@{
                            success := false;
                            values_read :=
                              mkPair readKey readValue :: Map.toList reads;
                            values_written := Map.toList writes;
                          }));
                  staleMsgs :=
                    staleMsgsFor (readKey :: Map.keys reads) (Map.keys writes);
                in some
                  mkActionEffect@{
                    env := env;
                    msgs := finishedMsg :: staleMsgs;
                    timers := [];
                    engines := [];
                  }
              | ok (mkPair program' outputs) :=
                let
                  -- Prepend a real read request for `key` if the executor is
                  -- configured to read it (lazily or eagerly); otherwise
                  -- leave `msgs` unchanged.
                  accReads
                    (key : KVSKey)
                    (msgs : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
                    : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSReadRequest
                              mkKVSReadRequestMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                actual := true;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.lazy_read_keys cfg))
                           (Set.isMember key (ExecutorCfg.eager_read_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  -- Prepend a write of `value` to `key` if the executor is
                  -- configured to write it (will-write or may-write);
                  -- otherwise leave `msgs` unchanged.
                  accWrites
                    (key : KVSKey)
                    (value : KVSDatum)
                    (msgs : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)))
                    : List
                      (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable)) :=
                    let
                      msg :=
                        envelope
                          (keyToShard key)
                          (MsgShard
                            (ShardMsgKVSWrite
                              mkKVSWriteMsg@{
                                timestamp := ExecutorCfg.timestamp cfg;
                                key := key;
                                datum := some value;
                              }));
                    in case
                         or
                           (Set.isMember key (ExecutorCfg.will_write_keys cfg))
                           (Set.isMember key (ExecutorCfg.may_write_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  -- Fold step: a `left key` output is a read request, a
                  -- `right (key, value)` output is a write, which is also
                  -- recorded in `completed_writes`.
                  sendHelper
                    (acc : Pair
                      (ExecutorLocalState KVSKey KVSDatum ProgramState)
                      (List
                        (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))))
                    (out : Either KVSKey (Pair KVSKey KVSDatum))
                    : Pair
                      (ExecutorLocalState KVSKey KVSDatum ProgramState)
                      (List
                        (EngineMsg
                          (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
                    let
                      state := fst acc;
                      msgs := snd acc;
                    in case out of
                         | left key := mkPair state (accReads key msgs)
                         | right (mkPair key value) :=
                           let
                             newState :=
                               state@ExecutorLocalState{completed_writes := Map.insert
                                 key
                                 value
                                 (ExecutorLocalState.completed_writes state)};
                           in mkPair newState (accWrites key value msgs);
                  -- Start from the local state with the new program state and
                  -- the current read recorded, and no outgoing messages.
                  initial :=
                    mkPair
                      local@ExecutorLocalState{
                        program_state := program';
                        completed_reads := Map.insert
                          readKey
                          readValue
                          (ExecutorLocalState.completed_reads local);
                      }
                      [];
                  final := foldl sendHelper initial outputs;
                  newLocalState := fst final;
                  msgList := snd final;
                  newEnv := env@EngineEnv{localState := newLocalState};
                in case Runnable.halted {{rinst}} program' of {
                     | false :=
                       some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList;
                           timers := [];
                           engines := [];
                         }
                     | true :=
                       let
                         -- Use the post-step maps: they contain the read that
                         -- triggered this step and the writes it emitted.
                         finalReads :=
                           ExecutorLocalState.completed_reads newLocalState;
                         finalWrites :=
                           ExecutorLocalState.completed_writes newLocalState;
                         finishedMsg :=
                           envelope
                             (ExecutorCfg.issuer cfg)
                             (MsgExecutor
                               (ExecutorMsgExecutorFinished
                                 mkExecutorFinishedMsg@{
                                   success := true;
                                   values_read := Map.toList finalReads;
                                   values_written := Map.toList finalWrites;
                                 }));
                         staleMsgs :=
                           staleMsgsFor
                             (Map.keys finalReads)
                             (Map.keys finalWrites);
                       in some
                         mkActionEffect@{
                           env := newEnv;
                           msgs := msgList ++ finishedMsg :: staleMsgs;
                           timers := [];
                           engines := [];
                         }
                   }
            }
       | _ := none;

-- Action label returned by `processReadGuard`: an `ExecutorActionExec` built
-- with `Seq` from the single action `processReadAction`.
-- NOTE(review): `Seq` presumably runs the listed actions in order; confirm
-- against the engine type definitions.
processReadActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : ExecutorActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [processReadAction];

-- The generic `Guard` type instantiated for the executor engine, using the
-- same configuration, state, mailbox, timer, argument and Anoma-wide types as
-- `ExecutorAction`.
ExecutorGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `GuardOutput` type instantiated for the executor engine.
-- `processReadGuard` builds values of this type with the fields `action` and
-- `args`.
ExecutorGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- The generic `GuardEval` type instantiated for the executor engine.
-- `executorBehaviour` supplies a value built with `First` for its `guards`
-- field; that value is presumably of this type (TODO confirm against the
-- `EngineBehaviour` definition).
ExecutorGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Guard that selects `processReadActionLabel` for KVS read replies.
--
-- It fires only when the trigger carries an engine message whose payload is a
-- `ShardMsgKVSRead` and whose timestamp equals the timestamp in this
-- executor's configuration. Every other trigger, including a read reply with a
-- different timestamp, yields `none`. The environment argument is not
-- inspected.
processReadGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger
    ExecutorTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg (ExecutorCfg KVSKey Executable))
  (env : ExecutorEnv KVSKey KVSDatum ProgramState)
  : Option (ExecutorGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  let
    -- Timestamp this executor was configured with.
    ownTimestamp := ExecutorCfg.timestamp (EngineCfg.cfg cfg);
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some mkEngineMsg@{
                msg := MsgShard (ShardMsgKVSRead mkKVSReadMsg@{
                                                   timestamp := replyTimestamp;
                                                 });
              } :=
         case replyTimestamp == ownTimestamp of {
           | false := none
           | true :=
             some
               mkGuardOutput@{
                 action := processReadActionLabel;
                 args := unit;
               }
         }
       | _ := none;

-- The generic `EngineBehaviour` type instantiated for the executor engine.
-- `executorBehaviour` is the value of this type defined in this module.
ExecutorBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    (ExecutorCfg KVSKey Executable)
    (ExecutorLocalState KVSKey KVSDatum ProgramState)
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Placeholder `Runnable` instance with `String` keys, `String` data,
-- `ByteString` executables and `String` program state. It does not run
-- anything: `executeStep` always returns `error "Not implemented"`, `halted`
-- is always `false`, and the starting state is the empty string.
-- This is the instance available for the concrete `executorBehaviour` below.
instance
dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    executeStep := \{_ _ _ := error "Not implemented"};
    halted := \{_ := false};
    startingState := "";
  };

-- Concrete executor behaviour at the types `String` / `String` /
-- `ByteString` / `String`. Its only guard is `processReadGuard`.
-- NOTE(review): `First` presumably selects the first guard in the list that
-- produces an output; confirm against the engine type definitions.
executorBehaviour : ExecutorBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards := First [processReadGuard];
  };