Juvix imports
module arch.node.engines.mempool_worker_behaviour;

import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;

import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

Mempool Worker Behaviour

Overview

A mempool worker acts as a transaction coordinator, receiving transaction requests, managing their execution lifecycle, and coordinating with shards and executors.

Auxiliary Juvix code
axiom sign {KVSKey Executable} : TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Signature;
axiom hash {KVSKey Executable} : TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Hash;

Mempool Worker Action Flowcharts

transactionRequestAction flowchart

flowchart TD
    Start([Client Request]) --> MsgReq[MempoolWorkerMsgTransactionRequest<br/>tx: TransactionCandidate]

    subgraph Guard["transactionRequestGuard"]
        MsgReq --> ValidType{Is message type<br/>TransactionRequest?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["transactionRequestAction"]
        direction TB
        GenFP[Generate new fingerprint<br/>from gensym]
        GenFP --> SpawnEx[Create Executor Config<br/>with access rights]
        SpawnEx --> UpdateState[Update local state:<br/>- Increment gensym<br/>- Add to transactions map<br/>- Add to engine map]
        UpdateState --> PrepLocks[Prepare lock requests<br/>for each shard]
    end

    PrepLocks --> Msgs

    subgraph Msgs[Messages and Effects]
        MsgAck[TransactionAck to client<br/>with fingerprint & signature]
        MsgLock[KVSAcquireLock to shards<br/>with read/write keys]
        SpawnEng[Spawn Executor Engine]
    end

    style Guard fill:#f0f7ff,stroke:#333,stroke-width:2px
    style Action fill:#fff7f0,stroke:#333,stroke-width:2px
    style Msgs fill:#f7fff0,stroke:#333,stroke-width:2px
transactionRequestAction flowchart

Explanation

  1. Initial Request
    • A client sends a MempoolWorkerMsgTransactionRequest containing:
      • tx: The transaction candidate to be ordered and executed.
      • resubmission: Optional reference to a previous occurrence (currently unused).
    • The transaction candidate includes its program code and access patterns (what it will read/write).
  2. Guard Phase (transactionRequestGuard)
    • Verifies message type is MempoolWorkerMsgTransactionRequest.
    • If validation fails, request is rejected.
    • On success, passes control to transactionRequestActionLabel.
  3. Action Phase (transactionRequestAction)
    • Generates new fingerprint by incrementing gensym counter.
    • Creates Executor configuration with:
      • Timestamp set to new fingerprint.
      • Executable code from transaction.
      • Access rights from transaction label.
      • References to worker and transaction issuer.
    • Updates local state:
      • Increments gensym counter.
      • Adds transaction to transactions map.
      • Records executor ID to fingerprint mapping.
    • Prepares lock requests for each affected shard by:
      • Grouping keys by shard.
      • Creating appropriate lock request messages.
  4. Reply Generation
    • Messages sent:
      • To client: MempoolWorkerMsgTransactionAck containing:
        • tx_hash: Hash of the transaction
        • batch_number: Current batch number
        • worker_id: This worker's ID
        • signature: Worker's signature over above fields
      • To shards: KVSAcquireLock messages for each affected shard containing:
        • lazy_read_keys: Keys that might be read
        • eager_read_keys: Keys that will definitely be read
        • will_write_keys: Keys that will definitely be written
        • may_write_keys: Keys that might be written
        • worker: This worker's ID
        • executor: ID of spawned executor
        • timestamp: Generated fingerprint
  5. Replies and Effects
    • Reply Delivery
      • All messages are sent with mailbox 0 (default response mailbox).
      • Transaction acknowledgment is sent back to original requester.
      • Lock requests are sent to all relevant shards.
    • Engines spawned:
      • Creates new Executor engine with generated configuration.

Important Notes:

  • Fingerprint generation via the gensym counter is a deliberately simple stand-in for what could be a more complex process.
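To make the gensym-based bookkeeping concrete, here is a small Python sketch of the state update. The class and function names (`WorkerState`, `register_transaction`) are illustrative assumptions; only the three fields mirror the worker's local state described above.

```python
from dataclasses import dataclass, field


@dataclass
class WorkerState:
    """Illustrative subset of the mempool worker's local state."""
    gensym: int = 0
    transactions: dict = field(default_factory=dict)
    transaction_engines: dict = field(default_factory=dict)


def register_transaction(state, candidate, executor_id):
    """Allocate the next fingerprint and record the transaction under it."""
    fingerprint = state.gensym + 1
    state.gensym = fingerprint
    state.transactions[fingerprint] = candidate
    state.transaction_engines[executor_id] = fingerprint
    return fingerprint


state = WorkerState()
first_fp = register_transaction(state, "tx-a", "executor-1")
second_fp = register_transaction(state, "tx-b", "executor-2")
```

Because the counter only ever increases, fingerprints issued by one worker are strictly ordered, which is what later allows the worker to reason about "all transactions up to N".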

lockAcquiredAction flowchart

flowchart TD
    Start([Shard Reply]) --> MsgReq[ShardMsgKVSLockAcquired<br/>timestamp: TxFingerprint]

    subgraph Guard["lockAcquiredGuard"]
        MsgReq --> ValidType{Is message type<br/>LockAcquired?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["lockAcquiredAction"]
        direction TB
        AddLock[Add lock to acquired list]
        AddLock --> CalcMax[Calculate max consecutive:<br/>- Writes locked<br/>- Reads locked]
        CalcMax --> UpdateBarriers[Update seen_all barriers:<br/>- seen_all_writes<br/>- seen_all_reads]
    end

    UpdateBarriers --> Msgs

    subgraph Msgs[Messages and Effects]
        BcastWrite[UpdateSeenAll to shards<br/>for write barrier]
        BcastRead[UpdateSeenAll to shards<br/>for read barrier]
    end

lockAcquiredAction flowchart

Explanation

  1. Initial Message

    • A Mempool Worker receives a ShardMsgKVSLockAcquired message from a Shard engine.
    • The message contains:
      • timestamp: The TxFingerprint identifying which transaction's locks were acquired.
      • (Implicit) The sender of the message identifies which shard has confirmed the locks.
  2. Guard Phase (lockAcquiredGuard)

    • Verifies message type is ShardMsgKVSLockAcquired.
    • If validation fails, request is rejected.
    • On success, passes control to lockAcquiredActionLabel.
  3. Action Phase (lockAcquiredAction)

    • Adds the new lock to the locks_acquired list in state.
    • Calculates new maximum consecutive sequence points by analyzing the lock history:
      • For writes: Finds highest fingerprint where all prior write locks are confirmed.
      • For reads: Finds highest fingerprint where all prior read locks are confirmed.
    • Updates internal barriers (seen_all_writes and seen_all_reads) based on calculations.
    • Constructs appropriate update messages for all shards.
  4. Reply Generation

    • Constructs ShardMsgUpdateSeenAll messages for every shard, containing:
      • For write barrier updates:
        • timestamp: New seen_all_writes value.
        • write: true.
      • For read barrier updates:
        • timestamp: New seen_all_reads value.
        • write: false.
  5. Message Delivery

    • Update messages are broadcast to every shard that holds a key read or written by a transaction known to this worker (the set computed by getAllShards).
    • Uses mailbox 0 (the standard mailbox for responses).
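The barrier calculation in the action phase can be modelled with a short Python sketch. It is a simplified illustration under stated assumptions: `needed` maps each fingerprint directly to the set of shards whose locks it requires (the Juvix code derives this from the transaction label), and each acquired lock is reduced to a `(shard, fingerprint)` pair.

```python
def all_locks_acquired(needed_shards, fingerprint, locks):
    """True if every needed shard has confirmed a lock for this fingerprint."""
    locking_shards = {shard for shard, ts in locks if ts == fingerprint}
    return needed_shards <= locking_shards


def max_consecutive_locked(needed, locks):
    """Highest N such that transactions 1..N all have their locks confirmed."""
    prev, current = 0, 1
    while current in needed and all_locks_acquired(needed[current], current, locks):
        prev, current = current, current + 1
    return prev


needed = {1: {"s1"}, 2: {"s1", "s2"}, 3: {"s2"}}
locks = [("s1", 1), ("s1", 2), ("s2", 3)]

barrier_before = max_consecutive_locked(needed, locks)
barrier_after = max_consecutive_locked(needed, locks + [("s2", 2)])
```

In this example the barrier first stops at 1, because transaction 2 is still waiting for shard `s2` even though transaction 3 is fully locked. Once `s2` confirms the lock for transaction 2, the barrier jumps to 3.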

executorFinishedAction flowchart

flowchart TD
    Start([Executor Reply]) --> MsgReq[ExecutorMsgExecutorFinished<br/>success: Bool<br/>values_read: List KeyValue<br/>values_written: List KeyValue]

    subgraph Guard["executorFinishedGuard"]
        MsgReq --> ValidType{Is message type<br/>ExecutorFinished?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["executorFinishedAction"]
        direction TB
        FindTx{Lookup transaction<br/>for executor}
        FindTx -->|Not Found| NoAction[Do Nothing]
        FindTx -->|Found| Store[Store execution summary<br/>in local state]
    end

    Store --> Effects
    NoAction --> NoEffect([No Effect])

    subgraph Effects[Effects]
        State[Update execution summaries<br/>in local state]
    end
executorFinishedAction flowchart

Explanation

  1. Initial Request

    • An executor sends an ExecutorMsgExecutorFinished message containing:
      • success: Boolean indicating if execution completed successfully.
      • values_read: List of all key-value pairs that were read during execution.
      • values_written: List of all key-value pairs that were written during execution.
    • This message represents the completion of a transaction's execution lifecycle.
  2. Guard Phase (executorFinishedGuard)

    • Verifies message type is ExecutorMsgExecutorFinished.
    • If validation fails, request is rejected immediately.
    • On success, passes control to executorFinishedActionLabel.
  3. Action Phase (executorFinishedAction)

    • Processes valid executor completion notifications through these steps:
      • Looks up the transaction associated with the sending executor in the transactionEngines map.
      • If no transaction is found, the notification is ignored (this shouldn't happen in normal operation).
      • If transaction is found, stores the execution summary in the execution_summaries map.
      • The summary is indexed by the transaction's fingerprint for later reference.
  4. Reply Generation

    • Successful Case
      • Updates local state with the new execution summary.
      • No response messages are generated.
    • Error Case
      • If the executor is not found in the mapping, the action fails silently.
      • No error responses are sent.
  5. State Update

    • Updates the worker's local state:
      • Adds new entry to execution_summaries map.
      • Maps transaction fingerprint to its execution results.
    • No messages are sent.
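The lookup-and-store step described above can be sketched in Python. The function name and the use of plain dictionaries are assumptions made for illustration; returning `None` models the case where the action produces no effect.

```python
def on_executor_finished(transaction_engines, execution_summaries, sender, summary):
    """Store the summary under the sender's fingerprint, or return None if unknown."""
    fingerprint = transaction_engines.get(sender)
    if fingerprint is None:
        # Unknown executor: no state change and no messages.
        return None
    updated = dict(execution_summaries)  # leave the previous state untouched
    updated[fingerprint] = summary
    return updated


engines = {"executor-1": 1}
summary = {"success": True, "values_read": [("k", "v")], "values_written": []}

stored = on_executor_finished(engines, {}, "executor-1", summary)
ignored = on_executor_finished(engines, {}, "executor-9", summary)
```

A known executor yields a new summaries map keyed by fingerprint 1, while a message from an unregistered executor leaves the state unchanged.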

Action arguments

MempoolWorkerActionArgument

syntax alias MempoolWorkerActionArgument := Unit;

MempoolWorkerActionArguments

MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;

Actions

Auxiliary Juvix code
MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
  ActionInput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);

MempoolWorkerActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestAction

Action processing a new transaction request.

State update
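  • Increments gensym counter
  • Adds transaction to transactions map under the new fingerprint
  • Records the executor ID to fingerprint mapping in transactionEngines
Messages to be sent
  • TransactionAck to requester
  • KVSAcquireLock messages to relevant shards
Engines to be spawned
A new Executor engine for the transaction.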
Timer updates
No timers are set or cancelled.
transactionRequestAction
  {KVSKey KVSDatum Executable ProgramState} {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    cfg := ActionInput.cfg input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of {
    | some emsg := case emsg of {
      | mkEngineMsg@{msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request); sender := sender} :=
          let fingerprint := MempoolWorkerLocalState.gensym local + 1;
              worker_id := getEngineIDFromEngineCfg cfg;
              candidate := TransactionRequest.tx request;
              executor_name := nameGen "executor" (snd worker_id) worker_id;
              executor_id := mkPair none executor_name;
              executorCfg := Anoma.CfgExecutor mkExecutorCfg@{
                  timestamp := fingerprint;
                  executable := TransactionCandidate.executable candidate;
                  lazy_read_keys := Set.empty;
                  eager_read_keys := Set.fromList (TransactionLabel.read (TransactionCandidate.label candidate));
                  will_write_keys := Set.fromList (TransactionLabel.write (TransactionCandidate.label candidate));
                  may_write_keys := Set.empty;
                  worker := worker_id;
                  issuer := sender
                };
              executorEnv := Anoma.EnvExecutor mkEngineEnv@{
                localState := mkExecutorLocalState@{
                  program_state := Runnable.startingState {{rinst}};
                  completed_reads := Map.empty;
                  completed_writes := Map.empty
                };
                mailboxCluster := Map.empty;
                acquaintances := Set.empty;
                timers := []
              };
              newState := local@MempoolWorkerLocalState{
                gensym := fingerprint;
                transactions := Map.insert fingerprint candidate (MempoolWorkerLocalState.transactions local);
                transactionEngines := Map.insert executor_id fingerprint (MempoolWorkerLocalState.transactionEngines local)
              };
              newEnv := env@EngineEnv{localState := newState};
              read_keys := Set.fromList (TransactionLabel.read (TransactionCandidate.label candidate));
              write_keys := Set.fromList (TransactionLabel.write (TransactionCandidate.label candidate));
              shards := Set.toList (Set.map keyToShard (Set.union read_keys write_keys));
              shardMsgs := map
                \{shard :=
                  let shard_read_keys := Set.filter (\{key := snd (keyToShard key) == snd shard}) read_keys;
                      shard_write_keys := Set.filter (\{key := snd (keyToShard key) == snd shard}) write_keys;
                      lockRequest := mkKVSAcquireLockMsg@{
                        lazy_read_keys := Set.empty;
                        eager_read_keys := shard_read_keys;
                        will_write_keys := shard_write_keys;
                        may_write_keys := Set.empty;
                        worker := worker_id;
                        executor := executor_id;
                        timestamp := fingerprint
                      };
                  in mkEngineMsg@{
                    sender := worker_id;
                    target := shard;
                    mailbox := some 0;
                    msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest)
                  }}
                shards;
              ackMsg := mkEngineMsg@{
                sender := worker_id;
                target := sender;
                mailbox := some 0;
                msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionAck
                  (mkTransactionAck@{
                    tx_hash := hash fingerprint candidate;
                    batch_number := MempoolWorkerLocalState.batch_number local;
                    batch_start := 0;
                    worker_id := worker_id;
                    signature := sign fingerprint candidate
                  }))
              };
          in some mkActionEffect@{
            env := newEnv;
            msgs := ackMsg :: shardMsgs;
            timers := [];
            engines := [mkPair executorCfg executorEnv]
          }
      | _ := none
    }
    | _ := none
  };

lockAcquiredAction

Action processing lock acquisition confirmation from shards.

State update
  • Adds lock to locks_acquired list
  • Recomputes seen_all_writes and seen_all_reads from the updated lock list
Messages to be sent
  • UpdateSeenAll messages (one write-barrier and one read-barrier update) to every shard referenced by a known transaction
Engines to be spawned
None
Timer updates
No timers are set or cancelled.
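--- Checks whether every shard holding a key that tx writes (if isWrite) or reads (otherwise)
--- has confirmed a lock for the transaction with fingerprint txNum.
allLocksAcquired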
  {KVSKey Executable}
  (isWrite : Bool)
  (tx : TransactionCandidate KVSKey KVSKey Executable)
  (txNum : TxFingerprint)
  (locks : List (Pair EngineID KVSLockAcquiredMsg)) : Bool :=
  let keys := case isWrite of {
        | true := TransactionLabel.write (TransactionCandidate.label tx)
        | false := TransactionLabel.read (TransactionCandidate.label tx)
      };
      neededShards := Set.fromList (map keyToShard keys);
      lockingShards := Set.fromList (map fst (List.filter \{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum} locks));
  in Set.isSubset neededShards lockingShards;
--- Finds the highest transaction fingerprint N such that all transactions with fingerprints 1..N
--- have acquired all their necessary locks of the specified type (read or write). This represents
--- the "safe point" up to which shards can process transactions without worrying about missing locks.
terminating
findMaxConsecutiveLocked
  {KVSKey Executable}
  (isWrite : Bool)
  (transactions : Map TxFingerprint (TransactionCandidate KVSKey KVSKey Executable))
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  (current : TxFingerprint)
  (prev : TxFingerprint) : TxFingerprint :=
  case Map.lookup current transactions of {
    | none := prev
    | some tx := case allLocksAcquired isWrite tx current locks of {
      | true := findMaxConsecutiveLocked isWrite transactions locks (current + 1) current
      | false := prev
    }
  };
--- Collects the shards responsible for any key read or written by a known transaction.
getAllShards
  {KVSKey Executable}
  (transactions : Map TxFingerprint (TransactionCandidate KVSKey KVSKey Executable)) : Set EngineID :=
  let getAllKeysFromLabel (label : TransactionLabel KVSKey KVSKey) : List KVSKey :=
        TransactionLabel.read label ++ TransactionLabel.write label;
      allKeys := List.concatMap
        \{tx := getAllKeysFromLabel (TransactionCandidate.label tx)}
        (Map.values transactions);
  in Set.fromList (map keyToShard allKeys);
lockAcquiredAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of {
    | some emsg := case emsg of {
      | mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg); sender := sender} :=
        let timestamp := KVSLockAcquiredMsg.timestamp lockMsg;
            newLocks := (mkPair sender lockMsg) :: MempoolWorkerLocalState.locks_acquired local;
            maxConsecutiveWrite := findMaxConsecutiveLocked true (MempoolWorkerLocalState.transactions local) newLocks 1 0;
            maxConsecutiveRead := findMaxConsecutiveLocked false (MempoolWorkerLocalState.transactions local) newLocks 1 0;
            newState := local@MempoolWorkerLocalState{
              locks_acquired := newLocks;
              seen_all_writes := maxConsecutiveWrite;
              seen_all_reads := maxConsecutiveRead
            };
            newEnv := env@EngineEnv{localState := newState};
            allShards := getAllShards (MempoolWorkerLocalState.transactions local);
            makeUpdateMsg (target : EngineID) (isWrite : Bool) (timestamp : TxFingerprint) : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
              mkEngineMsg@{
                sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
                target := target;
                mailbox := some 0;
                msg := Anoma.MsgShard (ShardMsgUpdateSeenAll
                  (mkUpdateSeenAllMsg@{
                    timestamp := timestamp;
                    write := isWrite
                  }))
              };
            writeMessages := map \{shard := makeUpdateMsg shard true maxConsecutiveWrite} (Set.toList allShards);
            readMessages := map \{shard := makeUpdateMsg shard false maxConsecutiveRead} (Set.toList allShards);
        in some mkActionEffect@{
          env := newEnv;
          msgs := writeMessages ++ readMessages;
          timers := [];
          engines := []
        }
      | _ := none
    }
    | _ := none
  };

executorFinishedAction

Action processing execution completion notification from executor.

State update
Adds execution summary to execution_summaries map
Messages to be sent
None
Engines to be spawned
None
Timer updates
No timers are set or cancelled.
executorFinishedAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of {
    | some emsg := case emsg of {
      | mkEngineMsg@{msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary); sender := sender} :=
          case Map.lookup sender (MempoolWorkerLocalState.transactionEngines local) of {
            | some tr :=
              let newState := local@MempoolWorkerLocalState{
                    execution_summaries := Map.insert tr summary (MempoolWorkerLocalState.execution_summaries local)
                  };
                  newEnv := env@EngineEnv{localState := newState};
              in some mkActionEffect@{
                env := newEnv;
                msgs := [];
                timers := [];
                engines := []
              }
            | _ := none
          }
      | _ := none
    }
    | _ := none
  };

Action Labels

transactionRequestActionLabel
  {KVSKey KVSDatum Executable ProgramState} {{Ord KVSKey}} {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [ transactionRequestAction ];

lockAcquiredActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [ lockAcquiredAction ];

executorFinishedActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  Seq [ executorFinishedAction ];

Guards

Auxiliary Juvix code
MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestGuard

Condition
Message type is MempoolWorkerMsgTransactionRequest
transactionRequestGuard
  {KVSKey KVSDatum Executable ProgramState} {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of {
    | some mkEngineMsg@{msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _)} :=
      some mkGuardOutput@{
        action := transactionRequestActionLabel;
        args := []
      }
    | _ := none
  };

lockAcquiredGuard

Condition
Message type is ShardMsgKVSLockAcquired
lockAcquiredGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of {
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} :=
      some mkGuardOutput@{
        action := lockAcquiredActionLabel;
        args := []
      }
    | _ := none
  };

executorFinishedGuard

Condition
Message type is ExecutorMsgExecutorFinished
executorFinishedGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger MempoolWorkerTimerHandle (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of {
    | some mkEngineMsg@{msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _)} :=
      some mkGuardOutput@{
        action := executorFinishedActionLabel;
        args := []
      }
    | _ := none
  };

The Mempool Worker Behaviour

MempoolWorkerBehaviour

MempoolWorkerBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

Instantiation

instance dummyRunnable : Runnable String String ByteString String :=
  mkRunnable@{
    executeStep := \{_ _ _ := error "Not implemented"};
    halted := \{_ := false};
    startingState := ""
  };

mempoolWorkerBehaviour : MempoolWorkerBehaviour String String ByteString String :=
  mkEngineBehaviour@{
    guards := First [
      transactionRequestGuard;
      lockAcquiredGuard;
      executorFinishedGuard
    ]
  };
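The behaviour above lists its guards under First. Assuming this means the guards are tried in order and the first one that produces an output selects the action (an assumption about the engine framework, not something stated on this page), the dispatch can be sketched in Python as follows. The helper names `first` and `make_guard` and the dictionary-shaped trigger are hypothetical.

```python
def first(guards):
    """Combine guards so that the first one returning an output wins."""
    def evaluate(trigger):
        for guard in guards:
            output = guard(trigger)
            if output is not None:
                return output
        return None  # no guard matched: the message is not handled
    return evaluate


def make_guard(message_type, action_label):
    """Build a guard that matches on message type only, like the guards above."""
    return lambda trigger: action_label if trigger.get("type") == message_type else None


behaviour = first([
    make_guard("MempoolWorkerMsgTransactionRequest", "transactionRequestActionLabel"),
    make_guard("ShardMsgKVSLockAcquired", "lockAcquiredActionLabel"),
    make_guard("ExecutorMsgExecutorFinished", "executorFinishedActionLabel"),
])

selected = behaviour({"type": "ShardMsgKVSLockAcquired"})
unmatched = behaviour({"type": "SomethingElse"})
```

Since each guard matches a distinct message type, at most one guard can fire for any message, so the order of the list does not change the outcome here.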