module arch.node.engines.executor_behaviour;

import Stdlib.Data.Set as Set;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import arch.node.engines.shard_messages open;

import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

import arch.system.state.resource_machine.notes.runnable open;

syntax alias ExecutorActionArguments := Unit;

ExecutorAction : Type :=
  Action
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

ExecutorActionInput : Type :=
  ActionInput
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg;

ExecutorActionEffect : Type :=
  ActionEffect
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

ExecutorActionExec : Type :=
  ActionExec
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

processReadAction
  (input : ExecutorActionInput) : Option (ExecutorActionEffect) :=
  let
    cfg := EngineCfg.cfg (ActionInput.cfg input);
    env := ActionInput.env input;
    trigger := ActionInput.trigger input;
  in case getMsgFromTimestampedTrigger trigger of
       | some (Msg.Shard (ShardMsg.KVSRead KVSReadMsg.mkKVSReadMsg@{
                                             key := readKey;
                                             data := readValue;
                                           })) :=
         let
           envelope
             (target : EngineID) (msg : Anoma.Msg) : EngineMsg Anoma.Msg :=
             EngineMsg.mk@{
               sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
               target := target;
               mailbox := some 0;
               msg := msg;
             };
           local := EngineEnv.localState env;
           reads := ExecutorLocalState.completed_reads local;
           writes := ExecutorLocalState.completed_writes local;
           
           -- Precompute messages to notify shards of stale locks
           -- These inform the shards that they can release pending locks in the
           -- case that the executor halts.
           staleReadMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
             envelope
               (ExecutorLocalCfg.keyToShard cfg key)
               (Msg.Shard
                 (ShardMsg.KVSReadRequest
                   KVSReadRequestMsg.mkKVSReadRequestMsg@{
                     timestamp := ExecutorLocalCfg.timestamp cfg;
                     key := key;
                     actual := false;
                   }));
           staleWriteMsg (key : KVSKey) : EngineMsg Anoma.Msg :=
             envelope
               (ExecutorLocalCfg.keyToShard cfg key)
               (Msg.Shard
                 (ShardMsg.KVSWrite
                   KVSWriteMsg.mkKVSWriteMsg@{
                     timestamp := ExecutorLocalCfg.timestamp cfg;
                     key := key;
                     datum := none;
                   }));
           staleReadLocations :=
             Set.difference
               (ExecutorLocalCfg.lazy_read_keys cfg)
               (Set.fromList (Map.keys reads));
           readStaleMsgs := map staleReadMsg (Set.toList staleReadLocations);
           staleWriteLocations :=
             Set.difference
               (ExecutorLocalCfg.may_write_keys cfg)
               (Set.fromList (Map.keys writes));
           writeStaleMsgs := map staleWriteMsg (Set.toList staleWriteLocations);
           staleMsgs := readStaleMsgs ++ writeStaleMsgs;
           
           stepInput := mkPair readKey readValue;
           stepResult :=
             Runnable.executeStep
               (ExecutorLocalCfg.executable cfg)
               (ExecutorLocalState.program_state local)
               stepInput;
         in case stepResult of {
              | error err :=
                let
                  local := EngineEnv.localState env;
                  finishedMsg :=
                    envelope
                      (ExecutorLocalCfg.issuer cfg)
                      (Msg.Executor
                        (ExecutorMsg.ExecutorFinished
                          ExecutorFinishedMsg.mkExecutorFinishedMsg@{
                            success := false;
                            values_read :=
                              mkPair readKey readValue :: Map.toList reads;
                            values_written := Map.toList writes;
                          }));
                in some
                  ActionEffect.mk@{
                    env := env;
                    msgs := finishedMsg :: staleMsgs;
                    timers := [];
                    engines := [];
                  }
              | ok (mkPair program' outputs) :=
                let
                  accReads
                    (key : KVSKey)
                    (msgs : List (EngineMsg Anoma.Msg))
                    : List (EngineMsg Anoma.Msg) :=
                    let
                      msg :=
                        envelope
                          (ExecutorLocalCfg.keyToShard cfg key)
                          (Msg.Shard
                            (ShardMsg.KVSReadRequest
                              KVSReadRequestMsg.mkKVSReadRequestMsg@{
                                timestamp := ExecutorLocalCfg.timestamp cfg;
                                key := key;
                                actual := true;
                              }));
                    in case
                         or
                           (Set.isMember
                             key
                             (ExecutorLocalCfg.lazy_read_keys cfg))
                           (Set.isMember
                             key
                             (ExecutorLocalCfg.eager_read_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  accWrites
                    (key : KVSKey)
                    (value : KVSDatum)
                    (msgs : List (EngineMsg Anoma.Msg))
                    : List (EngineMsg Anoma.Msg) :=
                    let
                      msg :=
                        envelope
                          (ExecutorLocalCfg.keyToShard cfg key)
                          (Msg.Shard
                            (ShardMsg.KVSWrite
                              KVSWriteMsg.mkKVSWriteMsg@{
                                timestamp := ExecutorLocalCfg.timestamp cfg;
                                key := key;
                                datum := some value;
                              }));
                    in case
                         or
                           (Set.isMember
                             key
                             (ExecutorLocalCfg.will_write_keys cfg))
                           (Set.isMember
                             key
                             (ExecutorLocalCfg.may_write_keys cfg))
                       of
                         | true := msg :: msgs
                         | false := msgs;
                  sendHelper
                    (acc : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)))
                    (out : Either KVSKey (Pair KVSKey KVSDatum))
                    : Pair ExecutorLocalState (List (EngineMsg Anoma.Msg)) :=
                    let
                      state := fst acc;
                      msgs := snd acc;
                    in case out of
                         | left key := mkPair state (accReads key msgs)
                         | right (mkPair key value) :=
                           let
                             newState :=
                               state@ExecutorLocalState{completed_writes := Map.insert
                                 key
                                 value
                                 (ExecutorLocalState.completed_writes state)};
                           in mkPair newState (accWrites key value msgs);
                  initial :=
                    mkPair
                      local@ExecutorLocalState{
                        program_state := program';
                        completed_reads := Map.insert
                          readKey
                          readValue
                          (ExecutorLocalState.completed_reads local);
                      }
                      [];
                  final := foldl sendHelper initial outputs;
                  newLocalState := fst final;
                  msgList := snd final;
                  newEnv := env@EngineEnv{localState := newLocalState};
                in case Runnable.halted program' of {
                     | false :=
                       some
                         ActionEffect.mk@{
                           env := newEnv;
                           msgs := msgList;
                           timers := [];
                           engines := [];
                         }
                     | true :=
                       let
                         finishedMsg :=
                           envelope
                             (ExecutorLocalCfg.issuer cfg)
                             (Msg.Executor
                               (ExecutorMsg.ExecutorFinished
                                 ExecutorFinishedMsg.mkExecutorFinishedMsg@{
                                   success := true;
                                   values_read := Map.toList reads;
                                   values_written := Map.toList writes;
                                 }));
                       in some
                         ActionEffect.mk@{
                           env := newEnv;
                           msgs := msgList ++ finishedMsg :: staleMsgs;
                           timers := [];
                           engines := [];
                         }
                   }
            }
       | _ := none;

