module arch.node.engines.mempool_worker_behaviour;
import arch.node.engines.mempool_worker_messages open;
import arch.node.engines.mempool_worker_config open;
import arch.node.engines.mempool_worker_environment open;
import arch.node.engines.shard_messages open;
import arch.node.engines.executor_messages open;
import arch.node.engines.executor_config open;
import arch.node.engines.executor_environment open;
import prelude open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import Stdlib.Data.Set as Set;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
--- Abstract signing primitive: maps a transaction fingerprint and a
--- transaction candidate to a Signature. Declared as an axiom, so this
--- module gives it no implementation.
axiom sign
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Signature;
--- Abstract hashing primitive: maps a transaction fingerprint and a
--- transaction candidate to a Hash. Declared as an axiom, so this
--- module gives it no implementation.
axiom hash
{KVSKey Executable}
: TxFingerprint -> TransactionCandidate KVSKey KVSKey Executable -> Hash;
-- A single action argument of the mempool worker. It is an alias for Unit,
-- so arguments carry no information.
syntax alias MempoolWorkerActionArgument := Unit;
--- The argument list handed from a guard to an action: a list of
--- MempoolWorkerActionArgument values.
MempoolWorkerActionArguments : Type := List MempoolWorkerActionArgument;
--- The generic Action type instantiated for the mempool worker: worker
--- configuration, local state, mailbox state, timer handle and action
--- arguments, together with the Anoma-wide message, configuration and
--- environment types.
MempoolWorkerAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Action
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- The generic ActionInput type instantiated for the mempool worker. The
--- actions in this module read the environment, configuration and trigger
--- from a value of this type.
MempoolWorkerActionInput (KVSKey KVSDatum Executable : Type) : Type :=
  ActionInput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable);
