module arch.node.engines.mempool_worker_behaviour;

import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

-- Signature over a transaction candidate and its fingerprint.
-- Declared as an axiom: no implementation is given in this module.
axiom sign
  {KVSKey Executable}
  : TxFingerprint
    -> TransactionCandidate KVSKey KVSKey Executable
    -> Signature;

-- Hash of a transaction candidate and its fingerprint.
-- Declared as an axiom: no implementation is given in this module.
axiom hash
  {KVSKey Executable}
  : TxFingerprint
    -> TransactionCandidate KVSKey KVSKey Executable
    -> Hash;

-- Actions of this engine take no meaningful arguments.
syntax alias MempoolWorkerActionArgument := Unit;

MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;

-- Instantiations of the generic engine types for the mempool worker.

MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
  ActionInput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);

MempoolWorkerActionEffect
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionExec
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Handles a MempoolWorkerMsgTransactionRequest.
--
-- On a matching trigger the action:
--   * allocates the next fingerprint (gensym + 1) and stores the candidate
--     under it,
--   * describes a new executor engine (config and environment) for the
--     candidate and records executor id -> fingerprint,
--   * emits one KVSAcquireLock message per shard touched by the read or
--     write keys of the candidate label,
--   * emits a TransactionAck back to the sender of the request.
--
-- Returns none when the trigger carries no engine message or the message is
-- of any other kind.
transactionRequestAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    cfg := ActionInput.cfg input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request);
               sender := sender;
             } :=
             let
               fingerprint := MempoolWorkerLocalState.gensym local + 1;
               worker_id := getEngineIDFromEngineCfg cfg;
               candidate := TransactionRequest.tx request;
               txLabel := TransactionCandidate.label candidate;
               -- Key sets are built once and shared by the executor
               -- configuration and the per-shard lock requests below.
               read_keys := Set.fromList (TransactionLabel.read txLabel);
               write_keys := Set.fromList (TransactionLabel.write txLabel);
               executor_name := nameGen "executor" (snd worker_id) worker_id;
               executor_id := mkPair none executor_name;
               executorCfg :=
                 Anoma.CfgExecutor
                   mkExecutorCfg@{
                     timestamp := fingerprint;
                     executable := TransactionCandidate.executable candidate;
                     lazy_read_keys := Set.empty;
                     eager_read_keys := read_keys;
                     will_write_keys := write_keys;
                     may_write_keys := Set.empty;
                     worker := worker_id;
                     issuer := sender;
                   };
               executorEnv :=
                 Anoma.EnvExecutor
                   mkEngineEnv@{
                     localState :=
                       mkExecutorLocalState@{
                         program_state := Runnable.startingState {{rinst}};
                         completed_reads := Map.empty;
                         completed_writes := Map.empty;
                       };
                     mailboxCluster := Map.empty;
                     acquaintances := Set.empty;
                     timers := [];
                   };
               newState :=
                 local@MempoolWorkerLocalState{
                   gensym := fingerprint;
                   transactions :=
                     Map.insert
                       fingerprint
                       candidate
                       (MempoolWorkerLocalState.transactions local);
                   transactionEngines :=
                     Map.insert
                       executor_id
                       fingerprint
                       (MempoolWorkerLocalState.transactionEngines local);
                 };
               newEnv := env@EngineEnv{localState := newState};
               shards :=
                 Set.toList
                   (Set.map keyToShard (Set.union read_keys write_keys));
               -- Each shard is only asked to lock the keys whose shard
               -- name (second component of the engine id) matches its own.
               shardMsgs :=
                 map
                   \{shard :=
                     let
                       shard_read_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           read_keys;
                       shard_write_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           write_keys;
                       lockRequest :=
                         mkKVSAcquireLockMsg@{
                           lazy_read_keys := Set.empty;
                           eager_read_keys := shard_read_keys;
                           will_write_keys := shard_write_keys;
                           may_write_keys := Set.empty;
                           worker := worker_id;
                           executor := executor_id;
                           timestamp := fingerprint;
                         };
                     in mkEngineMsg@{
                          sender := worker_id;
                          target := shard;
                          mailbox := some 0;
                          msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest);
                        }}
                   shards;
               -- NOTE(review): batch_start is hard-coded to 0 here.
               ackMsg :=
                 mkEngineMsg@{
                   sender := worker_id;
                   target := sender;
                   mailbox := some 0;
                   msg :=
                     Anoma.MsgMempoolWorker
                       (MempoolWorkerMsgTransactionAck
                         mkTransactionAck@{
                           tx_hash := hash fingerprint candidate;
                           batch_number := MempoolWorkerLocalState.batch_number local;
                           batch_start := 0;
                           worker_id := worker_id;
                           signature := sign fingerprint candidate;
                         });
                 };
             in some
               mkActionEffect@{
                 env := newEnv;
                 msgs := ackMsg :: shardMsgs;
                 timers := [];
                 engines := [mkPair executorCfg executorEnv];
               }
           | _ := none
         }
       | _ := none;

