Juvix imports
module arch.node.engines.shard_behaviour;
import arch.node.engines.shard_messages open;
import arch.node.engines.shard_config open;
import arch.node.engines.shard_environment open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
Shard Behaviour¶
Action arguments¶
ShardActionArgument
¶
-- Argument value that can accompany a shard action. It has a single
-- constructor, ShardActionArgumentReplyTo, which carries an EngineID.
type ShardActionArgument := ShardActionArgumentReplyTo EngineID;
ShardActionArguments
¶
-- The argument collection used by shard actions and guards: a plain List of
-- ShardActionArgument values.
ShardActionArguments : Type := List ShardActionArgument;
Helper Functions¶
-- Returns the element of `lst` whose image under `f` is largest, or `none`
-- for an empty list.
--
-- The list is folded from the right and a candidate only displaces the current
-- best when its key is strictly greater, so among elements with equal keys the
-- one closest to the end of the list is returned.
maximumBy {A : Type} (f : A -> Nat) (lst : List A) : Option A :=
  let
    pick :=
      \{candidate best :=
        case best of
          | some champion :=
            case f candidate <= f champion of {
              | true := best
              | false := some candidate
            }
          | none := some candidate};
  in foldr pick none lst;
-- Returns the datum held by the latest eligible write to `key` that is
-- strictly older than `timestamp`.
--
-- An access is eligible when it has a write status and that status is not a
-- may-write whose data is still missing. Among the eligible accesses the one
-- with the largest timestamp is selected (via maximumBy) and its data field is
-- returned unchanged. The result is therefore `none` when the key has no
-- entry, when no access is eligible, or when the selected write has no data.
findMostRecentWrite
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option KVSDatum :=
  case Map.lookup key (DAGStructure.keyAccesses dag) of
    | none := none
    | some perTimestamp :=
      let
        earlierWrites :=
          List.filter
            \{entry :=
              case KeyAccess.writeStatus (snd entry) of {
                | none := false
                | some ws :=
                  fst entry < timestamp
                    && not
                      (WriteStatus.mayWrite ws
                        && isNone (WriteStatus.data ws))
              }}
            (Map.toList perTimestamp);
      in case maximumBy \{entry := fst entry} earlierWrites of
        | none := none
        | some (mkPair _ latest) :=
          case KeyAccess.writeStatus latest of
            | none := none
            | some ws := WriteStatus.data ws;
-- Adds a read access without requiring a prior lock.
--
-- Sets the read status of the access stored for `key` at `timestamp` to
-- `some readStatus`. A missing per-key map is treated as empty and a missing
-- access as one with neither read nor write status; an existing write status
-- on the access is left as it was.
addReadAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (readStatus : ReadStatus)
  : DAGStructure :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some found := found
        | none := Map.empty;
    priorAccess :=
      case Map.lookup timestamp perTimestamp of
        | some found := found
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          };
    accessAfter := priorAccess@KeyAccess{readStatus := some readStatus};
  in dag@DAGStructure{keyAccesses :=
       Map.insert
         key
         (Map.insert timestamp accessAfter perTimestamp)
         allAccesses};
-- Adds a write access without requiring a prior lock.
--
-- Sets the write status of the access stored for `key` at `timestamp` to
-- `some writeStatus`. A missing per-key map is treated as empty and a missing
-- access as one with neither read nor write status; an existing read status on
-- the access is left as it was.
addWriteAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (writeStatus : WriteStatus)
  : DAGStructure :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some found := found
        | none := Map.empty;
    priorAccess :=
      case Map.lookup timestamp perTimestamp of
        | some found := found
        | none :=
          mkKeyAccess@{
            readStatus := none;
            writeStatus := none;
          };
    accessAfter := priorAccess@KeyAccess{writeStatus := some writeStatus};
  in dag@DAGStructure{keyAccesses :=
       Map.insert
         key
         (Map.insert timestamp accessAfter perTimestamp)
         allAccesses};
-- Marks an existing read lock as read.
--
-- Succeeds only when an access exists for `key` at `timestamp` and that access
-- has a read status; the returned DAG has that status's hasBeenRead flag set
-- to true. Returns `none` when the access or its read status is missing.
replaceReadAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  : Option DAGStructure :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    -- An unknown key behaves like an empty map: the lookup below then fails.
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some found := found
        | none := Map.empty;
  in case Map.lookup timestamp perTimestamp of
    | none := none
    | some held :=
      case KeyAccess.readStatus held of
        | none := none
        | some lock :=
          let
            consumed := lock@ReadStatus{hasBeenRead := true};
            heldAfter := held@KeyAccess{readStatus := some consumed};
            accessesAfter :=
              Map.insert
                key
                (Map.insert timestamp heldAfter perTimestamp)
                allAccesses;
          in some dag@DAGStructure{keyAccesses := accessesAfter};
-- Updates an existing write lock with the outcome of a write.
--
-- Requires an access with a write status for `key` at `timestamp`; otherwise
-- the result is `none`. Given such a lock:
--   * `newData = some d` stores `some d` as the lock's data;
--   * `newData = none` on a may-write lock keeps the lock's current data;
--   * `newData = none` on a lock that is not a may-write is rejected (`none`).
replaceWriteAccess
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (newData : Option KVSDatum)
  : Option DAGStructure :=
  let
    allAccesses := DAGStructure.keyAccesses dag;
    perTimestamp :=
      case Map.lookup key allAccesses of
        | some found := found
        | none := Map.empty;
  in case Map.lookup timestamp perTimestamp of
    | none := none
    | some held :=
      case KeyAccess.writeStatus held of
        | none := none
        | some lock :=
          let
            -- Rebuilds the DAG with `datum` stored in the lock.
            commit :=
              \{datum :=
                let
                  lockAfter := lock@WriteStatus{data := datum};
                  heldAfter := held@KeyAccess{writeStatus := some lockAfter};
                  accessesAfter :=
                    Map.insert
                      key
                      (Map.insert timestamp heldAfter perTimestamp)
                      allAccesses;
                in some dag@DAGStructure{keyAccesses := accessesAfter}};
          in case newData of
            | some fresh := commit (some fresh)
            | none :=
              case WriteStatus.mayWrite lock of
                | true := commit (WriteStatus.data lock)
                | false := none;
-- Builds the engine message that delivers a read result.
--
-- The message goes from `sender` to `executor`, uses mailbox `some 0`, and
-- wraps a KVSReadMsg holding `timestamp`, `key` and `data` inside
-- Anoma.MsgShard / ShardMsgKVSRead.
generateReadMsg
  (sender : EngineID)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (data : KVSDatum)
  (executor : EngineID)
  : EngineMsg Msg :=
  let
    payload :=
      mkKVSReadMsg@{
        timestamp := timestamp;
        key := key;
        data := data;
      };
  in mkEngineMsg@{
       sender := sender;
       target := executor;
       mailbox := some 0;
       msg := Anoma.MsgShard (ShardMsgKVSRead payload);
     };
-- Tries to serve the eager read recorded in `access` for `key` at `timestamp`.
--
-- A read is served only when all of the following hold:
--   * the access has a read status that is eager and not yet read;
--   * heardAllReads <= timestamp < heardAllWrites for the given DAG;
--   * findMostRecentWrite yields a datum for this key and timestamp.
-- On success the result pairs the DAG with the read marked as read and the
-- KVSRead message addressed to the read status's executor. In every other
-- case the result is `none`.
execEagerReadsAtTime
  (sender : EngineID)
  (dag : DAGStructure)
  (key : KVSKey)
  (timestamp : TxFingerprint)
  (access : KeyAccess)
  : Option (Pair DAGStructure (EngineMsg Msg)) :=
  case KeyAccess.readStatus access of
    | none := none
    | some pending :=
      let
        stillWanted :=
          ReadStatus.isEager pending && not (ReadStatus.hasBeenRead pending);
        withinBarriers :=
          DAGStructure.heardAllReads dag <= timestamp
            && timestamp < DAGStructure.heardAllWrites dag;
      in case stillWanted && withinBarriers of
        | false := none
        | true :=
          case findMostRecentWrite dag key timestamp of
            | none := none
            | some datum :=
              let
                served := pending@ReadStatus{hasBeenRead := true};
                dagAfter := addReadAccess dag key timestamp served;
                reply :=
                  generateReadMsg
                    sender
                    key
                    timestamp
                    datum
                    (ReadStatus.executor pending);
              in some (mkPair dagAfter reply);
-- Runs execEagerReadsAtTime over every (timestamp, access) entry of
-- `timestampMap` for one key, threading the DAG through the fold.
--
-- Returns the final DAG together with the messages produced; each new message
-- is consed onto the front of the accumulated list. Entries for which no read
-- can be served leave the accumulator untouched.
execEagerReadsAtKey
  (sender : EngineID)
  (dag : DAGStructure)
  (key : KVSKey)
  (timestampMap : Map TxFingerprint KeyAccess)
  : Pair DAGStructure (List (EngineMsg Msg)) :=
  let
    step :=
      \{stamp access acc :=
        case acc of
          | mkPair dagSoFar collected :=
            case execEagerReadsAtTime sender dagSoFar key stamp access of
              | none := acc
              | some (mkPair nextDag reply) :=
                mkPair nextDag (reply :: collected)};
  in Map.foldr step (mkPair dag []) timestampMap;
-- Runs execEagerReadsAtKey for every key recorded in the DAG passed in.
--
-- The fold iterates over the keyAccesses of the original `dag` while the
-- updated DAG is threaded through the accumulator. Messages produced for a key
-- are appended after those already collected.
execEagerReads
  (sender : EngineID)
  (dag : DAGStructure)
  : Pair DAGStructure (List (EngineMsg Msg)) :=
  let
    step :=
      \{key perTimestamp acc :=
        case acc of
          | mkPair dagSoFar collected :=
            case execEagerReadsAtKey sender dagSoFar key perTimestamp of
              | mkPair nextDag fresh := mkPair nextDag (collected ++ fresh)};
  in Map.foldr step (mkPair dag []) (DAGStructure.keyAccesses dag);
Actions¶
Auxiliary Juvix code
ShardAction
¶
-- The generic Action type instantiated for the shard engine: shard config,
-- local state, mailbox state, timer handle and action arguments, together
-- with the Anoma-wide message, config and environment types.
ShardAction : Type :=
Action
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ShardActionInput
¶
-- The generic ActionInput type instantiated for the shard engine. This is the
-- parameter type of every shard action function in this module.
ShardActionInput : Type :=
ActionInput
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg;
ShardActionEffect
¶
-- The generic ActionEffect type instantiated for the shard engine. Shard
-- actions in this module return it wrapped in Option, filling in the env,
-- msgs, timers and engines fields.
ShardActionEffect : Type :=
ActionEffect
ShardLocalState
ShardMailboxState
ShardTimerHandle
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ShardActionExec
¶
-- The generic ActionExec type instantiated for the shard engine. The action
-- labels in this module are values of this type, built with Seq.
ShardActionExec : Type :=
ActionExec
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
acquireLockAction
¶
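Process a lock acquisition request and send a confirmation.
- State update
  - Update the DAG with the requested read and write accesses, and mark as read any eager reads that can be served as a result.
- Messages to be sent
  - A KVSLockAcquired message to the worker, followed by a KVSRead message for each eager read that became executable.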
-- Handles a ShardMsgKVSAcquireLock trigger; any other trigger yields `none`.
--
-- Starting from the current DAG, accesses are registered at the lock message's
-- timestamp in this order: eager reads, lazy reads, will-writes, may-writes.
-- Later registrations overwrite the corresponding status of earlier ones for
-- the same key. All read statuses start unread with the message's executor;
-- all write statuses start without data.
--
-- Eager reads are then executed on the resulting DAG. The effect stores the
-- DAG returned by that pass and sends a KVSLockAcquired message (mailbox
-- `some 0`) to the lock message's worker, followed by the KVSRead messages
-- produced by the eager-read pass. No timers or engines are created.
acquireLockAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    selfId := getEngineIDFromEngineCfg (ActionInput.cfg input);
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
    | some mkEngineMsg@{
        msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockMsg);
      } :=
      let
        lockTime := KVSAcquireLockMsg.timestamp lockMsg;
        -- Registers an unread read lock; `eager` selects eager vs lazy.
        registerRead :=
          \{eager key dag :=
            addReadAccess
              dag
              key
              lockTime
              mkReadStatus@{
                hasBeenRead := false;
                isEager := eager;
                executor := KVSAcquireLockMsg.executor lockMsg;
              }};
        -- Registers an empty write lock; `tentative` selects may-write vs
        -- will-write.
        registerWrite :=
          \{tentative key dag :=
            addWriteAccess
              dag
              key
              lockTime
              mkWriteStatus@{
                data := none;
                mayWrite := tentative;
              }};
        afterEagerReads :=
          Set.foldr
            (registerRead true)
            (ShardLocalState.dagStructure local)
            (KVSAcquireLockMsg.eager_read_keys lockMsg);
        afterLazyReads :=
          Set.foldr
            (registerRead false)
            afterEagerReads
            (KVSAcquireLockMsg.lazy_read_keys lockMsg);
        afterWillWrites :=
          Set.foldr
            (registerWrite false)
            afterLazyReads
            (KVSAcquireLockMsg.will_write_keys lockMsg);
        afterMayWrites :=
          Set.foldr
            (registerWrite true)
            afterWillWrites
            (KVSAcquireLockMsg.may_write_keys lockMsg);
      in case execEagerReads selfId afterMayWrites of {
        | mkPair dagAfterReads readMsgs :=
          let
            localAfter := local@ShardLocalState{dagStructure := dagAfterReads};
            envAfter := env@EngineEnv{localState := localAfter};
          in some
            mkActionEffect@{
              env := envAfter;
              msgs :=
                mkEngineMsg@{
                  sender := selfId;
                  target := KVSAcquireLockMsg.worker lockMsg;
                  mailbox := some 0;
                  msg :=
                    Anoma.MsgShard
                      (ShardMsgKVSLockAcquired
                        mkKVSLockAcquiredMsg@{
                          timestamp := lockTime;
                        });
                }
                  :: readMsgs;
              timers := [];
              engines := [];
            }
      }
    | _ := none;
processWriteAction
¶
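Process a write request and trigger any eager reads it unblocks.
- State update
  - Store the written data in the matching write lock of the DAG, then mark as read any eager reads that can now be served. The request is rejected if no matching write lock exists.
- Messages to be sent
  - A KVSRead message for each eager read that became executable.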
-- Handles a ShardMsgKVSWrite trigger; any other trigger yields `none`.
--
-- The write is applied with replaceWriteAccess using the message's key,
-- timestamp and datum. If that fails the action yields `none`. Otherwise eager
-- reads are executed on the updated DAG; the effect stores the resulting DAG
-- and sends the KVSRead messages produced. No timers or engines are created.
processWriteAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    selfId := getEngineIDFromEngineCfg (ActionInput.cfg input);
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite writeMsg)} :=
      case
        replaceWriteAccess
          (ShardLocalState.dagStructure local)
          (KVSWriteMsg.key writeMsg)
          (KVSWriteMsg.timestamp writeMsg)
          (KVSWriteMsg.datum writeMsg)
      of {
        | none := none
        | some dagAfterWrite :=
          case execEagerReads selfId dagAfterWrite of {
            | mkPair dagAfterReads readMsgs :=
              let
                localAfter :=
                  local@ShardLocalState{dagStructure := dagAfterReads};
                envAfter := env@EngineEnv{localState := localAfter};
              in some
                mkActionEffect@{
                  env := envAfter;
                  msgs := readMsgs;
                  timers := [];
                  engines := [];
                }
          }
      }
    | _ := none;
processReadRequestAction
¶
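Process a read request and, for an actual read, send the read response.
- State update
  - Mark the matching read lock in the DAG as read. The request is rejected if its timestamp is below the heard-all-reads barrier or if no matching read lock exists.
- Messages to be sent
  - A KVSRead message to the requester if the request is an actual read and the data is available; nothing if the request only releases the lock.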
-- Handles a ShardMsgKVSReadRequest trigger; any other trigger yields `none`.
--
-- The action yields `none` (no state change) when:
--   * the request timestamp is below the DAG's heardAllReads barrier;
--   * replaceReadAccess finds no read lock for the key and timestamp;
--   * the request is an actual read and findMostRecentWrite has no datum.
-- Otherwise the effect stores the DAG with the read lock marked as read. For
-- an actual read it also sends one KVSRead message, built with the shared
-- generateReadMsg helper, from this engine back to the sender of the request.
-- For a non-actual request no message is sent. No timers or engines are
-- created.
processReadRequestAction
  (input : ShardActionInput) : Option ShardActionEffect :=
  let
    cfg := ActionInput.cfg input;
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    trigger := ActionInput.trigger input;
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{
        sender := sender;
        msg := Anoma.MsgShard (ShardMsgKVSReadRequest readReqMsg);
      } :=
      let
        dag := ShardLocalState.dagStructure local;
        key := KVSReadRequestMsg.key readReqMsg;
        timestamp := KVSReadRequestMsg.timestamp readReqMsg;
      in case timestamp >= DAGStructure.heardAllReads dag of {
        | false := none
        | true :=
          case replaceReadAccess dag key timestamp of {
            | none := none
            | some updatedDag :=
              let
                newLocal := local@ShardLocalState{dagStructure := updatedDag};
                newEnv := env@EngineEnv{localState := newLocal};
                -- `none` means the request cannot be answered yet; `some`
                -- carries the messages to send (possibly none at all).
                replies :=
                  case KVSReadRequestMsg.actual readReqMsg of
                    | true :=
                      case findMostRecentWrite updatedDag key timestamp of {
                        | some datum :=
                          some
                            [
                              generateReadMsg
                                (getEngineIDFromEngineCfg cfg)
                                key
                                timestamp
                                datum
                                sender;
                            ]
                        | none := none
                      }
                    | false := some [];
              in case replies of {
                | none := none
                | some outgoing :=
                  some
                    mkActionEffect@{
                      env := newEnv;
                      msgs := outgoing;
                      timers := [];
                      engines := [];
                    }
              }
          }
      }
    | _ := none;
updateSeenAllAction
¶
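Process a seen-all update and, for write barriers, trigger eager reads.
- State update
  - Set the heard-all-writes or heard-all-reads barrier of the DAG to the timestamp in the message. After a write-barrier update, mark as read any eager reads that can now be served.
- Messages to be sent
  - A KVSRead message for each eager read that became executable (write-barrier updates only).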
-- Handles a ShardMsgUpdateSeenAll trigger; any other trigger yields `none`.
--
-- When the message's write flag is true, heardAllWrites is set to the
-- message's timestamp and eager reads are executed on the updated DAG. When
-- the flag is false, heardAllReads is set instead and no reads are executed.
-- The effect stores the resulting DAG and sends whatever KVSRead messages were
-- produced. No timers or engines are created.
--
-- NOTE(review): the barrier is overwritten unconditionally, so a message with
-- a smaller timestamp lowers it. Confirm whether callers guarantee
-- monotonically increasing barriers.
updateSeenAllAction (input : ShardActionInput) : Option ShardActionEffect :=
  let
    env := ActionInput.env input;
    local := EngineEnv.localState env;
    selfId := getEngineIDFromEngineCfg (ActionInput.cfg input);
  in case getEngineMsgFromTimestampedTrigger (ActionInput.trigger input) of
    | some mkEngineMsg@{
        msg := Anoma.MsgShard (ShardMsgUpdateSeenAll updateMsg);
      } :=
      let
        barrier := UpdateSeenAllMsg.timestamp updateMsg;
        dagBefore := ShardLocalState.dagStructure local;
        outcome :=
          case UpdateSeenAllMsg.write updateMsg of
            | true :=
              execEagerReads
                selfId
                dagBefore@DAGStructure{heardAllWrites := barrier}
            | false :=
              mkPair dagBefore@DAGStructure{heardAllReads := barrier} [];
      in case outcome of {
        | mkPair dagAfter readMsgs :=
          let
            localAfter := local@ShardLocalState{dagStructure := dagAfter};
            envAfter := env@EngineEnv{localState := localAfter};
          in some
            mkActionEffect@{
              env := envAfter;
              msgs := readMsgs;
              timers := [];
              engines := [];
            }
      }
    | _ := none;
Action Labels¶
acquireLockActionLabel
¶
-- Action label that runs acquireLockAction as a one-element Seq.
acquireLockActionLabel : ShardActionExec := Seq [acquireLockAction];
processWriteActionLabel
¶
-- Action label that runs processWriteAction as a one-element Seq.
processWriteActionLabel : ShardActionExec := Seq [processWriteAction];
processReadRequestActionLabel
¶
-- Action label that runs processReadRequestAction as a one-element Seq.
processReadRequestActionLabel : ShardActionExec :=
Seq [processReadRequestAction];
updateSeenAllActionLabel
¶
-- Action label that runs updateSeenAllAction as a one-element Seq.
updateSeenAllActionLabel : ShardActionExec := Seq [updateSeenAllAction];
Guards¶
Auxiliary Juvix code
ShardGuard
¶
-- The generic Guard type instantiated for the shard engine: shard config,
-- local state, mailbox state, timer handle and action arguments, together
-- with the Anoma-wide message, config and environment types.
ShardGuard : Type :=
Guard
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ShardGuardOutput
¶
-- The generic GuardOutput type instantiated for the shard engine. The guards
-- in this module return it wrapped in Option, filling in the action and args
-- fields.
ShardGuardOutput : Type :=
GuardOutput
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
ShardGuardEval
¶
-- The generic GuardEval type instantiated for the shard engine, using the
-- same type arguments as ShardGuard.
ShardGuardEval : Type :=
GuardEval
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
acquireLockGuard
¶
- Condition
- Message type is ShardMsgKVSAcquireLock.
-- Guard for lock acquisition.
--
-- Fires when the trigger carries an Anoma.MsgShard message wrapping
-- ShardMsgKVSAcquireLock, selecting acquireLockActionLabel with an empty
-- argument list. Any other trigger yields `none`. The `cfg` and `env`
-- parameters are not inspected.
acquireLockGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  let
    selected : ShardGuardOutput :=
      mkGuardOutput@{
        action := acquireLockActionLabel;
        args := [];
      };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSAcquireLock _)} :=
      some selected
    | _ := none;
processWriteGuard
¶
- Condition
- Message type is ShardMsgKVSWrite.
-- Guard for write requests.
--
-- Fires when the trigger carries an Anoma.MsgShard message wrapping
-- ShardMsgKVSWrite, selecting processWriteActionLabel with an empty argument
-- list. Any other trigger yields `none`. The `cfg` and `env` parameters are
-- not inspected.
processWriteGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  let
    selected : ShardGuardOutput :=
      mkGuardOutput@{
        action := processWriteActionLabel;
        args := [];
      };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite _)} :=
      some selected
    | _ := none;
processReadRequestGuard
¶
- Condition
- Message type is ShardMsgKVSReadRequest.
-- Guard for read requests.
--
-- Fires when the trigger carries an Anoma.MsgShard message wrapping
-- ShardMsgKVSReadRequest, selecting processReadRequestActionLabel with an
-- empty argument list. Any other trigger yields `none`. The `cfg` and `env`
-- parameters are not inspected.
processReadRequestGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  let
    selected : ShardGuardOutput :=
      mkGuardOutput@{
        action := processReadRequestActionLabel;
        args := [];
      };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSReadRequest _)} :=
      some selected
    | _ := none;
updateSeenAllGuard
¶
- Condition
- Message type is ShardMsgUpdateSeenAll.
-- Guard for seen-all updates.
--
-- Fires when the trigger carries an Anoma.MsgShard message wrapping
-- ShardMsgUpdateSeenAll, selecting updateSeenAllActionLabel with an empty
-- argument list. Any other trigger yields `none`. The `cfg` and `env`
-- parameters are not inspected.
updateSeenAllGuard
  (trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
  (cfg : EngineCfg ShardCfg)
  (env : ShardEnv)
  : Option ShardGuardOutput :=
  let
    selected : ShardGuardOutput :=
      mkGuardOutput@{
        action := updateSeenAllActionLabel;
        args := [];
      };
  in case getEngineMsgFromTimestampedTrigger trigger of
    | some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgUpdateSeenAll _)} :=
      some selected
    | _ := none;
The Shard Behaviour¶
ShardBehaviour
¶
-- The generic EngineBehaviour type instantiated for the shard engine: shard
-- config, local state, mailbox state, timer handle and action arguments,
-- together with the Anoma-wide message, config and environment types.
ShardBehaviour : Type :=
EngineBehaviour
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;
Instantiation¶
-- The shard engine's behaviour: its four guards wrapped in First, listed in
-- the order lock acquisition, write, read request, seen-all update.
shardBehaviour : ShardBehaviour :=
  let
    guardChain :=
      [
        acquireLockGuard;
        processWriteGuard;
        processReadRequestGuard;
        updateSeenAllGuard;
      ];
  in mkEngineBehaviour@{
       guards := First guardChain;
     };