--- The generic ActionEffect type instantiated for the mempool worker. The
--- actions in this module build it from a new environment, outgoing
--- messages, timers and engines to spawn.
MempoolWorkerActionEffect
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionEffect
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- The generic ActionExec type instantiated for the mempool worker. The
--- action labels in this module are values of this type.
MempoolWorkerActionExec
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  ActionExec
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- Handles a TransactionRequest message received by the mempool worker.
--- The new fingerprint is the current gensym plus one. The action stores the
--- candidate under that fingerprint, spawns one executor engine configured
--- with the read and write keys of the candidate's label, sends one
--- KVSAcquireLock message to every shard that keyToShard assigns to those
--- keys, and sends a TransactionAck back to the requester.
--- Returns none when the trigger is not a TransactionRequest message.
transactionRequestAction
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{rinst : Runnable KVSKey KVSDatum Executable ProgramState}}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    cfg := ActionInput.cfg input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | EngineMsg.mk@{
               msg := Anoma.PreMsg.MsgMempoolWorker (MempoolWorkerMsg.TransactionRequest request);
               sender := sender;
             } :=
             let
               fingerprint := MempoolWorkerLocalState.gensym local + 1;
               worker_id := getEngineIDFromEngineCfg cfg;
               candidate := TransactionRequest.tx request;
               -- The key sets are built once and shared by the executor
               -- configuration and the per-shard lock requests.
               txLabel := TransactionCandidate.label candidate;
               read_keys := Set.fromList (TransactionLabel.read txLabel);
               write_keys := Set.fromList (TransactionLabel.write txLabel);
               -- NOTE(review): the name is derived from the worker id only,
               -- not from the fingerprint. Confirm that nameGen yields a
               -- distinct executor id per transaction, because that id is
               -- the key used in transactionEngines below.
               executor_name := nameGen "executor" (snd worker_id) worker_id;
               executor_id := mkPair none executor_name;
               executorCfg :=
                 Anoma.PreCfg.CfgExecutor
                   ExecutorCfg.mk@{
                     timestamp := fingerprint;
                     executable := TransactionCandidate.executable candidate;
                     lazy_read_keys := Set.empty;
                     eager_read_keys := read_keys;
                     will_write_keys := write_keys;
                     may_write_keys := Set.empty;
                     worker := worker_id;
                     issuer := sender;
                   };
               executorEnv :=
                 Anoma.PreEnv.EnvExecutor
                   EngineEnv.mk@{
                     localState :=
                       ExecutorLocalState.mk@{
                         program_state := Runnable.startingState {{rinst}};
                         completed_reads := Map.empty;
                         completed_writes := Map.empty;
                       };
                     mailboxCluster := Map.empty;
                     acquaintances := Set.empty;
                     timers := [];
                   };
               newState :=
                 local@MempoolWorkerLocalState{
                   gensym := fingerprint;
                   transactions := Map.insert
                     fingerprint
                     candidate
                     (MempoolWorkerLocalState.transactions local);
                   transactionEngines := Map.insert
                     executor_id
                     fingerprint
                     (MempoolWorkerLocalState.transactionEngines local);
                 };
               newEnv := env@EngineEnv{localState := newState};
               -- Every shard that owns at least one read or write key.
               shards :=
                 Set.toList
                   (Set.map keyToShard (Set.union read_keys write_keys));
               shardMsgs :=
                 map
                   \{shard :=
                     let
                       -- Only the keys whose shard name equals this
                       -- shard's name go into this shard's request.
                       shard_read_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           read_keys;
                       shard_write_keys :=
                         Set.filter
                           \{key := snd (keyToShard key) == snd shard}
                           write_keys;
                       lockRequest :=
                         KVSAcquireLockMsg.mkKVSAcquireLockMsg@{
                           lazy_read_keys := Set.empty;
                           eager_read_keys := shard_read_keys;
                           will_write_keys := shard_write_keys;
                           may_write_keys := Set.empty;
                           worker := worker_id;
                           executor := executor_id;
                           timestamp := fingerprint;
                         };
                     in EngineMsg.mk@{
                          sender := worker_id;
                          target := shard;
                          mailbox := some 0;
                          msg :=
                            Anoma.PreMsg.MsgShard
                              (ShardMsg.KVSAcquireLock lockRequest);
                        }}
                   shards;
               ackMsg :=
                 EngineMsg.mk@{
                   sender := worker_id;
                   target := sender;
                   mailbox := some 0;
                   msg :=
                     Anoma.PreMsg.MsgMempoolWorker
                       (MempoolWorkerMsg.TransactionAck
                         TransactionAck.mkTransactionAck@{
                           tx_hash := hash fingerprint candidate;
                           batch_number :=
                             MempoolWorkerLocalState.batch_number local;
                           batch_start := 0;
                           worker_id := worker_id;
                           signature := sign fingerprint candidate;
                         });
                 };
             in some
               ActionEffect.mk@{
                 env := newEnv;
                 msgs := ackMsg :: shardMsgs;
                 timers := [];
                 engines := [mkPair executorCfg executorEnv];
               }
           | _ := none
         }
       | _ := none;
--- Checks whether every shard needed by a transaction has reported a lock
--- for it. The needed shards are obtained with keyToShard from the write
--- keys of the label when isWrite is true and from the read keys otherwise.
--- A shard counts as locking when locks holds an entry from it whose
--- timestamp equals txNum. The result is true when the needed shards are a
--- subset of the locking shards.
allLocksAcquired
  {KVSKey Executable}
  (isWrite : Bool)
  (tx : TransactionCandidate KVSKey KVSKey Executable)
  (txNum : TxFingerprint)
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  : Bool :=
  let
    txLabel := TransactionCandidate.label tx;
    relevantKeys :=
      case isWrite of
        | true := TransactionLabel.write txLabel
        | false := TransactionLabel.read txLabel;
    -- Selects the lock entries that were issued for this transaction.
    isForTx (entry : Pair EngineID KVSLockAcquiredMsg) : Bool :=
      KVSLockAcquiredMsg.timestamp (snd entry) == txNum;
    required := Set.fromList (map keyToShard relevantKeys);
    granted := Set.fromList (map fst (List.filter isForTx locks));
  in Set.isSubset required granted;
