module arch.node.engines.shard_behaviour;
import arch.node.engines.shard_messages open;
import arch.node.engines.shard_config open;
import arch.node.engines.shard_environment open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;
type ShardActionArgument := | ShardActionArgumentReplyTo EngineID;
ShardActionArguments : Type := List ShardActionArgument;
findMostRecentWrite
{KVSKey KVSDatum}
{{Ord KVSKey}}
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
: Option KVSDatum :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := none
| some timestampMap :=
let
validEntries :=
List.filter
\{entry :=
fst entry < timestamp
&& case KeyAccess.writeStatus (snd entry) of {
| some writeStatus :=
not
(WriteStatus.mayWrite writeStatus
&& isNone (WriteStatus.data writeStatus))
| none := false
}}
(Map.toList timestampMap);
in case maximumBy \{entry := fst entry} validEntries of
| some (mkPair _ access) :=
case KeyAccess.writeStatus access of {
| some writeStatus := WriteStatus.data writeStatus
| none := none
}
| none := none;
addReadAccess
{KVSKey KVSDatum}
{{Ord KVSKey}}
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
(readStatus : ReadStatus)
: DAGStructure KVSKey KVSDatum :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
existingAccess :=
case Map.lookup timestamp keyMap of
| none :=
mkKeyAccess@{
readStatus := none;
writeStatus := none;
}
| some access := access;
newAccess := existingAccess@KeyAccess{readStatus := some readStatus};
newKeyMap := Map.insert timestamp newAccess keyMap;
newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
in dag@DAGStructure{keyAccesses := newKeyAccesses};
addWriteAccess
{KVSKey KVSDatum}
{{Ord KVSKey}}
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
(writeStatus : WriteStatus KVSDatum)
: DAGStructure KVSKey KVSDatum :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
existingAccess :=
case Map.lookup timestamp keyMap of
| none :=
mkKeyAccess@{
readStatus := none;
writeStatus := none;
}
| some access := access;
newAccess := existingAccess@KeyAccess{writeStatus := some writeStatus};
newKeyMap := Map.insert timestamp newAccess keyMap;
newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
in dag@DAGStructure{keyAccesses := newKeyAccesses};
replaceReadAccess
{KVSKey KVSDatum}
{{Ord KVSKey}}
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
: Option (DAGStructure KVSKey KVSDatum) :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
access :=
case Map.lookup timestamp keyMap of
| none := none
| some a := some a;
in case access of
| some a :=
case KeyAccess.readStatus a of {
| none := none
| some rs :=
let
updatedReadStatus := rs@ReadStatus{hasBeenRead := true};
updatedAccess :=
a@KeyAccess{readStatus := some updatedReadStatus};
updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
updatedKeyAccesses :=
Map.insert key updatedKeyMap (DAGStructure.keyAccesses dag);
in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
}
| none := none;
replaceWriteAccess
{KVSKey KVSDatum}
{{Ord KVSKey}}
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
(newData : Option KVSDatum)
: Option (DAGStructure KVSKey KVSDatum) :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
in case Map.lookup timestamp keyMap of
| some a :=
case KeyAccess.writeStatus a of {
| none := none
| some ws :=
case isNone newData && not (WriteStatus.mayWrite ws) of {
| true := none
| false :=
let
data :=
case newData of
| none := WriteStatus.data ws
| some dat := some dat;
updatedAccess :=
a@KeyAccess{writeStatus := some
ws@WriteStatus{data := data}};
updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
updatedKeyAccesses :=
Map.insert
key
updatedKeyMap
(DAGStructure.keyAccesses dag);
in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
}
}
| none := none;
generateReadMsg
{KVSKey KVSDatum Executable}
(sender : EngineID)
(key : KVSKey)
(timestamp : TxFingerprint)
(data : KVSDatum)
(executor : EngineID)
: EngineMsg (PreMsg KVSKey KVSDatum Executable) :=
mkEngineMsg@{
sender := sender;
target := executor;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSRead
mkKVSReadMsg@{
timestamp := timestamp;
key := key;
data := data;
});
};
execEagerReadsAtTime
{KVSKey KVSDatum Executable}
{{Ord KVSKey}}
(sender : EngineID)
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestamp : TxFingerprint)
(access : KeyAccess KVSDatum)
: Option
(Pair
(DAGStructure KVSKey KVSDatum)
(EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
case KeyAccess.readStatus access of
| some readStatus :=
case
ReadStatus.isEager readStatus && not (ReadStatus.hasBeenRead readStatus)
of {
| true :=
case
timestamp < DAGStructure.heardAllWrites dag
&& timestamp >= DAGStructure.heardAllReads dag
of {
| true :=
case findMostRecentWrite dag key timestamp of {
| some data :=
let
newReadStatus := readStatus@ReadStatus{hasBeenRead := true};
newDag := addReadAccess dag key timestamp newReadStatus;
msg :=
generateReadMsg
sender
key
timestamp
data
(ReadStatus.executor readStatus);
in some (mkPair newDag msg)
| none := none
}
| false := none
}
| false := none
}
| none := none;
execEagerReadsAtKey
{KVSKey KVSDatum Executable}
{{Ord KVSKey}}
(sender : EngineID)
(dag : DAGStructure KVSKey KVSDatum)
(key : KVSKey)
(timestampMap : Map TxFingerprint (KeyAccess KVSDatum))
: Pair
(DAGStructure KVSKey KVSDatum)
(List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
let
processTimestamp :=
\{k v acc :=
case acc of
| mkPair currDag msgs :=
case execEagerReadsAtTime sender currDag key k v of
| some processed := mkPair (fst processed) (snd processed :: msgs)
| none := acc};
in Map.foldr processTimestamp (mkPair dag []) timestampMap;
execEagerReads
{KVSKey KVSDatum Executable}
{{Ord KVSKey}}
(sender : EngineID)
(dag : DAGStructure KVSKey KVSDatum)
: Pair
(DAGStructure KVSKey KVSDatum)
(List (EngineMsg (Anoma.PreMsg KVSKey KVSDatum Executable))) :=
let
processKey :=
\{k v acc :=
case acc of
| mkPair currDag msgs :=
let
processed := execEagerReadsAtKey sender currDag k v;
in mkPair (fst processed) (msgs ++ snd processed)};
in Map.foldr processKey (mkPair dag []) (DAGStructure.keyAccesses dag);
ShardAction (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Action
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ShardActionInput (KVSKey KVSDatum Executable : Type) : Type :=
ActionInput
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable);
ShardActionEffect (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionEffect
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ShardActionExec (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
ActionExec
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
acquireLockAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(input : ShardActionInput KVSKey KVSDatum Executable)
: Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockMsg);
} :=
let
addEagerReadAccesses :=
\{key dag :=
let
readStatus :=
mkReadStatus@{
hasBeenRead := false;
isEager := true;
executor := KVSAcquireLockMsg.executor lockMsg;
};
in addReadAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
readStatus};
addLazyReadAccesses :=
\{key dag :=
let
readStatus :=
mkReadStatus@{
hasBeenRead := false;
isEager := false;
executor := KVSAcquireLockMsg.executor lockMsg;
};
in addReadAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
readStatus};
addWillWriteAccesses :=
\{key dag :=
let
writeStatus :=
mkWriteStatus@{
data := none;
mayWrite := false;
};
in addWriteAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
writeStatus};
addMayWriteAccesses :=
\{key dag :=
let
writeStatus :=
mkWriteStatus@{
data := none;
mayWrite := true;
};
in addWriteAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
writeStatus};
dagWithEagerReads :=
Set.foldr
addEagerReadAccesses
(ShardLocalState.dagStructure local)
(KVSAcquireLockMsg.eager_read_keys lockMsg);
dagWithAllReads :=
Set.foldr
addLazyReadAccesses
dagWithEagerReads
(KVSAcquireLockMsg.lazy_read_keys lockMsg);
dagWithWillWrites :=
Set.foldr
addWillWriteAccesses
dagWithAllReads
(KVSAcquireLockMsg.will_write_keys lockMsg);
dagWithAllWrites :=
Set.foldr
addMayWriteAccesses
dagWithWillWrites
(KVSAcquireLockMsg.may_write_keys lockMsg);
propagationResult :=
execEagerReads (getEngineIDFromEngineCfg cfg) dagWithAllWrites;
newLocal :=
local@ShardLocalState{dagStructure := fst propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := KVSAcquireLockMsg.worker lockMsg;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSLockAcquired
mkKVSLockAcquiredMsg@{
timestamp := KVSAcquireLockMsg.timestamp lockMsg;
});
}
:: snd propagationResult;
timers := [];
engines := [];
}
| _ := none;
processWriteAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(input : ShardActionInput KVSKey KVSDatum Executable)
: Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite writeMsg)} :=
let
dag := ShardLocalState.dagStructure local;
key := KVSWriteMsg.key writeMsg;
timestamp := KVSWriteMsg.timestamp writeMsg;
in case
replaceWriteAccess dag key timestamp (KVSWriteMsg.datum writeMsg)
of {
| some updatedDag :=
let
propagationResult :=
execEagerReads (getEngineIDFromEngineCfg cfg) updatedDag;
newLocal :=
local@ShardLocalState{dagStructure := fst
propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
readMsgs := snd propagationResult;
in some
mkActionEffect@{
env := newEnv;
msgs := readMsgs;
timers := [];
engines := [];
}
| none := none
}
| _ := none;
processReadRequestAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(input : ShardActionInput KVSKey KVSDatum Executable)
: Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
sender := sender;
msg := Anoma.MsgShard (ShardMsgKVSReadRequest readReqMsg);
} :=
let
dag := ShardLocalState.dagStructure local;
key := KVSReadRequestMsg.key readReqMsg;
timestamp := KVSReadRequestMsg.timestamp readReqMsg;
actual := KVSReadRequestMsg.actual readReqMsg;
in case timestamp >= DAGStructure.heardAllReads dag of {
| false := none
| true :=
case replaceReadAccess dag key timestamp of {
| none := none
| some updatedDag :=
case actual of {
| false :=
let
newLocal :=
local@ShardLocalState{dagStructure := updatedDag};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs := [];
timers := [];
engines := [];
}
| true :=
case findMostRecentWrite updatedDag key timestamp of {
| none := none
| some data :=
let
readMsg :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg cfg;
target := sender;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSRead
mkKVSReadMsg@{
timestamp := timestamp;
key := key;
data := data;
});
};
newLocal :=
local@ShardLocalState{dagStructure := updatedDag};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs := [readMsg];
timers := [];
engines := [];
}
}
}
}
}
| _ := none;
updateSeenAllAction
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(input : ShardActionInput KVSKey KVSDatum Executable)
: Option (ShardActionEffect KVSKey KVSDatum Executable ProgramState) :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgUpdateSeenAll updateMsg);
} :=
let
oldDag := ShardLocalState.dagStructure local;
newDag :=
case UpdateSeenAllMsg.write updateMsg of
| true :=
oldDag@DAGStructure{heardAllWrites := UpdateSeenAllMsg.timestamp
updateMsg}
| false :=
oldDag@DAGStructure{heardAllReads := UpdateSeenAllMsg.timestamp
updateMsg};
propagationResult :=
case UpdateSeenAllMsg.write updateMsg of
| true := execEagerReads (getEngineIDFromEngineCfg cfg) newDag
| false := mkPair newDag [];
newLocal :=
local@ShardLocalState{dagStructure := fst propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
readMsgs := snd propagationResult;
in some
mkActionEffect@{
env := newEnv;
msgs := readMsgs;
timers := [];
engines := [];
}
| _ := none;
acquireLockActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
: ShardActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [acquireLockAction];
processWriteActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
: ShardActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [processWriteAction];
processReadRequestActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
: ShardActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [processReadRequestAction];
updateSeenAllActionLabel
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
: ShardActionExec KVSKey KVSDatum Executable ProgramState :=
Seq [updateSeenAllAction];
ShardGuard (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
Guard
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ShardGuardOutput (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardOutput
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
ShardGuardEval (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
GuardEval
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
acquireLockGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(trigger : TimestampedTrigger
ShardTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg ShardCfg)
(env : ShardEnv KVSKey KVSDatum)
: Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSAcquireLock _)} :=
some
mkGuardOutput@{
action := acquireLockActionLabel;
args := [];
}
| _ := none;
processWriteGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(trigger : TimestampedTrigger
ShardTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg ShardCfg)
(env : ShardEnv KVSKey KVSDatum)
: Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite _)} :=
some
mkGuardOutput@{
action := processWriteActionLabel;
args := [];
}
| _ := none;
processReadRequestGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(trigger : TimestampedTrigger
ShardTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg ShardCfg)
(env : ShardEnv KVSKey KVSDatum)
: Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSReadRequest _)} :=
some
mkGuardOutput@{
action := processReadRequestActionLabel;
args := [];
}
| _ := none;
updateSeenAllGuard
{KVSKey KVSDatum Executable ProgramState}
{{Ord KVSKey}}
(trigger : TimestampedTrigger
ShardTimerHandle
(Anoma.PreMsg KVSKey KVSDatum Executable))
(cfg : EngineCfg ShardCfg)
(env : ShardEnv KVSKey KVSDatum)
: Option (ShardGuardOutput KVSKey KVSDatum Executable ProgramState) :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgUpdateSeenAll _)} :=
some
mkGuardOutput@{
action := updateSeenAllActionLabel;
args := [];
}
| _ := none;
ShardBehaviour (KVSKey KVSDatum Executable ProgramState : Type) : Type :=
EngineBehaviour
ShardCfg
(ShardLocalState KVSKey KVSDatum)
ShardMailboxState
ShardTimerHandle
ShardActionArguments
(Anoma.PreMsg KVSKey KVSDatum Executable)
(Anoma.PreCfg KVSKey KVSDatum Executable)
(Anoma.PreEnv KVSKey KVSDatum Executable ProgramState);
shardBehaviour : ShardBehaviour String String ByteString String :=
mkEngineBehaviour@{
guards :=
First
[
acquireLockGuard;
processWriteGuard;
processReadRequestGuard;
updateSeenAllGuard;
];
};