module arch.node.engines.mempool_worker_behaviour;

import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;

import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

axiom sign
  {KVSKey Executable}
  : TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Signature;

axiom hash
  {KVSKey Executable}
  : TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Hash;

syntax alias MempoolWorkerActionArgument := Unit;

MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;

MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
  ActionInput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);

MempoolWorkerActionEffect
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionExec
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    cfg := ActionInput.cfg input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request);
               sender := sender;
             } :=
             let
               fingerprint := MempoolWorkerLocalState.gensym local + 1;
               worker_id := getEngineIDFromEngineCfg cfg;
               candidate := TransactionRequest.tx request;
               executor_name := nameGen "executor" (snd worker_id) worker_id;
               executor_id := mkPair none executor_name;
               executorCfg :=
                 Anoma.CfgExecutor
                   mkExecutorCfg@{
                     timestamp := fingerprint;
                     executable := TransactionCandidate.executable candidate;
                     lazy_read_keys := Set.empty;
                     eager_read_keys :=
                       Set.fromList
                         (TransactionLabel.read
                           (TransactionCandidate.label candidate));
                     will_write_keys :=
                       Set.fromList
                         (TransactionLabel.write
                           (TransactionCandidate.label candidate));
                     may_write_keys := Set.empty;
                     worker := worker_id;
                     issuer := sender;
                   };
               executorEnv :=
                 Anoma.EnvExecutor
                   mkEngineEnv@{
                     localState :=
                       mkExecutorLocalState@{
                         program_state := Runnable.startingState {{rinst}};
                         completed_reads := Map.empty;
                         completed_writes := Map.empty;
                       };
                     mailboxCluster := Map.empty;
                     acquaintances := Set.empty;
                     timers := [];
                   };
               newState :=
                 local@MempoolWorkerLocalState{
                   gensym := fingerprint;
                   transactions := Map.insert
                     fingerprint
                     candidate
                     (MempoolWorkerLocalState.transactions local);
                   transactionEngines := Map.insert
                     executor_id
                     fingerprint
                     (MempoolWorkerLocalState.transactionEngines local);
                 };
               newEnv := env@EngineEnv{localState := newState};
               read_keys :=
                 Set.fromList
                   (TransactionLabel.read
                     (TransactionCandidate.label candidate));
               write_keys :=
                 Set.fromList
                   (TransactionLabel.write
                     (TransactionCandidate.label candidate));
               shards :=
                 Set.toList
                   (Set.map keyToShard (Set.union read_keys write_keys));
               shardMsgs :=
                 map
                   \{shard :=
                     let
                       shard_read_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           read_keys;
                       shard_write_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           write_keys;
                       lockRequest :=
                         mkKVSAcquireLockMsg@{
                           lazy_read_keys := Set.empty;
                           eager_read_keys := shard_read_keys;
                           will_write_keys := shard_write_keys;
                           may_write_keys := Set.empty;
                           worker := worker_id;
                           executor := executor_id;
                           timestamp := fingerprint;
                         };
                     in mkEngineMsg@{
                          sender := worker_id;
                          target := shard;
                          mailbox := some 0;
                          msg :=
                            Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest);
                        }}
                   shards;
               ackMsg :=
                 mkEngineMsg@{
                   sender := worker_id;
                   target := sender;
                   mailbox := some 0;
                   msg :=
                     Anoma.MsgMempoolWorker
                       (MempoolWorkerMsgTransactionAck
                         mkTransactionAck@{
                           tx_hash := hash fingerprint candidate;
                           batch_number :=
                             MempoolWorkerLocalState.batch_number local;
                           batch_start := 0;
                           worker_id := worker_id;
                           signature := sign fingerprint candidate;
                         });
                 };
             in some
               mkActionEffect@{
                 env := newEnv;
                 msgs := ackMsg :: shardMsgs;
                 timers := [];
                 engines := [mkPair executorCfg executorEnv];
               }
           | _ := none
         }
       | _ := none;

allLocksAcquired
  {KVSKey Executable}
  (isWrite : Bool)
  (tx : TransactionCandidate KVSKey KVSKey Executable)
  (txNum : TxFingerprint)
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  : Bool :=
  let
    keys :=
      case isWrite of
        | true := TransactionLabel.write (TransactionCandidate.label tx)
        | false := TransactionLabel.read (TransactionCandidate.label tx);
    neededShards := Set.fromList (map keyToShard keys);
    lockingShards :=
      Set.fromList
        (map
          fst
          (List.filter
            \{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum}
            locks));
  in Set.isSubset neededShards lockingShards;

