Juvix imports
module arch.node.engines.mempool_worker_behaviour;
import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Mempool Worker Behaviour¶
Overview¶
A mempool worker acts as a transaction coordinator, receiving transaction requests, managing their execution lifecycle, and coordinating with shards and executors.
Auxiliary Juvix code
axiom sign
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Signature;
axiom hash
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Hash;
Mempool Worker Action Flowcharts¶
transactionRequestAction flowchart¶
```mermaid
flowchart TD
Start([Client Request]) --> MsgReq[MempoolWorkerMsgTransactionRequest<br/>tx: TransactionCandidate]
subgraph Guard["transactionRequestGuard"]
MsgReq --> ValidType{Is message type<br/>TransactionRequest?}
ValidType -->|No| Reject([Reject Request])
ValidType -->|Yes| ActionEntry[Enter Action Phase]
end
ActionEntry --> Action
subgraph Action["transactionRequestAction"]
direction TB
GenFP[Generate new fingerprint<br/>from gensym]
GenFP --> SpawnEx[Create Executor Config<br/>with access rights]
SpawnEx --> UpdateState[Update local state:<br/>- Increment gensym<br/>- Add to transactions map<br/>- Add to engine map]
UpdateState --> PrepLocks[Prepare lock requests<br/>for each shard]
end
PrepLocks --> Msgs
subgraph Msgs[Messages and Effects]
MsgAck[TransactionAck to client<br/>with fingerprint & signature]
MsgLock[KVSAcquireLock to shards<br/>with read/write keys]
SpawnEng[Spawn Executor Engine]
end
style Guard fill:#f0f7ff,stroke:#333,stroke-width:2px
style Action fill:#fff7f0,stroke:#333,stroke-width:2px
style Msgs fill:#f7fff0,stroke:#333,stroke-width:2px
```
transactionRequestAction flowchart
Explanation¶
- Initial Request
  - A client sends a MempoolWorkerMsgTransactionRequest containing:
    - tx: The transaction candidate to be ordered and executed.
    - resubmission: Optional reference to a previous occurrence (currently unused).
  - The transaction candidate includes its program code and access patterns (what it will read/write).
- Guard Phase (transactionRequestGuard)
  - Verifies message type is MempoolWorkerMsgTransactionRequest.
  - If validation fails, the request is rejected.
  - On success, passes control to transactionRequestActionLabel.
- Action Phase (transactionRequestAction)
  - Generates a new fingerprint by incrementing the gensym counter.
  - Creates an Executor configuration with:
    - Timestamp set to the new fingerprint.
    - Executable code from the transaction.
    - Access rights from the transaction label.
    - References to the worker and the transaction issuer.
  - Updates local state:
    - Increments the gensym counter.
    - Adds the transaction to the transactions map.
    - Records the executor ID to fingerprint mapping.
  - Prepares lock requests for each affected shard by:
    - Grouping keys by shard.
    - Creating one lock request message per shard.
- Reply Generation
  - Messages sent:
    - To client: MempoolWorkerMsgTransactionAck containing:
      - tx_hash: Hash of the transaction.
      - batch_number: Current batch number.
      - batch_start: Currently fixed at 0.
      - worker_id: This worker's ID.
      - signature: Worker's signature over the above fields.
    - To shards: KVSAcquireLock messages for each affected shard containing:
      - lazy_read_keys: Keys that might be read (currently always empty).
      - eager_read_keys: Keys that will definitely be read.
      - will_write_keys: Keys that will definitely be written.
      - may_write_keys: Keys that might be written (currently always empty).
      - worker: This worker's ID.
      - executor: ID of the spawned executor.
      - timestamp: Generated fingerprint.
- Replies and Effects
  - Reply Delivery
    - All messages are sent with mailbox 0 (default response mailbox).
    - The transaction acknowledgment is sent back to the original requester.
    - Lock requests are sent to all relevant shards.
  - Engines spawned:
    - Creates a new Executor engine with the generated configuration.
Important Notes:¶
- Generating fingerprints from a gensym counter is a simplified stand-in for what could be a more complex process.
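The fingerprint and key-grouping steps described above can be modelled outside Juvix. The following Python sketch is an illustration only: key_to_shard, NUM_SHARDS, LockRequest, and handle_request are hypothetical names, and the byte-sum shard assignment is an assumption made for the example, not the behaviour of the real keyToShard.

```python
from dataclasses import dataclass

NUM_SHARDS = 4  # hypothetical shard count, for illustration only


def key_to_shard(key: str) -> int:
    """Hypothetical stand-in for keyToShard: a deterministic key-to-shard map."""
    return sum(key.encode()) % NUM_SHARDS


@dataclass(frozen=True)
class LockRequest:
    shard: int
    eager_read_keys: frozenset
    will_write_keys: frozenset
    timestamp: int


def handle_request(gensym, read_keys, write_keys):
    """Return the new fingerprint and one lock request per affected shard."""
    fingerprint = gensym + 1  # the new fingerprint is the incremented counter
    reads, writes = set(read_keys), set(write_keys)
    # Every shard that owns at least one read or written key gets a request.
    shards = sorted({key_to_shard(k) for k in reads | writes})
    requests = [
        LockRequest(
            shard=s,
            eager_read_keys=frozenset(k for k in reads if key_to_shard(k) == s),
            will_write_keys=frozenset(k for k in writes if key_to_shard(k) == s),
            timestamp=fingerprint,
        )
        for s in shards
    ]
    return fingerprint, requests
```

Each request carries only the keys owned by its shard, so a shard never sees lock requests for keys it does not hold. The lazy and may-write key sets are omitted here because the action currently leaves them empty.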
lockAcquiredAction flowchart¶
```mermaid
flowchart TD
Start([Shard Reply]) --> MsgReq[ShardMsgKVSLockAcquired<br/>timestamp: TxFingerprint]
subgraph Guard["lockAcquiredGuard"]
MsgReq --> ValidType{Is message type<br/>LockAcquired?}
ValidType -->|No| Reject([Reject Request])
ValidType -->|Yes| ActionEntry[Enter Action Phase]
end
ActionEntry --> Action
subgraph Action["lockAcquiredAction"]
direction TB
AddLock[Add lock to acquired list]
AddLock --> CalcMax[Calculate max consecutive:<br/>- Writes locked<br/>- Reads locked]
CalcMax --> UpdateBarriers[Update seen_all barriers:<br/>- seen_all_writes<br/>- seen_all_reads]
end
UpdateBarriers --> Msgs
subgraph Msgs[Messages and Effects]
BcastWrite[UpdateSeenAll to shards<br/>for write barrier]
BcastRead[UpdateSeenAll to shards<br/>for read barrier]
end
```
lockAcquiredAction flowchart
Explanation¶
- Initial Message
  - A Mempool Worker receives a ShardMsgKVSLockAcquired message from a Shard engine.
  - The message contains:
    - timestamp: The TxFingerprint identifying which transaction's locks were acquired.
    - (Implicit) The sender of the message identifies which shard has confirmed the locks.
- Guard Phase (lockAcquiredGuard)
  - Verifies message type is ShardMsgKVSLockAcquired.
  - If validation fails, the request is rejected.
  - On success, passes control to lockAcquiredActionLabel.
- Action Phase (lockAcquiredAction)
  - Adds the new lock to the locks_acquired list in state.
  - Calculates new maximum consecutive sequence points by analyzing the lock history:
    - For writes: Finds the highest fingerprint N such that every transaction with fingerprint 1..N has all its write locks confirmed.
    - For reads: Finds the highest fingerprint N such that every transaction with fingerprint 1..N has all its read locks confirmed.
  - Updates internal barriers (seen_all_writes and seen_all_reads) based on these calculations.
  - Constructs update messages for all known shards.
- Reply Generation
  - Constructs ShardMsgUpdateSeenAll messages for every shard, containing:
    - For write barrier updates:
      - timestamp: New seen_all_writes value.
      - write: true.
    - For read barrier updates:
      - timestamp: New seen_all_reads value.
      - write: false.
- Message Delivery
  - Update messages are broadcast to every shard that holds a key read or written by a transaction known to this worker.
  - Uses mailbox 0 (the standard mailbox for responses).
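The barrier calculation above can be illustrated with a small Python model. This is a sketch under simplifying assumptions: all_locks_acquired and max_consecutive_locked are illustrative names, shards are plain strings, and a lock confirmation is reduced to a (shard, fingerprint) pair.

```python
def all_locks_acquired(needed_shards, fingerprint, locks):
    """True if every needed shard has confirmed a lock for this fingerprint."""
    locking = {shard for shard, ts in locks if ts == fingerprint}
    return set(needed_shards) <= locking


def max_consecutive_locked(needed, locks):
    """Highest N such that fingerprints 1..N are all fully locked.

    `needed` maps fingerprint -> shards that must confirm; `locks` is the
    list of (shard, fingerprint) confirmations received so far.
    """
    prev, current = 0, 1
    # Walk fingerprints in order and stop at the first gap or unknown one.
    while current in needed and all_locks_acquired(needed[current], current, locks):
        prev, current = current, current + 1
    return prev
```

The scan stops at the first transaction that is still missing a confirmation, so a late lock for an early fingerprint can move the barrier forward by several steps at once. Running it once over the write shards and once over the read shards yields the two barrier values.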
executorFinishedAction flowchart¶
```mermaid
flowchart TD
Start([Executor Reply]) --> MsgReq[ExecutorMsgExecutorFinished<br/>success: Bool<br/>values_read: List KeyValue<br/>values_written: List KeyValue]
subgraph Guard["executorFinishedGuard"]
MsgReq --> ValidType{Is message type<br/>ExecutorFinished?}
ValidType -->|No| Reject([Reject Request])
ValidType -->|Yes| ActionEntry[Enter Action Phase]
end
ActionEntry --> Action
subgraph Action["executorFinishedAction"]
direction TB
FindTx{Lookup transaction<br/>for executor}
FindTx -->|Not Found| NoAction[Do Nothing]
FindTx -->|Found| Store[Store execution summary<br/>in local state]
end
Store --> Effects
NoAction --> NoEffect([No Effect])
subgraph Effects[Effects]
State[Update execution summaries<br/>in local state]
end
```
executorFinishedAction flowchart
Explanation¶
- Initial Request
  - An executor sends an ExecutorMsgExecutorFinished message containing:
    - success: Boolean indicating if execution completed successfully.
    - values_read: List of all key-value pairs that were read during execution.
    - values_written: List of all key-value pairs that were written during execution.
  - This message represents the completion of a transaction's execution lifecycle.
- Guard Phase (executorFinishedGuard)
  - Verifies message type is ExecutorMsgExecutorFinished.
  - If validation fails, the request is rejected immediately.
  - On success, passes control to executorFinishedActionLabel.
- Action Phase (executorFinishedAction)
  - Processes valid executor completion notifications through these steps:
    - Looks up the transaction associated with the sending executor in the transactionEngines map.
    - If no transaction is found, the notification is ignored (this shouldn't happen in normal operation).
    - If a transaction is found, stores the execution summary in the execution_summaries map.
    - The summary is indexed by the transaction's fingerprint for later reference.
- Reply Generation
  - Successful Case
    - Updates local state with the new execution summary.
    - No response messages are generated.
  - Error Case
    - If the executor is not found in the mapping, the action produces no effect.
    - No error responses are sent.
- State Update
  - Updates the worker's local state:
    - Adds a new entry to the execution_summaries map.
    - Maps the transaction fingerprint to its execution results.
  - No messages are sent.
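The lookup-then-store behaviour can be sketched in a few lines of Python. The dictionary-based state and the function name on_executor_finished are assumptions made for illustration; returning None stands in for the action yielding no effect.

```python
def on_executor_finished(state, sender, summary):
    """Return the updated state, or None when the sender is unknown."""
    fingerprint = state["transaction_engines"].get(sender)
    if fingerprint is None:
        return None  # unknown executor: no state change, no messages
    # Copy the map so the previous state is left untouched.
    summaries = dict(state["execution_summaries"])
    summaries[fingerprint] = summary  # indexed by transaction fingerprint
    return {**state, "execution_summaries": summaries}
```

The summary is keyed by fingerprint rather than by executor ID, so later consumers can find results using the same identifier that was handed to shards as the lock timestamp.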
Action arguments¶
MempoolWorkerActionArgument¶
syntax alias MempoolWorkerActionArgument := Unit;
MempoolWorkerActionArguments¶
MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;
Actions¶
Auxiliary Juvix code
MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Action
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
ActionInput
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable);
MempoolWorkerActionEffect
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionEffect
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
MempoolWorkerActionExec
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionExec
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
transactionRequestAction¶
Action processing a new transaction request.
- State update
  - Increments the gensym counter.
  - Adds the transaction to the transactions map under the new fingerprint.
  - Records the executor ID in the transactionEngines map.
- Messages to be sent
  - TransactionAck to the requester.
  - KVSAcquireLock messages to the relevant shards.
- Engines to be spawned
  - Creates a new Executor engine for the transaction.
- Timer updates
  - No timers are set or cancelled.
transactionRequestAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
cfg := ActionInput.cfg input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest request);
sender := sender;
} :=
let
fingerprint := MempoolWorkerLocalState.gensym local + 1;
worker_id := getEngineIDFromEngineCfg cfg;
candidate := TransactionRequest.tx request;
executor_name := nameGen "executor" (snd worker_id) worker_id;
executor_id := mkPair none executor_name;
executorCfg :=
Anoma.CfgExecutor
mkExecutorCfg@{
timestamp := fingerprint;
executable := TransactionCandidate.executable candidate;
lazy_read_keys := Set.empty;
eager_read_keys :=
Set.fromList
(TransactionLabel.read
(TransactionCandidate.label candidate));
will_write_keys :=
Set.fromList
(TransactionLabel.write
(TransactionCandidate.label candidate));
may_write_keys := Set.empty;
worker := worker_id;
issuer := sender;
};
executorEnv :=
Anoma.EnvExecutor
mkEngineEnv@{
localState :=
mkExecutorLocalState@{
program_state := Runnable.startingState {{rinst}};
completed_reads := Map.empty;
completed_writes := Map.empty;
};
mailboxCluster := Map.empty;
acquaintances := Set.empty;
timers := [];
};
newState :=
local@MempoolWorkerLocalState{
gensym := fingerprint;
transactions := Map.insert
fingerprint
candidate
(MempoolWorkerLocalState.transactions local);
transactionEngines := Map.insert
executor_id
fingerprint
(MempoolWorkerLocalState.transactionEngines local);
};
newEnv := env@EngineEnv{localState := newState};
read_keys :=
Set.fromList
(TransactionLabel.read
(TransactionCandidate.label candidate));
write_keys :=
Set.fromList
(TransactionLabel.write
(TransactionCandidate.label candidate));
shards :=
Set.toList
(Set.map keyToShard (Set.union read_keys write_keys));
shardMsgs :=
map
\{shard :=
let
shard_read_keys :=
Set.filter
\{key := snd (keyToShard key) == snd shard}
read_keys;
shard_write_keys :=
Set.filter
\{key := snd (keyToShard key) == snd shard}
write_keys;
lockRequest :=
mkKVSAcquireLockMsg@{
lazy_read_keys := Set.empty;
eager_read_keys := shard_read_keys;
will_write_keys := shard_write_keys;
may_write_keys := Set.empty;
worker := worker_id;
executor := executor_id;
timestamp := fingerprint;
};
in mkEngineMsg@{
sender := worker_id;
target := shard;
mailbox := some 0;
msg :=
Anoma.MsgShard (ShardMsgKVSAcquireLock lockRequest);
}}
shards;
ackMsg :=
mkEngineMsg@{
sender := worker_id;
target := sender;
mailbox := some 0;
msg :=
Anoma.MsgMempoolWorker
(MempoolWorkerMsgTransactionAck
mkTransactionAck@{
tx_hash := hash fingerprint candidate;
batch_number :=
MempoolWorkerLocalState.batch_number local;
batch_start := 0;
worker_id := worker_id;
signature := sign fingerprint candidate;
});
};
in some
mkActionEffect@{
env := newEnv;
msgs := ackMsg :: shardMsgs;
timers := [];
engines := [mkPair executorCfg executorEnv];
}
| _ := none
}
| _ := none;
lockAcquiredAction¶
Action processing lock acquisition confirmation from shards.
- State update
  - Adds the lock to the locks_acquired list.
  - Recomputes the seen_all_writes and seen_all_reads barriers.
- Messages to be sent
  - UpdateSeenAll messages to each known shard, one carrying the write barrier and one carrying the read barrier.
- Engines to be spawned
  - None
- Timer updates
  - No timers are set or cancelled.
allLocksAcquired
{KVSKey Executable}
(isWrite : Bool)
(tx : TransactionCandidate KVSKey KVSKey Executable)
(txNum : TxFingerprint)
(locks : List (Pair EngineID KVSLockAcquiredMsg))
: Bool :=
let
keys :=
case isWrite of
| true := TransactionLabel.write (TransactionCandidate.label tx)
| false := TransactionLabel.read (TransactionCandidate.label tx);
neededShards := Set.fromList (map keyToShard keys);
lockingShards :=
Set.fromList
(map
fst
(List.filter
\{lock := KVSLockAcquiredMsg.timestamp (snd lock) == txNum}
locks));
in Set.isSubset neededShards lockingShards;
--- Finds the highest transaction fingerprint N such that all transactions with fingerprints 1..N
--- have acquired all their necessary locks of the specified type (read or write). This represents
--- the "safe point" up to which shards can process transactions without worrying about missing locks.
terminating
findMaxConsecutiveLocked
{KVSKey Executable}
(isWrite : Bool)
(transactions : Map
TxFingerprint
(TransactionCandidate KVSKey KVSKey Executable))
(locks : List (Pair EngineID KVSLockAcquiredMsg))
(current : TxFingerprint)
(prev : TxFingerprint)
: TxFingerprint :=
case Map.lookup current transactions of
| none := prev
| some tx :=
case allLocksAcquired isWrite tx current locks of
| true :=
findMaxConsecutiveLocked
isWrite
transactions
locks
(current + 1)
current
| false := prev;
getAllShards
{KVSKey Executable}
(transactions : Map
TxFingerprint
(TransactionCandidate KVSKey KVSKey Executable))
: Set EngineID :=
let
getAllKeysFromLabel
(label : TransactionLabel KVSKey KVSKey) : List KVSKey :=
TransactionLabel.read label ++ TransactionLabel.write label;
allKeys :=
List.concatMap
\{tx := getAllKeysFromLabel (TransactionCandidate.label tx)}
(Map.values transactions);
in Set.fromList (map keyToShard allKeys);
lockAcquiredAction
{KVSKey KVSDatum Executable ProgramState}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgKVSLockAcquired lockMsg);
sender := sender;
} :=
let
timestamp := KVSLockAcquiredMsg.timestamp lockMsg;
newLocks :=
mkPair sender lockMsg
:: MempoolWorkerLocalState.locks_acquired local;
maxConsecutiveWrite :=
findMaxConsecutiveLocked
true
(MempoolWorkerLocalState.transactions local)
newLocks
1
0;
maxConsecutiveRead :=
findMaxConsecutiveLocked
false
(MempoolWorkerLocalState.transactions local)
newLocks
1
0;
newState :=
local@MempoolWorkerLocalState{
locks_acquired := newLocks;
seen_all_writes := maxConsecutiveWrite;
seen_all_reads := maxConsecutiveRead;
};
newEnv := env@EngineEnv{localState := newState};
allShards :=
getAllShards (MempoolWorkerLocalState.transactions local);
makeUpdateMsg
(target : EngineID)
(isWrite : Bool)
(timestamp : TxFingerprint)
: EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := target;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgUpdateSeenAll
mkUpdateSeenAllMsg@{
timestamp := timestamp;
write := isWrite;
});
};
writeMessages :=
map
\{shard := makeUpdateMsg shard true maxConsecutiveWrite}
(Set.toList allShards);
readMessages :=
map
\{shard := makeUpdateMsg shard false maxConsecutiveRead}
(Set.toList allShards);
in some
mkActionEffect@{
env := newEnv;
msgs := writeMessages ++ readMessages;
timers := [];
engines := [];
}
| _ := none
}
| _ := none;
executorFinishedAction¶
Action processing execution completion notification from executor.
- State update
  - Adds the execution summary to the execution_summaries map.
- Messages to be sent
  - None
- Engines to be spawned
  - None
- Timer updates
  - No timers are set or cancelled.
executorFinishedAction
{KVSKey KVSDatum Executable ProgramState}
(input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
: Option
(MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some emsg :=
case emsg of {
| mkEngineMsg@{
msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished summary);
sender := sender;
} :=
case
Map.lookup
sender
(MempoolWorkerLocalState.transactionEngines local)
of {
| some tr :=
let
newState :=
local@MempoolWorkerLocalState{execution_summaries := Map.insert
tr
summary
(MempoolWorkerLocalState.execution_summaries local)};
newEnv := env@EngineEnv{localState := newState};
in some
mkActionEffect@{
env := newEnv;
msgs := [];
timers := [];
engines := [];
}
| _ := none
}
| _ := none
}
| _ := none;
Action Labels¶
transactionRequestActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [transactionRequestAction];
lockAcquiredActionLabel
{KVSKey KVSDatum Executable ProgramState}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [lockAcquiredAction];
executorFinishedActionLabel
{KVSKey KVSDatum Executable ProgramState}
: MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [executorFinishedAction];
Guards¶
Auxiliary Juvix code
MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Guard
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
MempoolWorkerGuardOutput
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardOutput
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
MempoolWorkerGuardEval
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardEval
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
transactionRequestGuard¶
- Condition
  - Message type is MempoolWorkerMsgTransactionRequest
transactionRequestGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
{{Runnable KVSKey KVSDatum Executable ProgramState}}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgMempoolWorker (MempoolWorkerMsgTransactionRequest _);
} :=
some
mkGuardOutput@{
action := transactionRequestActionLabel;
args := [];
}
| _ := none;
lockAcquiredGuard¶
- Condition
  - Message type is ShardMsgKVSLockAcquired
lockAcquiredGuard
{KVSKey KVSDatum Executable ProgramState}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSLockAcquired _)} :=
some
mkGuardOutput@{
action := lockAcquiredActionLabel;
args := [];
}
| _ := none;
executorFinishedGuard¶
- Condition
  - Message type is ExecutorMsgExecutorFinished
executorFinishedGuard
{KVSKey KVSDatum Executable ProgramState}
(trigger : TimestampedTrigger
MempoolWorkerTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg MempoolWorkerCfg)
(env : MempoolWorkerEnv KVSKey KVSDatum Executable)
: Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgExecutor (ExecutorMsgExecutorFinished _);
} :=
some
mkGuardOutput@{
action := executorFinishedActionLabel;
args := [];
}
| _ := none;
The Mempool Worker Behaviour¶
MempoolWorkerBehaviour¶
MempoolWorkerBehaviour
(KVSKey KVSDatum Executable ProgramState : Type) : Type :=
EngineBehaviour
MempoolWorkerCfg
(MempoolWorkerLocalState KVSKey KVSDatum Executable)
MempoolWorkerMailboxState
MempoolWorkerTimerHandle
MempoolWorkerActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
Instantiation¶
instance
dummyRunnable : Runnable String String ByteString String :=
mkRunnable@{
executeStep := \{_ _ _ := error "Not implemented"};
halted := \{_ := false};
startingState := "";
};
mempoolWorkerBehaviour
: MempoolWorkerBehaviour String String ByteString String :=
mkEngineBehaviour@{
guards :=
First [transactionRequestGuard; lockAcquiredGuard; executorFinishedGuard];
};
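To illustrate how the three guards combine, the following Python sketch models a first-match dispatch. It assumes that First tries the guards in order and uses the output of the first one that matches, which is an interpretation of the name rather than something stated in this section. The make_guard and first_match helpers and the dictionary-shaped triggers are hypothetical.

```python
def make_guard(message_type, action_label):
    """Build a guard that matches a single message type."""
    def guard(trigger):
        if trigger.get("type") == message_type:
            return {"action": action_label, "args": []}
        return None  # guard does not apply to this trigger
    return guard


def first_match(guards, trigger):
    """Return the output of the first guard that matches, else None."""
    for guard in guards:
        output = guard(trigger)
        if output is not None:
            return output
    return None


GUARDS = [
    make_guard("MempoolWorkerMsgTransactionRequest", "transactionRequestActionLabel"),
    make_guard("ShardMsgKVSLockAcquired", "lockAcquiredActionLabel"),
    make_guard("ExecutorMsgExecutorFinished", "executorFinishedActionLabel"),
]
```

Because the three guards match disjoint message types, their order does not change which action runs here; ordering would only matter if two guards could accept the same trigger.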