-- True when every shard that owns a write key (isWrite = true) or a read key
-- (isWrite = false) of the transaction appears as the sender of a recorded
-- lock-acquired message whose timestamp equals txNum.
allLocksAcquired
  {KVSKey Executable}
  (isWrite : Bool)
  (tx : TransactionCandidate KVSKey KVSKey Executable)
  (txNum : TxFingerprint)
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  : Bool :=
  let
    keys :=
      case isWrite of
        | true := TransactionLabel.write (TransactionCandidate.label tx)
        | false := TransactionLabel.read (TransactionCandidate.label tx);
    neededShards := Set.fromList (map keyToShard keys);
    lockingShards :=
      Set.fromList
        (map
          fst
          (List.filter
            \{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum}
            locks));
  in Set.isSubset neededShards lockingShards;

--- Finds the highest transaction fingerprint N such that all transactions with fingerprints 1..N
--- have acquired all their necessary locks of the specified type (read or write). This represents
--- the "safe point" up to which shards can process transactions without worrying about missing locks.
terminating
findMaxConsecutiveLocked
  {KVSKey Executable}
  (isWrite : Bool)
  (transactions : Map TxFingerprint (TransactionCandidate KVSKey KVSKey Executable))
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  (current : TxFingerprint)
  (prev : TxFingerprint)
  : TxFingerprint :=
  case Map.lookup current transactions of
    | none := prev
    | some tx :=
      case allLocksAcquired isWrite tx current locks of
        | true :=
          findMaxConsecutiveLocked
            isWrite
            transactions
            locks
            (current + 1)
            current
        | false := prev;

-- Set of shard ids owning any read or write key of any stored transaction.
getAllShards
  {KVSKey Executable}
  (transactions : Map TxFingerprint (TransactionCandidate KVSKey KVSKey Executable))
  : Set EngineID :=
  let
    getAllKeysFromLabel
      (label : TransactionLabel KVSKey KVSKey) : List KVSKey :=
      TransactionLabel.read label ++ TransactionLabel.write label;
    allKeys :=
      List.concatMap
        \{tx := getAllKeysFromLabel (TransactionCandidate.label tx)}
        (Map.values transactions);
  in Set.fromList (map keyToShard allKeys);