processReadActionLabel : ExecutorActionExec :=
  ActionExec.Seq [processReadAction];

ExecutorGuard : Type :=
  Guard
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

ExecutorGuardOutput : Type :=
  GuardOutput
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

ExecutorGuardEval : Type :=
  GuardEval
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

processReadGuard
  (trigger : TimestampedTrigger ExecutorTimerHandle Anoma.Msg)
  (cfg : ExecutorCfg)
  (env : ExecutorEnv)
  : Option ExecutorGuardOutput :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some EngineMsg.mk@{
             msg := Msg.Shard (ShardMsg.KVSRead KVSReadMsg.mkKVSReadMsg@{
                                                  timestamp := timestamp;
                                                  key := _;
                                                  data := _;
                                                });
           } :=
      case timestamp == ExecutorLocalCfg.timestamp (EngineCfg.cfg cfg) of {
        | true :=
          some
            GuardOutput.mk@{
              action := processReadActionLabel;
              args := unit;
            }
        | false := none
      }
    | _ := none;

ExecutorBehaviour : Type :=
  EngineBehaviour
    ExecutorLocalCfg
    ExecutorLocalState
    ExecutorMailboxState
    ExecutorTimerHandle
    ExecutorActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

executorBehaviour : ExecutorBehaviour :=
  EngineBehaviour.mk@{
    guards := GuardEval.First [processReadGuard];
  };