module arch.node.engines.mempool_worker_behaviour; import arch.node.engines.mempool_worker_messages open; import arch.node.engines.mempool_worker_config open; import arch.node.engines.mempool_worker_environment open; import arch.node.engines.shard_messages open; import arch.node.engines.executor_messages open; import arch.node.engines.executor_config open; import arch.node.engines.executor_environment open; import prelude open; import Stdlib.Data.Nat open; import Stdlib.Data.List as List; import arch.node.types.basics open; import arch.node.types.identities open; import arch.node.types.messages open; import arch.node.types.engine open; import arch.node.types.anoma as Anoma open; axiom sign : TxFingerprint -> TransactionCandidate -> Signature; axiom TChash : TxFingerprint -> TransactionCandidate -> Hash; syntax alias MempoolWorkerActionArgument := Unit; MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument; MempoolWorkerAction : Type := Action MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; MempoolWorkerActionInput : Type := ActionInput MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg; MempoolWorkerActionEffect : Type := ActionEffect MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle Anoma.Msg Anoma.Cfg Anoma.Env; MempoolWorkerActionExec : Type := ActionExec MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; handleTransactionRequest (input : MempoolWorkerActionInput) : Option MempoolWorkerActionEffect := let env := ActionInput.env input; cfg := ActionInput.cfg input; local := EngineEnv.localState env; trigger := ActionInput.trigger input; in case getEngineMsgFromTimestampedTrigger trigger of | some emsg := case emsg of { | mkEngineMsg@{ msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request); sender := sender; } := let fingerprint := MempoolWorkerLocalState.gensym local + 1; worker_id := getEngineIDFromEngineCfg cfg; candidate := TransactionRequest.tx request; executor_name := nameGen "executor" (snd worker_id) worker_id; executor_id := mkPair none executor_name; executorCfg := Anoma.CfgExecutor mkExecutorCfg@{ timestamp := fingerprint; executable := TransactionCandidate.executable candidate; lazy_read_keys := Set.empty; eager_read_keys := Set.fromList (TransactionLabel.read (TransactionCandidate.label candidate)); will_write_keys := Set.fromList (TransactionLabel.write (TransactionCandidate.label candidate)); may_write_keys := Set.empty; worker := worker_id; issuer := sender; }; executorEnv := Anoma.EnvExecutor mkEngineEnv@{ localState := mkExecutorLocalState@{ program_state := mkProgramState@{ data := ""; halted := false; }; completed_reads := Map.empty; completed_writes := Map.empty; }; mailboxCluster := Map.empty; acquaintances := Set.empty; timers := []; }; newState := local@MempoolWorkerLocalState{ gensym := fingerprint; transactions := Map.insert fingerprint candidate (MempoolWorkerLocalState.transactions local); transactionEngines := Map.insert executor_id fingerprint (MempoolWorkerLocalState.transactionEngines local); }; newEnv := env@EngineEnv{localState := newState}; read_keys := Set.fromList (TransactionLabel.read (TransactionCandidate.label candidate)); write_keys := Set.fromList (TransactionLabel.write (TransactionCandidate.label candidate)); shards := Set.toList (Set.map keyToShard (Set.union read_keys write_keys)); shardMsgs := map \{shard := let shard_read_keys := Set.filter \{key := snd (keyToShard key) == snd shard} read_keys; shard_write_keys := Set.filter \{key := snd (keyToShard key) == snd shard} write_keys; lockRequest := mkKVSAcquireLockMsg@{ lazy_read_keys := Set.empty; eager_read_keys := shard_read_keys; will_write_keys := shard_write_keys; may_write_keys := Set.empty; worker := worker_id; executor := executor_id; timestamp := fingerprint; }; in mkEngineMsg@{ sender := worker_id; target := shard; mailbox := some 0; msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest); }} shards; ackMsg := mkEngineMsg@{ sender := worker_id; target := sender; mailbox := some 0; msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionAck mkTransactionAck@{ tx_hash := TChash fingerprint candidate; batch_number := MempoolWorkerLocalState.batch_number local; batch_start := 0; worker_id := worker_id; signature := sign fingerprint candidate; }); }; in some mkActionEffect@{ env := newEnv; msgs := ackMsg :: shardMsgs; timers := []; engines := [mkPair executorCfg executorEnv]; } | _ := none } | _ := none; allLocksAcquired (isWrite : Bool) (tx : TransactionCandidate) (txNum : TxFingerprint) (locks : List (Pair EngineID KVSLockAcquiredMsg)) : Bool := let keys := case isWrite of | true := TransactionLabel.write (TransactionCandidate.label tx) | false := TransactionLabel.read (TransactionCandidate.label tx); neededShards := Set.fromList (map keyToShard keys); lockingShards := Set.fromList (map fst (List.filter \{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum} locks)); in Set.isSubset neededShards lockingShards; terminating findMaxConsecutiveLocked (isWrite : Bool) (transactions : Map TxFingerprint TransactionCandidate) (locks : List (Pair EngineID KVSLockAcquiredMsg)) (current : TxFingerprint) (prev : TxFingerprint) : TxFingerprint := case Map.lookup current transactions of | none := prev | some tx := case allLocksAcquired isWrite tx current locks of | true := findMaxConsecutiveLocked isWrite transactions locks (current + 1) current | false := prev; getAllShards (transactions : Map TxFingerprint TransactionCandidate) : Set EngineID := let getAllKeysFromLabel (label : TransactionLabel) : List KVSKey := TransactionLabel.read label ++ TransactionLabel.write label; allKeys := List.concatMap \{tx := getAllKeysFromLabel (TransactionCandidate.label tx)} (Map.values transactions); in Set.fromList (map keyToShard allKeys); handleLockAcquired (input : MempoolWorkerActionInput) : Option MempoolWorkerActionEffect := let env := ActionInput.env input; local := EngineEnv.localState env; trigger := ActionInput.trigger input; in case getEngineMsgFromTimestampedTrigger trigger of | some emsg := case emsg of { | mkEngineMsg@{ msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg); sender := sender; } := let timestamp := KVSLockAcquiredMsg.timestamp lockMsg; newLocks := mkPair sender lockMsg :: MempoolWorkerLocalState.locks_acquired local; maxConsecutiveWrite := findMaxConsecutiveLocked true (MempoolWorkerLocalState.transactions local) newLocks 1 0; maxConsecutiveRead := findMaxConsecutiveLocked false (MempoolWorkerLocalState.transactions local) newLocks 1 0; newState := local@MempoolWorkerLocalState{ locks_acquired := newLocks; seen_all_writes := maxConsecutiveWrite; seen_all_reads := maxConsecutiveRead; }; newEnv := env@EngineEnv{localState := newState}; allShards := getAllShards (MempoolWorkerLocalState.transactions local); makeUpdateMsg (target : EngineID) (isWrite : Bool) (timestamp : TxFingerprint) : EngineMsg Anoma.Msg := mkEngineMsg@{ sender := getEngineIDFromEngineCfg (ActionInput.cfg input); target := target; mailbox := some 0; msg := Anoma.MsgShard (ShardMsgUpdateSeenAll mkUpdateSeenAllMsg@{ timestamp := timestamp; write := isWrite; }); }; writeMessages := map \{shard := makeUpdateMsg shard true maxConsecutiveWrite} (Set.toList allShards); readMessages := map \{shard := makeUpdateMsg shard false maxConsecutiveRead} (Set.toList allShards); in some mkActionEffect@{ env := newEnv; msgs := writeMessages ++ readMessages; timers := []; engines := []; } | _ := none } | _ := none; handleExecutorFinished (input : MempoolWorkerActionInput) : Option MempoolWorkerActionEffect := let env := ActionInput.env input; local := EngineEnv.localState env; trigger := ActionInput.trigger input; in case getEngineMsgFromTimestampedTrigger trigger of | some emsg := case emsg of { | mkEngineMsg@{ msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary); sender := sender; } := case Map.lookup sender (MempoolWorkerLocalState.transactionEngines local) of { | some tr := let newState := local@MempoolWorkerLocalState{execution_summaries := Map.insert tr summary (MempoolWorkerLocalState.execution_summaries local)}; newEnv := env@EngineEnv{localState := newState}; in some mkActionEffect@{ env := newEnv; msgs := []; timers := []; engines := []; } | _ := none } | _ := none } | _ := none; handleTransactionRequestLabel : MempoolWorkerActionExec := Seq [handleTransactionRequest]; handleLockAcquiredLabel : MempoolWorkerActionExec := Seq [handleLockAcquired]; handleExecutorFinishedLabel : MempoolWorkerActionExec := Seq [handleExecutorFinished]; MempoolWorkerGuard : Type := Guard MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; MempoolWorkerGuardOutput : Type := GuardOutput MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; MempoolWorkerGuardEval : Type := GuardEval MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; handleTransactionRequestGuard (trigger : TimestampedTrigger MempoolWorkerTimerHandle Anoma.Msg) (cfg : EngineCfg MempoolWorkerCfg) (env : MempoolWorkerEnv) : Option MempoolWorkerGuardOutput := case getEngineMsgFromTimestampedTrigger trigger of | some mkEngineMsg@{ msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _); } := some mkGuardOutput@{ action := handleTransactionRequestLabel; args := []; } | _ := none; handleLockAcquiredGuard (trigger : TimestampedTrigger MempoolWorkerTimerHandle Anoma.Msg) (cfg : EngineCfg MempoolWorkerCfg) (env : MempoolWorkerEnv) : Option MempoolWorkerGuardOutput := case getEngineMsgFromTimestampedTrigger trigger of | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} := some mkGuardOutput@{ action := handleLockAcquiredLabel; args := []; } | _ := none; handleExecutorFinishedGuard (trigger : TimestampedTrigger MempoolWorkerTimerHandle Anoma.Msg) (cfg : EngineCfg MempoolWorkerCfg) (env : MempoolWorkerEnv) : Option MempoolWorkerGuardOutput := case getEngineMsgFromTimestampedTrigger trigger of | some mkEngineMsg@{ msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _); } := some mkGuardOutput@{ action := handleExecutorFinishedLabel; args := []; } | _ := none; MempoolWorkerBehaviour : Type := EngineBehaviour MempoolWorkerCfg MempoolWorkerLocalState MempoolWorkerMailboxState MempoolWorkerTimerHandle MempoolWorkerActionArguments Anoma.Msg Anoma.Cfg Anoma.Env; mempoolWorkerBehaviour : MempoolWorkerBehaviour := mkEngineBehaviour@{ guards := First [ handleTransactionRequestGuard; handleLockAcquiredGuard; handleExecutorFinishedGuard; ]; };