--- Walks the fingerprints upwards from current and returns the last one for
--- which allLocksAcquired held, given that it also held for every fingerprint
--- visited before it. The walk stops, returning prev, at the first
--- fingerprint that is missing from transactions or whose locks of the
--- requested kind (write when isWrite is true, read otherwise) are not all
--- present. Called with current 1 and prev 0 it yields the largest N such
--- that transactions 1..N are fully locked, or 0 when there is none.
terminating
findMaxConsecutiveLocked
  {KVSKey Executable}
  (isWrite : Bool)
  (transactions : Map
    TxFingerprint
    (TransactionCandidate KVSKey KVSKey Executable))
  (locks : List (Pair EngineID KVSLockAcquiredMsg))
  (current : TxFingerprint)
  (prev : TxFingerprint)
  : TxFingerprint :=
  case Map.lookup current transactions of {
    | some tx :=
      case allLocksAcquired isWrite tx current locks of {
        | false := prev
        | true :=
          -- current is fully locked, so it becomes the new safe point.
          findMaxConsecutiveLocked
            isWrite
            transactions
            locks
            (current + 1)
            current
      }
    | none := prev
  };
--- Collects the set of shards touched by any stored transaction: every read
--- and write key of every transaction label is mapped through keyToShard.
getAllShards
  {KVSKey Executable}
  (transactions : Map
    TxFingerprint
    (TransactionCandidate KVSKey KVSKey Executable))
  : Set EngineID :=
  let
    -- Read keys followed by write keys of one transaction.
    keysOf (tx : TransactionCandidate KVSKey KVSKey Executable) : List KVSKey :=
      let
        txLabel := TransactionCandidate.label tx;
      in TransactionLabel.read txLabel ++ TransactionLabel.write txLabel;
    shardIds :=
      map keyToShard (List.concatMap keysOf (Map.values transactions));
  in Set.fromList shardIds;
--- Handles a KVSLockAcquired message from a shard. The sender and message
--- are prepended to locks_acquired, the write and read safe points are
--- recomputed with findMaxConsecutiveLocked starting from fingerprint 1, and
--- both are stored in the local state. Every shard returned by getAllShards
--- then receives two UpdateSeenAll messages, one for writes and one for
--- reads, with all write messages placed before all read messages.
--- Returns none when the trigger is not a KVSLockAcquired message.
lockAcquiredAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
       | some emsg :=
         case emsg of {
           | EngineMsg.mk@{
               msg := Anoma.PreMsg.MsgShard (ShardMsg.KVSLockAcquired lockMsg);
               sender := sender;
             } :=
             let
               knownTxs := MempoolWorkerLocalState.transactions local;
               newLocks :=
                 mkPair sender lockMsg
                   :: MempoolWorkerLocalState.locks_acquired local;
               maxConsecutiveWrite :=
                 findMaxConsecutiveLocked true knownTxs newLocks 1 0;
               maxConsecutiveRead :=
                 findMaxConsecutiveLocked false knownTxs newLocks 1 0;
               newState :=
                 local@MempoolWorkerLocalState{
                   locks_acquired := newLocks;
                   seen_all_writes := maxConsecutiveWrite;
                   seen_all_reads := maxConsecutiveRead;
                 };
               newEnv := env@EngineEnv{localState := newState};
               -- Computed once and shared by both message batches.
               workerId := getEngineIDFromEngineCfg (ActionInput.cfg input);
               shardList := Set.toList (getAllShards knownTxs);
               -- Builds one UpdateSeenAll message for a shard.
               makeUpdateMsg
                 (dest : EngineID)
                 (isWrite : Bool)
                 (upTo : TxFingerprint)
                 : EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable) :=
                 EngineMsg.mk@{
                   sender := workerId;
                   target := dest;
                   mailbox := some 0;
                   msg :=
                     Anoma.PreMsg.MsgShard
                       (ShardMsg.UpdateSeenAll
                         UpdateSeenAllMsg.mkUpdateSeenAllMsg@{
                           timestamp := upTo;
                           write := isWrite;
                         });
                 };
               writeMessages :=
                 map
                   \{shard := makeUpdateMsg shard true maxConsecutiveWrite}
                   shardList;
               readMessages :=
                 map
                   \{shard := makeUpdateMsg shard false maxConsecutiveRead}
                   shardList;
             in some
               ActionEffect.mk@{
                 env := newEnv;
                 msgs := writeMessages ++ readMessages;
                 timers := [];
                 engines := [];
               }
           | _ := none
         }
       | _ := none;
