Skip to content
Juvix imports

module arch.node.engines.shard_behaviour;

import arch.node.engines.shard_messages open;
import arch.node.engines.shard_config open;
import arch.node.engines.shard_environment open;
import Stdlib.Data.Nat open;
import Stdlib.Data.List as List;
import prelude open;
import arch.node.types.basics open;
import arch.node.types.identities open;
import arch.node.types.messages open;
import arch.node.types.engine open;
import arch.node.types.anoma as Anoma open;

Shard Behaviour

Action arguments

ShardActionArgument

type ShardActionArgument := ShardActionArgumentReplyTo EngineID;

ShardActionArguments

ShardActionArguments : Type := List ShardActionArgument;

Helper Functions

maximumBy {A : Type} (f : A -> Nat) (lst : List A) : Option A :=
let
maxHelper :=
\{curr acc :=
case acc of
| none := some curr
| some maxVal :=
case f curr > f maxVal of
| true := some curr
| false := some maxVal};
in foldr maxHelper none lst;
findMostRecentWrite
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
: Option KVSDatum :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := none
| some timestampMap :=
let
validEntries :=
List.filter
\{entry :=
fst entry < timestamp
&& case KeyAccess.writeStatus (snd entry) of {
| some writeStatus :=
not
(WriteStatus.mayWrite writeStatus
&& isNone (WriteStatus.data writeStatus))
| none := false
}}
(Map.toList timestampMap);
in case maximumBy \{entry := fst entry} validEntries of
| some (mkPair _ access) :=
case KeyAccess.writeStatus access of {
| some writeStatus := WriteStatus.data writeStatus
| none := none
}
| none := none;

-- add read without prior lock

addReadAccess
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
(readStatus : ReadStatus)
: DAGStructure :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
existingAccess :=
case Map.lookup timestamp keyMap of
| none :=
mkKeyAccess@{
readStatus := none;
writeStatus := none;
}
| some access := access;
newAccess := existingAccess@KeyAccess{readStatus := some readStatus};
newKeyMap := Map.insert timestamp newAccess keyMap;
newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
in dag@DAGStructure{keyAccesses := newKeyAccesses};

-- add write without prior lock

addWriteAccess
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
(writeStatus : WriteStatus)
: DAGStructure :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
existingAccess :=
case Map.lookup timestamp keyMap of
| none :=
mkKeyAccess@{
readStatus := none;
writeStatus := none;
}
| some access := access;
newAccess := existingAccess@KeyAccess{writeStatus := some writeStatus};
newKeyMap := Map.insert timestamp newAccess keyMap;
newKeyAccesses := Map.insert key newKeyMap (DAGStructure.keyAccesses dag);
in dag@DAGStructure{keyAccesses := newKeyAccesses};

-- Replaces if read lock exists

replaceReadAccess
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
: Option DAGStructure :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
access :=
case Map.lookup timestamp keyMap of
| none := none
| some a := some a;
in case access of
| some a :=
case KeyAccess.readStatus a of {
| none := none
| some rs :=
let
updatedReadStatus := rs@ReadStatus{hasBeenRead := true};
updatedAccess :=
a@KeyAccess{readStatus := some updatedReadStatus};
updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
updatedKeyAccesses :=
Map.insert key updatedKeyMap (DAGStructure.keyAccesses dag);
in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
}
| none := none;

-- Replaces if write lock exists

