Juvix imports

module arch.node.engines.shard;

import prelude open;
import arch.node.types.engine open;
import arch.node.engines.shard_config open public;
import arch.node.engines.shard_messages open public;
import arch.node.engines.shard_environment open public;
import arch.node.engines.shard_behaviour open public;
import arch.node.types.anoma as Anoma open;

open shard_config_example;
open shard_environment_example;

Shard

Purpose

Together, the Shards store and update the state of the replicated state machine; collectively they form a component of the Execution Engines. They provide Executors with input data and update the state according to the results of the Executors' computations.

Different shards may be on different physical machines.

Each shard is responsible for a set of KVSKeys and these sets are disjoint for different shards. For each of the keys that a shard is responsible for, the shard maintains a (partially-ordered) timeline of Timestamps of transaction candidates that may read or write to keys. Shards also keep a history of data written by each TransactionCandidate to each key. This is multi-version concurrent storage.
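
As an illustration of multi-version storage for a single key, the following is a minimal Python sketch, not the Juvix definitions used by the engine. The class name `KeyTimeline` and its methods are hypothetical, and timestamps are assumed to be totally ordered integers, whereas the real timeline is only partially ordered.

```python
from bisect import bisect_left, insort


class KeyTimeline:
    """Versions written to one key, ordered by integer timestamp (an assumption)."""

    def __init__(self):
        self._stamps = []   # sorted timestamps that have a written value
        self._values = {}   # timestamp -> value written at that timestamp

    def write(self, timestamp, value):
        # Writes may arrive in any order; each one adds a new version.
        if timestamp not in self._values:
            insort(self._stamps, timestamp)
        self._values[timestamp] = value

    def read_before(self, timestamp):
        # Return the value of the latest write strictly before `timestamp`,
        # or None if no such write exists.
        i = bisect_left(self._stamps, timestamp)
        if i == 0:
            return None
        return self._values[self._stamps[i - 1]]


timeline = KeyTimeline()
timeline.write(30, "c")   # a later version can be stored first
timeline.write(10, "a")
```

Because every version is kept, a read at timestamp 20 still sees the value written at 10, even though the write at 30 arrived earlier.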

State (of the shard)

For each Worker Engine, the Shard maintains:

  • A Timestamp, such that all write lock requests[^1] for transaction candidates with earlier timestamps that this worker curates have already been received. Together, these timestamps represent heardAllWrites.
  • Another Timestamp, before which the Shard will receive no further read requests from this Worker Engine. For a Worker Engine, this cannot be after the corresponding write Timestamp. We will also maintain one of these for each Read Backend worker. Together, these represent heardAllReads.

For each key (assigned to this Shard):

  • A set of timestamps of known transaction candidates that read and/or write that key, and for each, some subset of:
    • A value written to that key at that timestamp by that TransactionCandidate using a KVSWrite message.
    • A marker indicating that this TransactionCandidate may (or will) write to this key, but this Shard has not yet received a corresponding KVSWrite message.
    • A marker indicating that this TransactionCandidate will read this value, and an ExternalIdentity corresponding to the relevant Executor. This marker is only stored so long as the Shard doesn't know the value. When this value is determined, this Shard must remove this marker and send a KVSRead message to the Executor.
    • A marker indicating that this TransactionCandidate may read this value, and an ExternalIdentity corresponding to the relevant Executor. If the Executor sends a KVSReadRequest for this key, the Shard updates this marker to a "will read" marker.
  • If a Timestamp has no corresponding markers or values written, we don't have to store it.
  • If a value written is before heardAllReads, and there are no pending reads or writes before it, then we can remove all earlier values written.
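
The per-key markers and the pruning rule above can be sketched as follows. This is an illustrative Python model rather than the engine's Juvix types: `KeyState`, its field names, and the use of integer timestamps and a single `heard_all_reads` bound are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class KeyState:
    values: dict = field(default_factory=dict)          # timestamp -> written value
    pending_writes: set = field(default_factory=set)    # "may/will write" markers
    will_read: dict = field(default_factory=dict)       # timestamp -> executor id
    may_read: dict = field(default_factory=dict)        # timestamp -> executor id

    def on_read_request(self, timestamp):
        # A KVSReadRequest upgrades a "may read" marker to "will read".
        if timestamp in self.may_read:
            self.will_read[timestamp] = self.may_read.pop(timestamp)

    def prune(self, heard_all_reads):
        # Find the latest written value that is before heard_all_reads and
        # has no pending read or write before it, then drop earlier values.
        pending = self.pending_writes | set(self.will_read) | set(self.may_read)
        safe = [t for t in self.values
                if t < heard_all_reads and not any(p < t for p in pending)]
        if safe:
            keep_from = max(safe)
            self.values = {t: v for t, v in self.values.items() if t >= keep_from}


state = KeyState(values={1: "a", 5: "b", 9: "c"}, may_read={7: "executor-1"})
state.on_read_request(7)        # the read at 7 is now a "will read"
state.prune(heard_all_reads=8)  # the value at 1 is no longer needed
```

The value at timestamp 5 survives because the pending read at 7 still needs it; only the value at 1 is dropped.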

Additionally, the Shard maintains:

  • A complete copy of the DAG structure produced by the Mempool Engines. This includes a set of all NarwhalBlockHeaders. For Timestamps before heardAllReads, if there are no keys with a pending read or write before that Timestamp, we can delete the old DAG structure.
  • A complete copy of the sequence of Anchors chosen by Consensus Engine. This is a sequence of consensus decisions. For Timestamps before heardAllReads, if there are no keys with a pending read or write before that Timestamp, we can delete old anchors.

Shard Optimizations

We want to execute each TransactionCandidate (evaluate the executor function in order to compute the data written) using the idea of serializability: each TransactionCandidate's reads and writes should be as if they were executed in the total order determined by the mempool (and consensus, from V2 onward). In fact, the simplest correct implementation amounts to executing all transaction candidates sequentially, repeatedly applying the executor function in a loop. However, we want to compute as concurrently as possible, for minimum latency. We do this using a set of optimizations.
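
For reference, the sequential baseline can be sketched in a few lines of Python. The representation of a transaction candidate as a `(timestamp, read_keys, executor_fn)` tuple is hypothetical, chosen only to make the loop concrete.

```python
def run_sequentially(state, candidates):
    """Apply each candidate's executor function in timestamp order."""
    for _, read_keys, executor_fn in sorted(candidates, key=lambda c: c[0]):
        reads = {k: state.get(k) for k in read_keys}
        state.update(executor_fn(reads))   # executor returns {key: new value}
    return state


candidates = [
    (2, ["x"], lambda r: {"y": r["x"] * 10}),   # reads x, writes y
    (1, [], lambda r: {"x": 4}),                # writes x only
]
final_state = run_sequentially({}, candidates)
```

Every optimization below must produce the same final state as this loop; they differ only in how much of the work can overlap.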

Optimization: Per-Key Ordering

Per-key ordering (see web version for animation)

Mempool and consensus provide ordering information for the timestamps. Thus, relative to each key, transaction candidates can be totally ordered by the Happens Before relationship. With a total ordering of transaction candidates, Shards can send read information (KVSReads) to Executors as soon as the previous TransactionCandidate is complete. However, transaction candidates that access disjoint sets of keys can be run in parallel. In the diagram above, for example, transaction candidates c and d can run concurrently, as can transaction candidates e and f, and transaction candidates h and j.

Optimization: Order With Respect To Writes

Order with respect to writes (see web version for animation)

In fact, Shards can send read information to an Executor as soon as the previous write's TransactionCandidate has completed (sent a KVSWrite). All Shards really need to keep track of is a total order of writes, and how each read is ordered with respect to writes (which write it precedes and which write preceded it). As soon as the preceding write is complete (the Shard has received a KVSWrite), the reads that depend on it can run concurrently. There are no "read/read" conflicts. In the diagram above, for example, transaction candidates a and b can run concurrently.
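
A small Python sketch of this bookkeeping for one key follows. The function names and the `(timestamp, kind)` encoding are hypothetical, timestamps are assumed totally ordered, and treating a read with no preceding write as immediately ready (it reads the initial state) is an assumption of the example.

```python
def preceding_writes(accesses):
    """Map each read timestamp to the timestamp of the write just before it.

    `accesses` is a list of (timestamp, kind) pairs for one key, where kind
    is "read" or "write".
    """
    result = {}
    last_write = None
    for timestamp, kind in sorted(accesses):
        if kind == "write":
            last_write = timestamp
        else:
            result[timestamp] = last_write
    return result


def ready_reads(accesses, completed_writes):
    # A read is ready once its preceding write has delivered its value
    # (a KVSWrite in the text). Reads never wait on other reads.
    deps = preceding_writes(accesses)
    return sorted(r for r, w in deps.items()
                  if w is None or w in completed_writes)


accesses = [(1, "write"), (2, "read"), (3, "read"), (4, "write"), (5, "read")]
```

Once the write at 1 completes, the reads at 2 and 3 become ready together; the read at 5 waits only for the write at 4.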

Optimization: Only Wait to Read

Only wait to read (see web version for animation)

Because we store each version written (multi-version concurrent storage), we do not have to execute writes in order. A Shard does not have to wait to write a later data version to a key just because previous reads have not finished executing yet. In the diagram above, for example, only green happens-before arrows require waiting. Transaction candidates a, b, c, and j can all be executed concurrently, as can transaction candidates d, e, and i.

Optimization: Execute With Partial Order

Some mempools, including Narwhal, can provide partial order information on transactions even before consensus has determined a total order. This allows the Ordering Machine to execute some transactions before a total ordering is known. In general, for a given key, a shard can send read information to an executor when it knows precisely which write happens most recently before the read, and that write has executed.

heardAllWrites

In order to know which write happens most recently before a given read, the Shard must know that no further writes will be added to the timeline before the read. Mempool and consensus should communicate a lower bound on timestamps to the Shards, called heardAllWrites. The Shard is guaranteed to never receive another KVSAcquireLock with a write operation and Timestamp before heardAllWrites. In general, a Shard cannot send a KVSRead for a Timestamp unless the Timestamp is before heardAllWrites. heardAllWrites consists of a TxFingerprint from each worker engine such that the worker engine is certain (based on KVSLockAcquireds) that the Shard has already seen all the KVSAcquireLocks it will ever send at or before that TxFingerprint.

This can be on a per-key basis or simply a global lower bound. Occasionally, heardAllWrites should be updated with later timestamps. Each round of consensus should produce a lower bound for heardAllWrites, but the mempool may already have sent better bounds. Each Shard must keep track of heardAllWrites on each key's multi-version timeline.
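
One way to picture heardAllWrites as a global lower bound is the Python sketch below. It assumes integer timestamps that are comparable across workers, which simplifies the per-worker TxFingerprints described above; the class and method names are hypothetical.

```python
class HeardAllWrites:
    """Per-worker lower bounds on future write-lock timestamps (a sketch)."""

    def __init__(self, workers):
        self.bound = {w: 0 for w in workers}

    def update(self, worker, timestamp):
        # Bounds only move forward; a stale update is ignored.
        self.bound[worker] = max(self.bound[worker], timestamp)

    def covers(self, timestamp):
        # No further write locks can arrive before `timestamp` only if
        # every worker's bound has passed it.
        return all(timestamp < b for b in self.bound.values())


heard = HeardAllWrites(["worker-1", "worker-2"])
heard.update("worker-1", 10)
blocked = heard.covers(5)      # worker-2 has not reported yet
heard.update("worker-2", 7)
heard.update("worker-2", 3)    # stale, leaves the bound at 7
```

Under these assumptions, a KVSRead for timestamp 5 may be sent only after both workers have reported bounds later than 5.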

Transactions (like transaction j in the diagram below) containing only write operations can execute with a timestamp after heardAllWrites, but this simply means calculating the data they will write. Since that does not depend on state, this can of course be done at any time.

heardAllReads

We want to allow Typhon to eventually garbage-collect old state. Mempool and consensus should communicate a lower bound timestamp to the execution engine, called heardAllReads, before which there will be no more read transactions sent to the execution engine. Occasionally, heardAllReads should be updated with later timestamps. Each Shard must keep track of heardAllReads on each key's multi-version timeline, so it can garbage-collect old values.

Execute with partial order (see web version for animation)

In the example above, our happens-before arrows have been replaced with may-happen-before arrows, representing partial ordering information from the mempool. Note that not all transactions can be executed with this partial order information.

Conflicts

There are three types of conflicts that can prevent a transaction from being executable without more ordering information.

  • Write/Write Conflicts occur when a shard cannot identify the most recent write before a given read. In the diagram above, transaction e cannot execute because it is not clear whether transaction b or transaction c wrote most recently to the yellow key.
  • Read/Write Conflicts occur when a shard cannot identify whether a read operation occurs before or after a write, so it is not clear if it should read the value from that write or from a previous write. In the diagram above, transaction g cannot execute because it is not clear whether it would read the data written to the blue key by transaction d or transaction i.
  • Transitive Conflicts occur when a shard cannot get the data for a read because the relevant write is conflicted. In the diagram above, transaction h cannot execute because it cannot read the data written to the yellow key by transaction g, since transaction g is conflicted.

As the mempool and consensus provide the execution engine with more and more ordering information, and the partial order of timestamps is refined, all conflicts eventually resolve. In the diagram above, suppose consensus orders transaction g before transaction i. The Read/Write conflict is resolved: transaction g reads the data transaction d writes to the blue key. Then the transitive conflict is also resolved: transaction h will be able to execute.
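
The first two conflict types can be detected mechanically from the known happens-before edges. The Python sketch below is illustrative only: the function names are hypothetical, the edge lists are a guess at the relevant part of the diagram rather than a transcription of it, and transitive conflicts (a dependency that is itself conflicted) are not modeled.

```python
def reaches(edges, a, b):
    """True if a happens before b under the transitive closure of `edges`."""
    stack, seen = [a], set()
    while stack:
        node = stack.pop()
        for src, dst in edges:
            if src == node and dst not in seen:
                if dst == b:
                    return True
                seen.add(dst)
                stack.append(dst)
    return False


def classify_read(edges, writes, read):
    """Return the write a read depends on, or the kind of conflict."""
    # Read/write conflict: some write is not ordered relative to the read.
    if any(not reaches(edges, w, read) and not reaches(edges, read, w)
           for w in writes):
        return "read/write conflict"
    before = [w for w in writes if reaches(edges, w, read)]
    # Keep only writes that no other earlier write is known to follow.
    latest = [w for w in before
              if not any(reaches(edges, w, o) for o in before if o != w)]
    if len(latest) > 1:
        return "write/write conflict"
    return latest[0] if latest else None


yellow = classify_read([("b", "e"), ("c", "e")], ["b", "c"], "e")
blue = classify_read([("d", "g")], ["d", "i"], "g")
blue_resolved = classify_read([("d", "g"), ("g", "i")], ["d", "i"], "g")
```

Adding the edge from g to i models consensus ordering g before i: the same call then returns d as the write that g reads.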

Optimization: Client Reads as Read-Only Transactions

Client reads as read-only transactions (see web version for animation)

With the above optimizations, transactions containing only read operations do not affect other transactions (or scheduling) at all. Therefore, they can bypass mempool and consensus altogether. Clients can simply send read-only transactions directly to the execution engine (with a label and a timestamp), and if the timestamp is after heardAllReads, the execution engine can simply place the transaction in the timeline of the relevant shards and execute it when possible. In the diagram above, transaction f is read-only.
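
The admission check for a client read can be sketched as follows. This is a hypothetical Python helper, assuming integer timestamps, a single heardAllReads bound, and a `timelines` dictionary mapping each key to its markers; none of these names come from the engine's Juvix definitions.

```python
def admit_read_only(timelines, heard_all_reads, timestamp, keys, executor):
    """Place a read-only transaction in the timelines if it is not too old."""
    if timestamp <= heard_all_reads:
        return False   # state at this timestamp may already be collected
    for key in keys:
        # Record a "will read" marker for the executor on each key.
        timelines.setdefault(key, {})[timestamp] = ("will read", executor)
    return True


timelines = {}
accepted = admit_read_only(timelines, 10, 12, ["x", "y"], "executor-f")
rejected = admit_read_only(timelines, 10, 9, ["x"], "executor-late")
```

A rejected read leaves the timelines untouched, so it cannot delay garbage collection or any other transaction.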

If client reads produce signed responses, then signed responses from a weak quorum of validators would form a light client proof.

Components

Type

ShardEngine : Type :=
  Engine
    ShardCfg
    ShardLocalState
    ShardMailboxState
    ShardTimerHandle
    ShardActionArguments
    Anoma.Msg
    Anoma.Cfg
    Anoma.Env;

Example of a shard engine

exampleShardEngine : ShardEngine :=
  mkEngine@{
    cfg := shardCfg;
    env := shardEnv;
    behaviour := shardBehaviour;
  };

where shardCfg is defined as follows:

shardCfg : EngineCfg ShardCfg :=
  mkEngineCfg@{
    node := Curve25519PubKey "0xabcd1234";
    name := "shard";
    cfg := mkShardCfg;
  };

where shardEnv is defined as follows:

shardEnv : ShardEnv :=
  mkEngineEnv@{
    localState :=
      mkShardLocalState@{
        dagStructure :=
          mkDAGStructure@{
            keyAccesses := Map.empty;
            heardAllReads := 0;
            heardAllWrites := 0;
          };
        anchors := [];
      };
    mailboxCluster := Map.empty;
    acquaintances := Set.empty;
    timers := [];
  };

and shardBehaviour is defined as follows:

shardBehaviour : ShardBehaviour :=
  mkEngineBehaviour@{
    guards :=
      First
        [
          acquireLockGuard;
          processWriteGuard;
          processReadRequestGuard;
          updateSeenAllGuard;
        ];
  };