--- Handles an ExecutorFinished message. The sender is looked up in
--- transactionEngines. When a fingerprint is found, the summary is stored
--- under it in execution_summaries and an effect with the updated
--- environment and no messages, timers or engines is returned.
--- Returns none for an unknown sender or for any other trigger.
executorFinishedAction
  {KVSKey KVSDatum Executable ProgramState}
  (input : MempoolWorkerActionInput KVSKey KVSDatum Executable)
  : Option
    (MempoolWorkerActionEffect KVSKey KVSDatum Executable ProgramState) :=
  let
    currentEnv := ActionInput.env input;
    workerState := EngineEnv.localState currentEnv;
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
       | some emsg :=
         case emsg of {
           | EngineMsg.mk@{
               msg := Anoma.PreMsg.MsgExecutor (ExecutorMsg.ExecutorFinished summary);
               sender := sender;
             } :=
             case
               Map.lookup
                 sender
                 (MempoolWorkerLocalState.transactionEngines workerState)
             of {
               | none := none
               | some txFingerprint :=
                 let
                   updatedSummaries :=
                     Map.insert
                       txFingerprint
                       summary
                       (MempoolWorkerLocalState.execution_summaries
                         workerState);
                   updatedState :=
                     workerState@MempoolWorkerLocalState{execution_summaries := updatedSummaries};
                   updatedEnv :=
                     currentEnv@EngineEnv{localState := updatedState};
                 in some
                   ActionEffect.mk@{
                     env := updatedEnv;
                     msgs := [];
                     timers := [];
                     engines := [];
                   }
             }
           | _ := none
         }
       | _ := none;
--- Action label for transaction requests: an ActionExec.Seq whose only
--- element is transactionRequestAction.
transactionRequestActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  ActionExec.Seq [transactionRequestAction];
--- Action label for acquired locks: an ActionExec.Seq whose only element
--- is lockAcquiredAction.
lockAcquiredActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  ActionExec.Seq [lockAcquiredAction];
--- Action label for finished executors: an ActionExec.Seq whose only
--- element is executorFinishedAction.
executorFinishedActionLabel
  {KVSKey KVSDatum Executable ProgramState}
  : MempoolWorkerActionExec KVSKey KVSDatum Executable ProgramState :=
  ActionExec.Seq [executorFinishedAction];