replaceWriteAccess
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
(newData : Option KVSDatum)
: Option DAGStructure :=
let
keyMap :=
case Map.lookup key (DAGStructure.keyAccesses dag) of
| none := Map.empty
| some m := m;
in case Map.lookup timestamp keyMap of
| some a :=
case KeyAccess.writeStatus a of {
| none := none
| some ws :=
case isNone newData && not (WriteStatus.mayWrite ws) of {
| true := none
| false :=
let
data :=
case newData of
| none := WriteStatus.data ws
| some dat := some dat;
updatedAccess :=
a@KeyAccess{writeStatus := some
ws@WriteStatus{data := data}};
updatedKeyMap := Map.insert timestamp updatedAccess keyMap;
updatedKeyAccesses :=
Map.insert
key
updatedKeyMap
(DAGStructure.keyAccesses dag);
in some dag@DAGStructure{keyAccesses := updatedKeyAccesses}
}
}
| none := none;
generateReadMsg
(sender : EngineID)
(key : KVSKey)
(timestamp : TxFingerprint)
(data : KVSDatum)
(executor : EngineID)
: EngineMsg Msg :=
mkEngineMsg@{
sender := sender;
target := executor;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSRead
mkKVSReadMsg@{
timestamp := timestamp;
key := key;
data := data;
});
};
execEagerReadsAtTime
(sender : EngineID)
(dag : DAGStructure)
(key : KVSKey)
(timestamp : TxFingerprint)
(access : KeyAccess)
: Option (Pair DAGStructure (EngineMsg Msg)) :=
case KeyAccess.readStatus access of
| some readStatus :=
case
ReadStatus.isEager readStatus && not (ReadStatus.hasBeenRead readStatus)
of {
| true :=
case
timestamp < DAGStructure.heardAllWrites dag
&& timestamp >= DAGStructure.heardAllReads dag
of {
| true :=
case findMostRecentWrite dag key timestamp of {
| some data :=
let
newReadStatus := readStatus@ReadStatus{hasBeenRead := true};
newDag := addReadAccess dag key timestamp newReadStatus;
msg :=
generateReadMsg
sender
key
timestamp
data
(ReadStatus.executor readStatus);
in some (mkPair newDag msg)
| none := none
}
| false := none
}
| false := none
}
| none := none;
execEagerReadsAtKey
(sender : EngineID)
(dag : DAGStructure)
(key : KVSKey)
(timestampMap : Map TxFingerprint KeyAccess)
: Pair DAGStructure (List (EngineMsg Msg)) :=
let
processTimestamp :=
\{k v acc :=
case acc of
mkPair currDag msgs :=
case execEagerReadsAtTime sender currDag key k v of
| some processed := mkPair (fst processed) (snd processed :: msgs)
| none := acc};
in Map.foldr processTimestamp (mkPair dag []) timestampMap;
execEagerReads
(sender : EngineID)
(dag : DAGStructure)
: Pair DAGStructure (List (EngineMsg Msg)) :=
let
processKey :=
\{k v acc :=
case acc of
mkPair currDag msgs :=
let
processed := execEagerReadsAtKey sender currDag k v;
in mkPair (fst processed) (msgs ++ snd processed)};
in Map.foldr processKey (mkPair dag []) (DAGStructure.keyAccesses dag);

Actions

Auxiliary Juvix code

ShardAction

ShardAction : Type :=
Action
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

ShardActionInput

ShardActionInput : Type :=
ActionInput
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg;

ShardActionEffect

ShardActionEffect : Type :=
ActionEffect
ShardLocalState
ShardMailboxState
ShardTimerHandle
Anoma.Msg
Anoma.Cfg
Anoma.Env;

ShardActionExec

ShardActionExec : Type :=
ActionExec
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

acquireLockAction

Process lock acquisition request and send confirmation.

State update
Update DAG with new read/write accesses.
Messages to be sent
KVSLockAcquired message to worker.
acquireLockAction (input : ShardActionInput) : Option ShardActionEffect :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgKVSAcquireLock lockMsg);
} :=
let
addEagerReadAccesses :=
\{key dag :=
let
readStatus :=
mkReadStatus@{
hasBeenRead := false;
isEager := true;
executor := KVSAcquireLockMsg.executor lockMsg;
};
in addReadAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
readStatus};
addLazyReadAccesses :=
\{key dag :=
let
readStatus :=
mkReadStatus@{
hasBeenRead := false;
isEager := false;
executor := KVSAcquireLockMsg.executor lockMsg;
};
in addReadAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
readStatus};
addWillWriteAccesses :=
\{key dag :=
let
writeStatus :=
mkWriteStatus@{
data := none;
mayWrite := false;
};
in addWriteAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
writeStatus};
addMayWriteAccesses :=
\{key dag :=
let
writeStatus :=
mkWriteStatus@{
data := none;
mayWrite := true;
};
in addWriteAccess
dag
key
(KVSAcquireLockMsg.timestamp lockMsg)
writeStatus};
dagWithEagerReads :=
Set.foldr
addEagerReadAccesses
(ShardLocalState.dagStructure local)
(KVSAcquireLockMsg.eager_read_keys lockMsg);
dagWithAllReads :=
Set.foldr
addLazyReadAccesses
dagWithEagerReads
(KVSAcquireLockMsg.lazy_read_keys lockMsg);
dagWithWillWrites :=
Set.foldr
addWillWriteAccesses
dagWithAllReads
(KVSAcquireLockMsg.will_write_keys lockMsg);
dagWithAllWrites :=
Set.foldr
addMayWriteAccesses
dagWithWillWrites
(KVSAcquireLockMsg.may_write_keys lockMsg);
propagationResult :=
execEagerReads (getEngineIDFromEngineCfg cfg) dagWithAllWrites;
newLocal :=
local@ShardLocalState{dagStructure := fst propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg (ActionInput.cfg input);
target := KVSAcquireLockMsg.worker lockMsg;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSLockAcquired
mkKVSLockAcquiredMsg@{
timestamp := KVSAcquireLockMsg.timestamp lockMsg;
});
}
:: snd propagationResult;
timers := [];
engines := [];
}
| _ := none;