--- Finds the highest transaction fingerprint N such that all transactions with fingerprints 1..N
--- have acquired all their necessary locks of the specified type (read or write). This represents
--- the "safe point" up to which shards can process transactions without worrying about missing locks.
terminating
findMaxConsecutiveLocked
  {KVSKey Executable}
  (isWrite : Bool)
  (transactions : Map
    TxFingerprint
    (TransactionCandidate KVSKey KVSKey Executable))
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  (current : TxFingerprint)
  (prev : TxFingerprint)
  : TxFingerprint :=
  case Map.lookup current transactions of
    | none := prev
    | some tx :=
      case allLocksAcquired isWrite tx current locks of
        | true :=
          findMaxConsecutiveLocked
            isWrite
            transactions
            locks
            (current + 1)
            current
        | false := prev;

getAllShards
  {KVSKey Executable}
  (transactions : Map
    TxFingerprint
    (TransactionCandidate KVSKey KVSKey Executable))
  : Set EngineID :=
  let
    getAllKeysFromLabel
      (label : TransactionLabel KVSKey KVSKey) : List KVSKey :=
      TransactionLabel.read label ++ TransactionLabel.write label;
    allKeys :=
      List.concatMap
        \{tx := getAllKeysFromLabel (TransactionCandidate.label tx)}
        (Map.values transactions);
  in Set.fromList (map keyToShard allKeys);

lockAcquiredAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg);
               sender := sender;
             } :=
             let
               timestamp := KVSLockAcquiredMsg.timestamp lockMsg;
               newLocks :=
                 mkPair sender lockMsg
                   :: MempoolWorkerLocalState.locks_acquired local;
               maxConsecutiveWrite :=
                 findMaxConsecutiveLocked
                   true
                   (MempoolWorkerLocalState.transactions local)
                   newLocks
                   1
                   0;
               maxConsecutiveRead :=
                 findMaxConsecutiveLocked
                   false
                   (MempoolWorkerLocalState.transactions local)
                   newLocks
                   1
                   0;
               newState :=
                 local@MempoolWorkerLocalState{
                   locks_acquired := newLocks;
                   seen_all_writes := maxConsecutiveWrite;
                   seen_all_reads := maxConsecutiveRead;
                 };
               newEnv := env@EngineEnv{localState := newState};
               allShards :=
                 getAllShards (MempoolWorkerLocalState.transactions local);
               makeUpdateMsg
                 (target : EngineID)
                 (isWrite : Bool)
                 (timestamp : TxFingerprint)
                 : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
                 mkEngineMsg@{
                   sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
                   target := target;
                   mailbox := some 0;
                   msg :=
                     Anoma.MsgShard
                       (ShardMsgUpdateSeenAll
                         mkUpdateSeenAllMsg@{
                           timestamp := timestamp;
                           write := isWrite;
                         });
                 };
               writeMessages :=
                 map
                   \{shard := makeUpdateMsg shard true maxConsecutiveWrite}
                   (Set.toList allShards);
               readMessages :=
                 map
                   \{shard := makeUpdateMsg shard false maxConsecutiveRead}
                   (Set.toList allShards);
             in some
               mkActionEffect@{
                 env := newEnv;
                 msgs := writeMessages ++ readMessages;
                 timers := [];
                 engines := [];
               }
           | _ := none
         }
       | _ := none;

executorFinishedAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary);
               sender := sender;
             } :=
             case
               Map.lookup
                 sender
                 (MempoolWorkerLocalState.transactionEngines local)
             of {
               | some tr :=
                 let
                   newState :=
                     local@MempoolWorkerLocalState{execution_summaries := Map.insert
                       tr
                       summary
                       (MempoolWorkerLocalState.execution_summaries local)};
                   newEnv := env@EngineEnv{localState := newState};
                 in some
                   mkActionEffect@{
                     env := newEnv;
                     msgs := [];
                     timers := [];
                     engines := [];
                   }
               | _ := none
             }
           | _ := none
         }
       | _ := none;

transactionRequestActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [transactionRequestAction];

lockAcquiredActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [lockAcquiredAction];

executorFinishedActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [executorFinishedAction];

MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardOutput
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardEval
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
             msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _);
           } :=
      some
        mkGuardOutput@{
          action := transactionRequestActionLabel;
          args := [];
        }
    | _ := none;

lockAcquiredGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} :=
      some
        mkGuardOutput@{
          action := lockAcquiredActionLabel;
          args := [];
        }
    | _ := none;

executorFinishedGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
             msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _);
           } :=
      some
        mkGuardOutput@{
          action := executorFinishedActionLabel;
          args := [];
        }
    | _ := none;

MempoolWorkerBehaviour
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

instance
dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    executeStep := \{_ _ _ := error "Not implemented"};
    halted := \{_ := false};
    startingState := "";
  };

mempoolWorkerBehaviour
  : MempoolWorkerBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards :=
      First [transactionRequestGuard; lockAcquiredGuard; executorFinishedGuard];
  };