-- Handles a ShardMsgKVSLockAcquired.
--
-- Records the (sender, lock message) pair, recomputes the write and read
-- "seen all" fingerprints starting from fingerprint 1, stores them in the
-- local state, and sends an UpdateSeenAll message for each of the two values
-- to every shard returned by getAllShards (write updates first, then read
-- updates).
--
-- Returns none when the trigger carries no engine message or the message is
-- of any other kind.
lockAcquiredAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg);
               sender := sender;
             } :=
             let
               newLocks :=
                 mkPair sender lockMsg
                   :: MempoolWorkerLocalState.locks_acquired local;
               maxConsecutiveWrite :=
                 findMaxConsecutiveLocked
                   true
                   (MempoolWorkerLocalState.transactions local)
                   newLocks
                   1
                   0;
               maxConsecutiveRead :=
                 findMaxConsecutiveLocked
                   false
                   (MempoolWorkerLocalState.transactions local)
                   newLocks
                   1
                   0;
               newState :=
                 local@MempoolWorkerLocalState{
                   locks_acquired := newLocks;
                   seen_all_writes := maxConsecutiveWrite;
                   seen_all_reads := maxConsecutiveRead;
                 };
               newEnv := env@EngineEnv{localState := newState};
               allShards :=
                 getAllShards (MempoolWorkerLocalState.transactions local);
               -- Materialised once and reused for both message lists.
               shardList := Set.toList allShards;
               makeUpdateMsg
                 (target : EngineID)
                 (isWrite : Bool)
                 (timestamp : TxFingerprint)
                 : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
                 mkEngineMsg@{
                   sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
                   target := target;
                   mailbox := some 0;
                   msg :=
                     Anoma.MsgShard
                       (ShardMsgUpdateSeenAll
                         mkUpdateSeenAllMsg@{
                           timestamp := timestamp;
                           write := isWrite;
                         });
                 };
               writeMessages :=
                 map
                   \{shard := makeUpdateMsg shard true maxConsecutiveWrite}
                   shardList;
               readMessages :=
                 map
                   \{shard := makeUpdateMsg shard false maxConsecutiveRead}
                   shardList;
             in some
               mkActionEffect@{
                 env := newEnv;
                 msgs := writeMessages ++ readMessages;
                 timers := [];
                 engines := [];
               }
           | _ := none
         }
       | _ := none;

-- Handles an ExecutorMsgExecutorFinished.
--
-- Looks the sender up in transactionEngines; when it is known, stores the
-- summary in execution_summaries under the associated fingerprint. No
-- messages, timers or engines are produced.
--
-- Returns none when the sender is unknown, the trigger carries no engine
-- message, or the message is of any other kind.
executorFinishedAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | mkEngineMsg@{
               msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary);
               sender := sender;
             } :=
             case
               Map.lookup
                 sender
                 (MempoolWorkerLocalState.transactionEngines local)
             of {
               | some tr :=
                 let
                   newState :=
                     local@MempoolWorkerLocalState{
                       execution_summaries :=
                         Map.insert
                           tr
                           summary
                           (MempoolWorkerLocalState.execution_summaries local)
                     };
                   newEnv := env@EngineEnv{localState := newState};
                 in some
                   mkActionEffect@{
                     env := newEnv;
                     msgs := [];
                     timers := [];
                     engines := [];
                   }
               | _ := none
             }
           | _ := none
         }
       | _ := none;

-- Action labels: each wraps a single action in a Seq.

transactionRequestActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [transactionRequestAction];

lockAcquiredActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [lockAcquiredAction];

executorFinishedActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [executorFinishedAction];

-- Instantiations of the generic guard types for the mempool worker.

MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardOutput
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardEval
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Fires on a MempoolWorkerMsgTransactionRequest and selects
-- transactionRequestActionLabel with no arguments.
transactionRequestGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
             msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _);
           } :=
      some
        mkGuardOutput@{
          action := transactionRequestActionLabel;
          args := [];
        }
    | _ := none;

-- Fires on a ShardMsgKVSLockAcquired and selects lockAcquiredActionLabel
-- with no arguments.
lockAcquiredGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} :=
      some
        mkGuardOutput@{
          action := lockAcquiredActionLabel;
          args := [];
        }
    | _ := none;

-- Fires on an ExecutorMsgExecutorFinished and selects
-- executorFinishedActionLabel with no arguments.
executorFinishedGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
             msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _);
           } :=
      some
        mkGuardOutput@{
          action := executorFinishedActionLabel;
          args := [];
        }
    | _ := none;

MempoolWorkerBehaviour
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

-- Placeholder Runnable instance used to instantiate the behaviour below:
-- executeStep raises an error, halted is always false, and the starting
-- state is the empty string.
instance
dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    executeStep := \{_ _ _ := error "Not implemented"};
    halted := \{_ := false};
    startingState := "";
  };

-- The mempool worker behaviour at concrete types; guards are combined with
-- First in the order request, lock-acquired, executor-finished.
mempoolWorkerBehaviour
  : MempoolWorkerBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards :=
      First
        [
          transactionRequestGuard;
          lockAcquiredGuard;
          executorFinishedGuard;
        ];
  };