processWriteAction

Process write request and potentially trigger eager reads.

State update
Update DAG with write data and trigger eager reads.
Messages to be sent
KVSRead messages if eligible eager reads are found.
processWriteAction (input : ShardActionInput) : Option ShardActionEffect :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite writeMsg)} :=
let
dag := ShardLocalState.dagStructure local;
key := KVSWriteMsg.key writeMsg;
timestamp := KVSWriteMsg.timestamp writeMsg;
in case
replaceWriteAccess dag key timestamp (KVSWriteMsg.datum writeMsg)
of {
| some updatedDag :=
let
propagationResult :=
execEagerReads (getEngineIDFromEngineCfg cfg) updatedDag;
newLocal :=
local@ShardLocalState{dagStructure := fst
propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
readMsgs := snd propagationResult;
in some
mkActionEffect@{
env := newEnv;
msgs := readMsgs;
timers := [];
engines := [];
}
| none := none
}
| _ := none;

processReadRequestAction

Process read request and potentially send read response.

State update
Update DAG with read request status.
Messages to be sent
KVSRead message if read data is available.
processReadRequestAction
(input : ShardActionInput) : Option ShardActionEffect :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
sender := sender;
msg := Anoma.MsgShard (ShardMsgKVSReadRequest readReqMsg);
} :=
let
dag := ShardLocalState.dagStructure local;
key := KVSReadRequestMsg.key readReqMsg;
timestamp := KVSReadRequestMsg.timestamp readReqMsg;
actual := KVSReadRequestMsg.actual readReqMsg;
in case timestamp >= DAGStructure.heardAllReads dag of {
| false := none
| true :=
case replaceReadAccess dag key timestamp of {
| none := none
| some updatedDag :=
case actual of {
| false :=
let
newLocal :=
local@ShardLocalState{dagStructure := updatedDag};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs := [];
timers := [];
engines := [];
}
| true :=
case findMostRecentWrite updatedDag key timestamp of {
| none := none
| some data :=
let
readMsg :=
mkEngineMsg@{
sender := getEngineIDFromEngineCfg cfg;
target := sender;
mailbox := some 0;
msg :=
Anoma.MsgShard
(ShardMsgKVSRead
mkKVSReadMsg@{
timestamp := timestamp;
key := key;
data := data;
});
};
newLocal :=
local@ShardLocalState{dagStructure := updatedDag};
newEnv := env@EngineEnv{localState := newLocal};
in some
mkActionEffect@{
env := newEnv;
msgs := [readMsg];
timers := [];
engines := [];
}
}
}
}
}
| _ := none;

updateSeenAllAction

Process seen-all update and potentially trigger eager reads.

State update
Update DAG barriers and trigger eager reads.
Messages to be sent
KVSRead messages if eligible eager reads are found.
updateSeenAllAction (input : ShardActionInput) : Option ShardActionEffect :=
let
cfg := ActionInput.cfg input;
env := ActionInput.env input;
local := EngineEnv.localState env;
trigger := ActionInput.trigger input;
in case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{
msg := Anoma.MsgShard (ShardMsgUpdateSeenAll updateMsg);
} :=
let
oldDag := ShardLocalState.dagStructure local;
newDag :=
case UpdateSeenAllMsg.write updateMsg of
| true :=
oldDag@DAGStructure{heardAllWrites := UpdateSeenAllMsg.timestamp
updateMsg}
| false :=
oldDag@DAGStructure{heardAllReads := UpdateSeenAllMsg.timestamp
updateMsg};
propagationResult :=
case UpdateSeenAllMsg.write updateMsg of
| true := execEagerReads (getEngineIDFromEngineCfg cfg) newDag
| false := mkPair newDag [];
newLocal :=
local@ShardLocalState{dagStructure := fst propagationResult};
newEnv := env@EngineEnv{localState := newLocal};
readMsgs := snd propagationResult;
in some
mkActionEffect@{
env := newEnv;
msgs := readMsgs;
timers := [];
engines := [];
}
| _ := none;