--- The generic Guard type instantiated with the mempool worker's
--- configuration, local state, mailbox state, timer handle, action
--- arguments and the Anoma-wide message, configuration and environment
--- types.
MempoolWorkerGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  Guard
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- The generic GuardOutput type instantiated for the mempool worker. The
--- guards in this module build it from an action label and an argument
--- list.
MempoolWorkerGuardOutput
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardOutput
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- The generic GuardEval type instantiated for the mempool worker. The
--- behaviour defined in this module stores a value of this type in its
--- guards field.
MempoolWorkerGuardEval
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  GuardEval
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- Guard for transaction requests. When the trigger carries a
--- MempoolWorkerMsg.TransactionRequest message it selects
--- transactionRequestActionLabel with an empty argument list, and it
--- returns none otherwise. The cfg and env arguments are not inspected.
transactionRequestGuard
  {KVSKey KVSDatum Executable ProgramState}
  {{Ord KVSKey}}
  {{Runnable KVSKey KVSDatum Executable ProgramState}}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some emsg :=
      case emsg of {
        | EngineMsg.mk@{
            msg := Anoma.PreMsg.MsgMempoolWorker (MempoolWorkerMsg.TransactionRequest _);
          } :=
          some
            GuardOutput.mk@{
              action := transactionRequestActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;
--- Guard for acquired locks. When the trigger carries a
--- ShardMsg.KVSLockAcquired message it selects lockAcquiredActionLabel with
--- an empty argument list, and it returns none otherwise. The cfg and env
--- arguments are not inspected.
lockAcquiredGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some emsg :=
      case emsg of {
        | EngineMsg.mk@{
            msg := Anoma.PreMsg.MsgShard (ShardMsg.KVSLockAcquired _);
          } :=
          some
            GuardOutput.mk@{
              action := lockAcquiredActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;
--- Guard for finished executors. When the trigger carries an
--- ExecutorMsg.ExecutorFinished message it selects
--- executorFinishedActionLabel with an empty argument list, and it returns
--- none otherwise. The cfg and env arguments are not inspected.
executorFinishedGuard
  {KVSKey KVSDatum Executable ProgramState}
  (trigger : TimestampedTrigger
    MempoolWorkerTimerHandle
    (Anoma.PreMsg KVSKey KVSDatum Executable))
  (cfg : EngineCfg MempoolWorkerCfg)
  (env : MempoolWorkerEnv KVSKey KVSDatum Executable)
  : Option (MempoolWorkerGuardOutput KVSKey KVSDatum Executable ProgramState) :=
  case getEngineMsgFromTimestampedTrigger trigger of
    | some emsg :=
      case emsg of {
        | EngineMsg.mk@{
            msg := Anoma.PreMsg.MsgExecutor (ExecutorMsg.ExecutorFinished _);
          } :=
          some
            GuardOutput.mk@{
              action := executorFinishedActionLabel;
              args := [];
            }
        | _ := none
      }
    | _ := none;
--- The generic EngineBehaviour type instantiated with the mempool worker's
--- configuration, local state, mailbox state, timer handle, action
--- arguments and the Anoma-wide message, configuration and environment
--- types.
MempoolWorkerBehaviour
  (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
  EngineBehaviour
    MempoolWorkerCfg
    (MempoolWorkerLocalState KVSKey KVSDatum Executable)
    MempoolWorkerMailboxState
    MempoolWorkerTimerHandle
    MempoolWorkerActionArguments
    (Anoma.PreMsg KVSKey KVSDatum Executable)
    (Anoma.PreCfg KVSKey KVSDatum Executable)
    (Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
--- Placeholder Runnable instance for String keys and data, ByteString
--- executables and String program state. The starting state is the empty
--- string, halted always answers false, and executeStep is not implemented
--- and raises an error when called.
instance
dummyRunnable : Runnable String String ByteString String :=
  Runnable.mkRunnable@{
    startingState := "";
    halted := \{_ := false};
    executeStep := \{_ _ _ := error "Not implemented"};
  };
--- The mempool worker behaviour at String keys and data, ByteString
--- executables and String program state. Its guards are combined with
--- GuardEval.First in the order transaction request, lock acquired,
--- executor finished.
--- NOTE(review): presumably the first guard that matches is the one that
--- fires. Confirm against the definition of GuardEval.First.
mempoolWorkerBehaviour
  : MempoolWorkerBehaviour String String ByteString String :=
  EngineBehaviour.mk@{
    guards :=
      GuardEval.First
        [
          transactionRequestGuard;
          lockAcquiredGuard;
          executorFinishedGuard;
        ];
  };