Juvix imports

module arch.node.engines.mempool_worker_behaviour;

import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

Mempool Worker Behaviour

Overview

A mempool worker acts as a transaction coordinator, receiving transaction requests, managing their execution lifecycle, and coordinating with shards and executors.

Auxiliary Juvix code

axiom sign
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Signature;

axiom hash
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Hash;

Mempool Worker Action Flowcharts

transactionRequestAction flowchart

```mermaid
flowchart TD
    Start([Client Request]) --> MsgReq[MempoolWorkerMsgTransactionRequest<br/>tx: TransactionCandidate]

    subgraph Guard["transactionRequestGuard"]
        MsgReq --> ValidType{Is message type<br/>TransactionRequest?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["transactionRequestAction"]
        direction TB
        GenFP[Generate new fingerprint<br/>from gensym]
        GenFP --> SpawnEx[Create Executor Config<br/>with access rights]
        SpawnEx --> UpdateState[Update local state:<br/>- Increment gensym<br/>- Add to transactions map<br/>- Add to engine map]
        UpdateState --> PrepLocks[Prepare lock requests<br/>for each shard]
    end

    PrepLocks --> Msgs

    subgraph Msgs[Messages and Effects]
        MsgAck[TransactionAck to client<br/>with fingerprint & signature]
        MsgLock[KVSAcquireLock to shards<br/>with read/write keys]
        SpawnEng[Spawn Executor Engine]
    end

    style Guard fill:#f0f7ff,stroke:#333,stroke-width:2px
    style Action fill:#fff7f0,stroke:#333,stroke-width:2px
    style Msgs fill:#f7fff0,stroke:#333,stroke-width:2px
```
transactionRequestAction flowchart

Explanation

  1. Initial Request
    • A client sends a MempoolWorkerMsgTransactionRequest containing:
      • tx: The transaction candidate to be ordered and executed.
      • resubmission: Optional reference to a previous occurrence (currently unused).
    • The transaction candidate includes its program code and access patterns (what it will read/write).
  2. Guard Phase (transactionRequestGuard)
    • Verifies message type is MempoolWorkerMsgTransactionRequest.
    • If validation fails, request is rejected.
    • On success, passes control to transactionRequestActionLabel.
  3. Action Phase (transactionRequestAction)
    • Generates new fingerprint by incrementing gensym counter.
    • Creates Executor configuration with:
      • Timestamp set to new fingerprint.
      • Executable code from transaction.
      • Access rights from transaction label.
      • References to worker and transaction issuer.
    • Updates local state:
      • Increments gensym counter.
      • Adds transaction to transactions map.
      • Records executor ID to fingerprint mapping.
    • Prepares lock requests for each affected shard by:
      • Grouping keys by shard.
      • Creating appropriate lock request messages.
  4. Reply Generation
    • Messages sent:
      • To client: MempoolWorkerMsgTransactionAck containing:
        • tx_hash: Hash of the transaction
        • batch_number: Current batch number
        • batch_start: Currently always set to 0
        • worker_id: This worker's ID
        • signature: Worker's signature over above fields
      • To shards: KVSAcquireLock messages for each affected shard containing:
        • lazy_read_keys: Keys that might be read (currently always empty)
        • eager_read_keys: Keys that will definitely be read
        • will_write_keys: Keys that will definitely be written
        • may_write_keys: Keys that might be written (currently always empty)
        • worker: This worker's ID
        • executor: ID of spawned executor
        • timestamp: Generated fingerprint
  5. Replies and Effects
    • Reply Delivery
      • All messages are sent with mailbox 0 (default response mailbox).
      • Transaction acknowledgment is sent back to original requester.
      • Lock requests are sent to all relevant shards.
    • Engines spawned:
      • Creates new Executor engine with generated configuration.
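
The shard grouping in step 3 can be modelled in a few lines of Python. This is a minimal sketch, not the Juvix implementation: `key_to_shard` is a hypothetical stand-in for `keyToShard` (here it routes by the first character of the key), the worker and executor IDs are plain strings, and the dictionary fields mirror the KVSAcquireLock fields listed above.

```python
from typing import Callable, Dict, Set


def key_to_shard(key: str) -> str:
    """Hypothetical routing rule: the shard is named after the key's first character."""
    return "shard-" + key[0]


def prepare_lock_requests(
    read_keys: Set[str],
    write_keys: Set[str],
    fingerprint: int,
    worker: str,
    executor: str,
    route: Callable[[str], str] = key_to_shard,
) -> Dict[str, dict]:
    """Build one lock request per shard that owns at least one accessed key."""
    shards = {route(k) for k in read_keys | write_keys}
    requests = {}
    for shard in sorted(shards):
        requests[shard] = {
            "lazy_read_keys": set(),  # left empty, as in the action's code
            "eager_read_keys": {k for k in read_keys if route(k) == shard},
            "will_write_keys": {k for k in write_keys if route(k) == shard},
            "may_write_keys": set(),  # left empty, as in the action's code
            "worker": worker,
            "executor": executor,
            "timestamp": fingerprint,
        }
    return requests


requests = prepare_lock_requests(
    read_keys={"apple", "banana"},
    write_keys={"avocado"},
    fingerprint=1,
    worker="worker-0",
    executor="executor-1",
)
```

Each shard receives only the keys it owns, so a shard that holds read keys but no write keys gets an empty `will_write_keys` set rather than being skipped.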

Important Notes:

  • Fingerprint generation via gensym is a simplified stand-in for what could be a more complex process.
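
As a rough illustration of the note above, the gensym scheme amounts to a monotonically increasing counter. The sketch below is a Python model, not the Juvix code: it assumes the counter starts at 0, which this section does not state, and the class and method names are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class WorkerState:
    """Toy model of the worker's local state; only the fields used here."""

    gensym: int = 0  # assumed initial value
    transactions: Dict[int, str] = field(default_factory=dict)

    def register(self, candidate: str) -> int:
        """Assign the next fingerprint to a candidate and record it."""
        fingerprint = self.gensym + 1
        self.gensym = fingerprint
        self.transactions[fingerprint] = candidate
        return fingerprint


state = WorkerState()
first = state.register("tx-a")
second = state.register("tx-b")
```

Because the fingerprint doubles as the executor's timestamp, consecutive requests to the same worker receive consecutive timestamps in this model.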

lockAcquiredAction flowchart

flowchart TD
    Start([Shard Reply]) --> MsgReq[ShardMsgKVSLockAcquired<br/>timestamp: TxFingerprint]

    subgraph Guard["lockAcquiredGuard"]
        MsgReq --> ValidType{Is message type<br/>LockAcquired?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["lockAcquiredAction"]
        direction TB
        AddLock[Add lock to acquired list]
        AddLock --> CalcMax[Calculate max consecutive:<br/>- Writes locked<br/>- Reads locked]
        CalcMax --> UpdateBarriers[Update seen_all barriers:<br/>- seen_all_writes<br/>- seen_all_reads]
    end

    UpdateBarriers --> Msgs

    subgraph Msgs[Messages and Effects]
        BcastWrite[UpdateSeenAll to shards<br/>for write barrier]
        BcastRead[UpdateSeenAll to shards<br/>for read barrier]
    end

lockAcquiredAction flowchart

Explanation

  1. Initial Message

    • A Mempool Worker receives a ShardMsgKVSLockAcquired message from a Shard engine.
    • The message contains:
      • timestamp: The TxFingerprint identifying which transaction's locks were acquired.
      • (Implicit) The sender of the message identifies which shard has confirmed the locks.
  2. Guard Phase (lockAcquiredGuard)

    • Verifies message type is ShardMsgKVSLockAcquired.
    • If validation fails, request is rejected.
    • On success, passes control to lockAcquiredActionLabel.
  3. Action Phase (lockAcquiredAction)

    • Adds the new lock to the locks_acquired list in state.
    • Calculates new maximum consecutive sequence points by analyzing the lock history:
      • For writes: Finds highest fingerprint where all prior write locks are confirmed.
      • For reads: Finds highest fingerprint where all prior read locks are confirmed.
    • Updates internal barriers (seen_all_writes and seen_all_reads) based on calculations.
    • Constructs appropriate update messages for all shards.
  4. Reply Generation

    • Constructs ShardMsgUpdateSeenAll messages for every shard, containing:
      • For write barrier updates:
        • timestamp: New seen_all_writes value.
        • write: true.
      • For read barrier updates:
        • timestamp: New seen_all_reads value.
        • write: false.
  5. Message Delivery

    • Update messages are broadcast to every shard that owns a key read or written by a transaction in the worker's transactions map.
    • Uses mailbox 0 (the standard mailbox for responses).
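
The barrier calculation in step 3 can be sketched as follows. This is a simplified Python model under stated assumptions: each transaction is reduced to the sets of shards that own its read and write keys (skipping the key-to-shard mapping), a lock confirmation is a `(shard, fingerprint)` pair, and a loop replaces the recursion used by the Juvix function. All names are illustrative.

```python
from typing import Dict, List, Set, Tuple

# A transaction is modelled as (read_shards, write_shards).
Tx = Tuple[Set[str], Set[str]]
# A lock confirmation is (shard, fingerprint).
Lock = Tuple[str, int]


def all_locks_acquired(is_write: bool, tx: Tx, fingerprint: int, locks: List[Lock]) -> bool:
    """True if every shard the transaction needs has confirmed its lock."""
    needed = tx[1] if is_write else tx[0]
    confirmed = {shard for shard, ts in locks if ts == fingerprint}
    return needed <= confirmed


def max_consecutive_locked(is_write: bool, txs: Dict[int, Tx], locks: List[Lock]) -> int:
    """Highest N such that transactions 1..N all hold their required locks."""
    barrier = 0
    current = 1
    while current in txs and all_locks_acquired(is_write, txs[current], current, locks):
        barrier = current
        current += 1
    return barrier


txs = {
    1: ({"s1"}, {"s2"}),
    2: ({"s1"}, {"s1"}),
    3: ({"s1"}, {"s2"}),
}
locks = [("s1", 1), ("s2", 1), ("s1", 2), ("s2", 3)]

seen_all_writes = max_consecutive_locked(True, txs, locks)
seen_all_reads = max_consecutive_locked(False, txs, locks)
```

In this example the write barrier reaches 3, while the read barrier stops at 2 because shard `s1` has not yet confirmed the lock for transaction 3.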

executorFinishedAction flowchart

flowchart TD
    Start([Executor Reply]) --> MsgReq[ExecutorMsgExecutorFinished<br/>success: Bool<br/>values_read: List KeyValue<br/>values_written: List KeyValue]

    subgraph Guard["executorFinishedGuard"]
        MsgReq --> ValidType{Is message type<br/>ExecutorFinished?}
        ValidType -->|No| Reject([Reject Request])
        ValidType -->|Yes| ActionEntry[Enter Action Phase]
    end

    ActionEntry --> Action

    subgraph Action["executorFinishedAction"]
        direction TB
        FindTx{Lookup transaction<br/>for executor}
        FindTx -->|Not Found| NoAction[Do Nothing]
        FindTx -->|Found| Store[Store execution summary<br/>in local state]
    end

    Store --> Effects
    NoAction --> NoEffect([No Effect])

    subgraph Effects[Effects]
        State[Update execution summaries<br/>in local state]
    end
executorFinishedAction flowchart

Explanation

  1. Initial Request

    • An executor sends an ExecutorMsgExecutorFinished message containing:
      • success: Boolean indicating if execution completed successfully.
      • values_read: List of all key-value pairs that were read during execution.
      • values_written: List of all key-value pairs that were written during execution.
    • This message represents the completion of a transaction's execution lifecycle.
  2. Guard Phase (executorFinishedGuard)

    • Verifies message type is ExecutorMsgExecutorFinished.
    • If validation fails, request is rejected immediately.
    • On success, passes control to executorFinishedActionLabel.
  3. Action Phase (executorFinishedAction)

    • Processes valid executor completion notifications through these steps:
      • Looks up the transaction associated with the sending executor in the transactionEngines map.
      • If no transaction is found, the notification is ignored (this shouldn't happen in normal operation).
      • If transaction is found, stores the execution summary in the execution_summaries map.
      • The summary is indexed by the transaction's fingerprint for later reference.
  4. Reply Generation

    • Successful Case
      • Updates local state with the new execution summary.
      • No response messages are generated.
    • Error Case
      • If the executor is not found in the mapping, the action silently does nothing.
      • No error responses are sent.
  5. State Update

    • Updates the worker's local state:
      • Adds new entry to execution_summaries map.
      • Maps transaction fingerprint to its execution results.
    • No messages are sent.
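
The lookup-and-store logic described above is small enough to model directly. The following Python sketch is illustrative only: executor IDs are plain strings, the summary is a dictionary with the three fields named in step 1, and the function name is hypothetical.

```python
from typing import Dict, Optional


def on_executor_finished(
    transaction_engines: Dict[str, int],
    execution_summaries: Dict[int, dict],
    sender: str,
    summary: dict,
) -> Optional[Dict[int, dict]]:
    """Return the updated summaries map, or None if the sender is unknown."""
    fingerprint = transaction_engines.get(sender)
    if fingerprint is None:
        return None  # unknown executor: no state change, no messages
    updated = dict(execution_summaries)
    updated[fingerprint] = summary
    return updated


engines = {"executor-1": 1}
summary = {"success": True, "values_read": [("k", "v")], "values_written": []}

known = on_executor_finished(engines, {}, "executor-1", summary)
unknown = on_executor_finished(engines, {}, "executor-9", summary)
```

Returning `None` for an unknown sender mirrors the action producing no effect at all, rather than an error reply.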

Action arguments

MempoolWorkerActionArgument

syntax alias MempoolWorkerActionArgument := Unit;

MempoolWorkerActionArguments

MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;

Actions

Auxiliary Juvix code

MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Action
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
ActionInput
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable);

MempoolWorkerActionEffect
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionEffect
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerActionExec
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionExec
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestAction

Action processing a new transaction request.

State update
  • Increments gensym counter
  • Adds transaction to transactions map with new fingerprint
  • Records the executor ID to fingerprint mapping in transactionEngines
Messages to be sent
  • TransactionAck to requester
  • KVSAcquireLock messages to relevant shards
Engines to be spawned
  • A new Executor engine, configured for this transaction
Timer updates
No timers are set or cancelled.
transactionRequestAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
cfg := ActionInput.cfg input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request);
sender := sender;
} :=
let
fingerprint := MempoolWorkerLocalState.gensym local + 1;
worker_id := getEngineIDFromEngineCfg cfg;
candidate := TransactionRequest.tx request;
executor_name := nameGen "executor" (snd worker_id) worker_id;
executor_id := mkPair none executor_name;
executorCfg :=
Anoma.CfgExecutor
mkExecutorCfg@{
timestamp := fingerprint;
executable := TransactionCandidate.executable candidate;
lazy_read_keys := Set.empty;
eager_read_keys :=
Set.fromList
(TransactionLabel.read
(TransactionCandidate.label candidate));
will_write_keys :=
Set.fromList
(TransactionLabel.write
(TransactionCandidate.label candidate));
may_write_keys := Set.empty;
worker := worker_id;
issuer := sender;
};
executorEnv :=
Anoma.EnvExecutor
mkEngineEnv@{
localState :=
mkExecutorLocalState@{
program_state := Runnable.startingState {{rinst}};
completed_reads := Map.empty;
completed_writes := Map.empty;
};
mailboxCluster := Map.empty;
acquaintances := Set.empty;
timers := [];
};
newState :=
local@MempoolWorkerLocalState{
gensym := fingerprint;
transactions := Map.insert
fingerprint
candidate
(MempoolWorkerLocalState.transactions local);
transactionEngines := Map.insert
executor_id
fingerprint
(MempoolWorkerLocalState.transactionEngines local);
};
newEnv := env@EngineEnv{localState := newState};
read_keys :=
Set.fromList
(TransactionLabel.read
(TransactionCandidate.label candidate));
write_keys :=
Set.fromList
(TransactionLabel.write
(TransactionCandidate.label candidate));
shards :=
Set.toList
(Set.map keyToShard (Set.union read_keys write_keys));
shardMsgs :=
map
\{shard :=
let
shard_read_keys :=
Set.filter
\{key := snd (keyToShard key) == snd shard}
read_keys;
shard_write_keys :=
Set.filter
\{key := snd (keyToShard key) == snd shard}
write_keys;
lockRequest :=
mkKVSAcquireLockMsg@{
lazy_read_keys := Set.empty;
eager_read_keys := shard_read_keys;
will_write_keys := shard_write_keys;
may_write_keys := Set.empty;
worker := worker_id;
executor := executor_id;
timestamp := fingerprint;
};
in mkEngineMsg@{
sender := worker_id;
target := shard;
mailbox := some 0;
msg :=
Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest);
}}
shards;
ackMsg :=
mkEngineMsg@{
sender := worker_id;
target := sender;
mailbox := some 0;
msg :=
Anoma.MsgMempoolWorker
(MempoolWorkerMsgTransactionAck
mkTransactionAck@{
tx_hash := hash fingerprint candidate;
batch_number :=
MempoolWorkerLocalState.batch_number local;
batch_start := 0;
worker_id := worker_id;
signature := sign fingerprint candidate;
});
};
in some
mkActionEffect@{
env := newEnv;
msgs := ackMsg :: shardMsgs;
timers := [];
engines := [mkPair executorCfg executorEnv];
}
| _ := none
}
| _ := none;

lockAcquiredAction

Action processing lock acquisition confirmation from shards.

State update
  • Adds lock to locks_acquired list
  • Recomputes the seen_all_writes and seen_all_reads barriers
Messages to be sent
  • UpdateSeenAll messages to shards, carrying the current seen_all_writes and seen_all_reads values
Engines to be spawned
None
Timer updates
No timers are set or cancelled.
allLocksAcquired
{KVSKey Executable}
(isWrite : Bool)
(tx : TransactionCandidate KVSKey KVSKey Executable)
(txNum : TxFingerprint)
(locks : List (Pair EngineID KVSLockAcquiredMsg))
: Bool :=
let
keys :=
case isWrite of
| true := TransactionLabel.write (TransactionCandidate.label tx)
| false := TransactionLabel.read (TransactionCandidate.label tx);
neededShards := Set.fromList (map keyToShard keys);
lockingShards :=
Set.fromList
(map
fst
(List.filter
\{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum}
locks));
in Set.isSubset neededShards lockingShards;
--- Finds the highest transaction fingerprint N such that all transactions with fingerprints 1..N
--- have acquired all their necessary locks of the specified type (read or write). This represents
--- the "safe point" up to which shards can process transactions without worrying about missing locks.
terminating
findMaxConsecutiveLocked
{KVSKey Executable}
(isWrite : Bool)
(transactions : Map
TxFingerprint
(TransactionCandidate KVSKey KVSKey Executable))
(locks : List (Pair EngineID KVSLockAcquiredMsg))
(current : TxFingerprint)
(prev : TxFingerprint)
: TxFingerprint :=
case Map.lookup current transactions of
| none := prev
| some tx :=
case allLocksAcquired isWrite tx current locks of
| true :=
findMaxConsecutiveLocked
isWrite
transactions
locks
(current + 1)
current
| false := prev;
getAllShards
{KVSKey Executable}
(transactions : Map
TxFingerprint
(TransactionCandidate KVSKey KVSKey Executable))
: Set EngineID :=
let
getAllKeysFromLabel
(label : TransactionLabel KVSKey KVSKey) : List KVSKey :=
TransactionLabel.read label ++ TransactionLabel.write label;
allKeys :=
List.concatMap
\{tx := getAllKeysFromLabel (TransactionCandidate.label tx)}
(Map.values transactions);
in Set.fromList (map keyToShard allKeys);
lockAcquiredAction
{KVSKey KVSDatum Executable ProgramState}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg);
sender := sender;
} :=
let
timestamp := KVSLockAcquiredMsg.timestamp lockMsg;
newLocks :=
mkPair sender lockMsg
:: MempoolWorkerLocalState.locks_acquired local;
maxConsecutiveWrite :=
findMaxConsecutiveLocked
true
(MempoolWorkerLocalState.transactions local)
newLocks
1
0;
maxConsecutiveRead :=
findMaxConsecutiveLocked
false
(MempoolWorkerLocalState.transactions local)
newLocks
1
0;
newState :=
local@MempoolWorkerLocalState{
locks_acquired := newLocks;
seen_all_writes := maxConsecutiveWrite;
seen_all_reads := maxConsecutiveRead;
};
newEnv := env@EngineEnv{localState := newState};
allShards :=
getAllShards (MempoolWorkerLocalState.transactions local);
makeUpdateMsg
(target : EngineID)
(isWrite : Bool)
(timestamp : TxFingerprint)
: EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := target;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgUpdateSeenAll
mkUpdateSeenAllMsg@{
timestamp := timestamp;
write := isWrite;
});
};
writeMessages :=
map
\{shard := makeUpdateMsg shard true maxConsecutiveWrite}
(Set.toList allShards);
readMessages :=
map
\{shard := makeUpdateMsg shard false maxConsecutiveRead}
(Set.toList allShards);
in some
mkActionEffect@{
env := newEnv;
msgs := writeMessages ++ readMessages;
timers := [];
engines := [];
}
| _ := none
}
| _ := none;

executorFinishedAction

Action processing execution completion notification from executor.

State update
Adds execution summary to execution_summaries map
Messages to be sent
None
Engines to be spawned
None
Timer updates
No timers are set or cancelled.
executorFinishedAction
{KVSKey KVSDatum Executable ProgramState}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary);
sender := sender;
} :=
case
Map.lookup
sender
(MempoolWorkerLocalState.transactionEngines local)
of {
| some tr :=
let
newState :=
local@MempoolWorkerLocalState{execution_summaries := Map.insert
tr
summary
(MempoolWorkerLocalState.execution_summaries local)};
newEnv := env@EngineEnv{localState := newState};
in some
mkActionEffect@{
env := newEnv;
msgs := [];
timers := [];
engines := [];
}
| _ := none
}
| _ := none
}
| _ := none;

Action Labels

transactionRequestActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [transactionRequestAction];

lockAcquiredActionLabel
{KVSKey KVSDatum Executable ProgramState}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [lockAcquiredAction];

executorFinishedActionLabel
{KVSKey KVSDatum Executable ProgramState}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [executorFinishedAction];

Guards

Auxiliary Juvix code

MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Guard
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardOutput
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardOutput
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

MempoolWorkerGuardEval
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardEval
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

transactionRequestGuard

Condition
Message type is MempoolWorkerMsgTransactionRequest
transactionRequestGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _);
} :=
some
mkGuardOutput@{
action := transactionRequestActionLabel;
args := [];
}
| _ := none;

lockAcquiredGuard

Condition
Message type is ShardMsgKVSLockAcquired
lockAcquiredGuard
{KVSKey KVSDatum Executable ProgramState}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} :=
some
mkGuardOutput@{
action := lockAcquiredActionLabel;
args := [];
}
| _ := none;

executorFinishedGuard

Condition
Message type is ExecutorMsgExecutorFinished
executorFinishedGuard
{KVSKey KVSDatum Executable ProgramState}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _);
} :=
some
mkGuardOutput@{
action := executorFinishedActionLabel;
args := [];
}
| _ := none;

The Mempool Worker Behaviour

MempoolWorkerBehaviour

MempoolWorkerBehaviour
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
EngineBehaviour
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);

Instantiation

instance
dummyRunnable : Runnable String String ByteString String :=
mkRunnable@{
executeStep := \{_ _ _ := error "Not implemented"};
halted := \{_ := false};
startingState := "";
};

mempoolWorkerBehaviour
: MempoolWorkerBehaviour String String ByteString String :=
mkEngineBehaviour@{
guards :=
First [transactionRequestGuard; lockAcquiredGuard; executorFinishedGuard];
};
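
To illustrate how the three guards combine, here is a small Python model of guard dispatch. It assumes that `First` tries guards in list order and selects the action of the first one that matches; that reading is an assumption about the engine framework, not something this page states. Triggers are modelled as dictionaries with a `type` field, and all helper names are hypothetical.

```python
from typing import Callable, List, Optional

Guard = Callable[[dict], Optional[str]]


def make_guard(message_type: str, action_label: str) -> Guard:
    """Build a guard that matches on the message type only."""

    def guard(trigger: dict) -> Optional[str]:
        return action_label if trigger.get("type") == message_type else None

    return guard


GUARDS: List[Guard] = [
    make_guard("MempoolWorkerMsgTransactionRequest", "transactionRequestActionLabel"),
    make_guard("ShardMsgKVSLockAcquired", "lockAcquiredActionLabel"),
    make_guard("ExecutorMsgExecutorFinished", "executorFinishedActionLabel"),
]


def first_match(guards: List[Guard], trigger: dict) -> Optional[str]:
    """Assumed semantics of First: the earliest matching guard wins."""
    for guard in guards:
        label = guard(trigger)
        if label is not None:
            return label
    return None


selected = first_match(GUARDS, {"type": "ShardMsgKVSLockAcquired"})
ignored = first_match(GUARDS, {"type": "SomethingElse"})
```

Since each guard matches a distinct message type, at most one guard can fire per trigger, so the list order does not change the outcome in this model.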