Action Labels

acquireLockActionLabel

acquireLockActionLabel : ShardActionExec := Seq [acquireLockAction];

processWriteActionLabel

processWriteActionLabel : ShardActionExec := Seq [processWriteAction];

processReadRequestActionLabel

processReadRequestActionLabel : ShardActionExec :=
Seq [processReadRequestAction];

updateSeenAllActionLabel

updateSeenAllActionLabel : ShardActionExec := Seq [updateSeenAllAction];

Guards

Auxiliary Juvix code

ShardGuard

ShardGuard : Type :=
Guard
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

ShardGuardOutput

ShardGuardOutput : Type :=
GuardOutput
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

ShardGuardEval

ShardGuardEval : Type :=
GuardEval
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

acquireLockGuard

Condition
Message type is ShardMsgKVSAcquireLock.
acquireLockGuard
(trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
(cfg : EngineCfg ShardCfg)
(env : ShardEnv)
: Option ShardGuardOutput :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSAcquireLock _)} :=
some
mkGuardOutput@{
action := acquireLockActionLabel;
args := [];
}
| _ := none;

processWriteGuard

Condition
Message type is ShardMsgKVSWrite.
processWriteGuard
(trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
(cfg : EngineCfg ShardCfg)
(env : ShardEnv)
: Option ShardGuardOutput :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSWrite _)} :=
some
mkGuardOutput@{
action := processWriteActionLabel;
args := [];
}
| _ := none;

processReadRequestGuard

Condition
Message type is ShardMsgKVSReadRequest.
processReadRequestGuard
(trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
(cfg : EngineCfg ShardCfg)
(env : ShardEnv)
: Option ShardGuardOutput :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgKVSReadRequest _)} :=
some
mkGuardOutput@{
action := processReadRequestActionLabel;
args := [];
}
| _ := none;

updateSeenAllGuard

Condition
Message type is ShardMsgUpdateSeenAll.
updateSeenAllGuard
(trigger : TimestampedTrigger ShardTimerHandle Anoma.Msg)
(cfg : EngineCfg ShardCfg)
(env : ShardEnv)
: Option ShardGuardOutput :=
case getEngineMsgFromTimestampedTrigger trigger of
| some mkEngineMsg@{msg := Anoma.MsgShard (ShardMsgUpdateSeenAll _)} :=
some
mkGuardOutput@{
action := updateSeenAllActionLabel;
args := [];
}
| _ := none;

The Shard Behaviour

ShardBehaviour

ShardBehaviour : Type :=
EngineBehaviour
ShardCfg
ShardLocalState
ShardMailboxState
ShardTimerHandle
ShardActionArguments
Anoma.Msg
Anoma.Cfg
Anoma.Env;

Instantiation

shardBehaviour : ShardBehaviour :=
mkEngineBehaviour@{
guards :=
First
[
acquireLockGuard;
processWriteGuard;
processReadRequestGuard;
updateSeenAllGuard;
];
};

Shard Action Flowchart

acquireLock Flowchart

flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSAcquireLock]
  end

  G(acquireLockGuard)
  A(acquireLockAction)

  C --> G -- *acquireLockActionLabel* --> A --> E

  subgraph E[Effects]
    EEnv[(Update DAG with locks)]
    EMsg>KVSLockAcquired]
  end
acquireLock flowchart

processWrite Flowchart

flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSWrite]
  end

  G(processWriteGuard)
  A(processWriteAction)

  C --> G -- *processWriteActionLabel* --> A --> E

  subgraph E[Effects]
    EEnv[(Update DAG)]
    EMsg>KVSRead messages]
  end
processWrite flowchart

processReadRequest Flowchart

flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgKVSReadRequest]
  end

  G(processReadRequestGuard)
  A(processReadRequestAction)

  C --> G -- *processReadRequestActionLabel* --> A --> E

  subgraph E[Effects]
    EEnv[(Update read status)]
    EMsg>KVSRead message]
  end
processReadRequest flowchart

updateSeenAll Flowchart

flowchart TD
  subgraph C[Conditions]
    CMsg>ShardMsgUpdateSeenAll]
  end

  G(updateSeenAllGuard)
  A(updateSeenAllAction)

  C --> G -- *updateSeenAllActionLabel* --> A --> E

  subgraph E[Effects]
    EEnv[(Update barriers)]
    EMsg>KVSRead messages]
  end
updateSeenAll flowchart