Execution
Purpose
Given a total order of transaction candidates (from the mempool),[^1] the execution engine updates "the" state of the replicated state machine (RSM) by executing them. The execution engine uses state dependency labels to compute updates concurrently when possible. Furthermore, partial order information can suffice to start concurrent computation, even before the mempool and consensus determine a total order. Transaction candidates that include side effects (such as sending messages to a user or solver) perform those effects from the Executor process within the Execution Engines.
In V1, following each state update, the mempool's Worker Engine is informed, so that it can garbage collect information and log results.
Background
The ordering machine maintains a replicated state machine, a core concept of distributed systems. The architecture of the ordering machine is an adaptation of *Calvin: Fast Distributed Transactions for Partitioned Database Systems* to Narwhal's scale-out architecture. The execution engine exploits concurrency between transactions using the idea of serializability, a key concept of transaction processing systems and databases. Given that we follow a pre-determined total order of transactions, we have what has been called deterministic execution scheduling. In V1, the mempool alone suffices to determine the total order (without the need for any Consensus Engine).
Scope
The Execution Engine group is responsible for updating and maintaining the state of the state machine. It relies on the mempool to store and order transaction candidates.[^4]
In specifying the Execution Engines, we abstract over the contents of the state machine and over exactly what has to be done to update it given a TransactionCandidate. The state of Anoma's state machine will contain Resources, Nullifiers, and Commitments, and executing each Transaction Candidate will require, among other things, checking the respective Resource Logics of the involved resources. Roughly, the state and the precise "code" of transactions are essentially a type parameter.[^2]
For V1, we assume that there is a single validator that is maintaining a single state machine. V1 does not include any kind of garbage collection or deletion of old state.
Each Validator maintains its own Execution Engine group. These are complete replicas of the state machine, with complete replication of Transaction Candidate execution. Future optimizations may allow for distributed proofs of various kinds allowing some validators to skip some of the execution.
We do, however, allow for checkpointing: by performing a full-state read from a weak quorum of validators, a validator can learn the complete state of the state machine at a particular timestamp, and then compute all the Transaction Candidates thereafter. It should not have to compute from genesis, and no one should need to store a complete history of all Transaction Candidates or past state values.
Functionality
High Level Protocols
The main functionality of the Execution Engine is serializable TransactionCandidate execution. We want to execute each TransactionCandidate (i.e., read the necessary values from storage, evaluate the executor function, and write the computed results to state) concurrently but serializably: each TransactionCandidate's reads and writes should be as if the TransactionCandidates were executed in the total order chosen by the mempool.[^5] As this order is fixed, we can use deterministic execution scheduling.
We can imagine the simplest system as executing each TransactionCandidate sequentially in the fixed order, after they are ordered. However, we want to exploit the concurrency of transactions as much as possible to minimize latency, and several optimizations improve over sequential execution. We can increase concurrency of transactions by partitioning state as key/value pairs: we can execute TransactionCandidates that access disjoint sets of keys concurrently (or even in parallel).
Responsibilities
State
The core purpose of the Execution Engine group is updating and maintaining the state of the RSM. As an abstraction, we assume state is stored as mutable Data (unlimited-size blobs of binary), each of which is accessed using an immutable Key. Since Keys are immutable, "mutating" the Key associated with specific Data means deleting the Data associated with the old Key and writing a copy of the data to the new Key. Keys that have never had Data written to them are mapped to an empty value. Thus, we can create, read, update, and delete Keys; these are the only primitives we use.[^6]
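As a rough illustration (not part of the specification), this abstraction can be sketched as a toy store; the names `StateStore` and `EMPTY` are hypothetical:

```python
# Minimal sketch of the Key -> Data abstraction, assuming nothing beyond
# the text above; StateStore and EMPTY are illustrative names.
from typing import Dict

Key = bytes
Data = bytes

EMPTY: Data = b""  # Keys never written to are mapped to an empty value.

class StateStore:
    """Toy key/value store supporting create, read, update, and delete."""

    def __init__(self) -> None:
        self._store: Dict[Key, Data] = {}

    def read(self, key: Key) -> Data:
        # Unwritten Keys read as the empty value.
        return self._store.get(key, EMPTY)

    def write(self, key: Key, data: Data) -> None:
        # Covers both create and update: a write to an immutable Key.
        self._store[key] = data

    def delete(self, key: Key) -> None:
        # Deletion restores the empty value.
        self._store.pop(key, None)
```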
For V1, we do not assume any key structure, so TransactionLabels will have an explicit list of Keys they can use, rather than something more clever.[^3]
Transaction Candidate Labels
TransactionCandidates are labeled with a set of keys they can read, and a set of keys they can write. Writing \(\wp\mathit{Keys}\) for the powerset of all keys, we have the following:
\(\mathsf{read\_keys}\colon \mathit{Tx\_Candidates} \rightarrow \wp\mathit{Keys}\)
\(\mathsf{write\_keys}\colon \mathit{Tx\_Candidates} \rightarrow \wp\mathit{Keys}\)
The Execution Engines must not allow a TransactionCandidate to affect any key outside of its write set; a candidate that attempts to do so is aborted with an error message. Moreover, each TransactionCandidate's effects must be a deterministic function of the TransactionCandidate and the values of the keys in its read set in the RSM state.
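As a hedged sketch of how labels might be represented and used (names are illustrative, not Anoma's actual types), the following models read/write sets and the disjointness check behind the concurrency claim above:

```python
# Hypothetical sketch: labels as read/write key sets, plus the conflict
# check that permits concurrent execution of independent candidates.
from dataclasses import dataclass
from typing import FrozenSet

@dataclass(frozen=True)
class TransactionLabel:
    read_keys: FrozenSet[bytes]
    write_keys: FrozenSet[bytes]

def conflicts(a: TransactionLabel, b: TransactionLabel) -> bool:
    """Two candidates conflict if either may write a key the other touches."""
    return bool(
        a.write_keys & (b.read_keys | b.write_keys)
        or b.write_keys & a.read_keys
    )

# Candidates whose labels do not conflict can execute concurrently (even in
# parallel); conflicting candidates must follow the mempool's total order.
```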
Executor Function
In order to define a state machine, we assume a fixed Executor Function:
\(\mathsf{executor\_function}\colon \left(\mathit{Data}^{\mathit{Keys}} \times \mathit{Tx\_Candidates}\right) \to \left( \wp(\mathit{Keys}\times \mathit{Data})\times \mathit{IO\_commands} \right)\)
Inputs:
- the previous state, represented as a function from Keys to Data
- the TransactionCandidate itself
Outputs:
- The new \(\left\langle \mathit{key}, \mathit{data}\right\rangle\) pairs to be written to state.
    - Naively, these must include all the Keys in \(\mathsf{write\_keys}(T)\), but we have an optimization for cases where \(T\) does not actually update a given Key.
    - This new state must be a deterministic result of the inputs.
- IO side-effects are allowed, including sending messages to users or other Identities in the Anoma network. The Executor may run these IO effects, but doing so is orthogonal to reading and writing state: running IO effects must not affect the key/value store. Whether or not the executor runs IO effects will (in V2) depend on additional information provided in transaction requests, which the worker can pass to the executor in the ExecuteTransaction message.
When the execution engine group executes a TransactionCandidate, it reads the keys from the state that would be reached by executing all preceding TransactionCandidates[^7] in order to compute the \(\mathsf{executor\_function}\), and then updates its state. After executing a TransactionCandidate, the state of the write keys must reflect the result of the \(\mathsf{executor\_function}\) as applied to the read keys.
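A minimal sketch of this flow, reusing the `StateStore` and `TransactionLabel` sketches above; the candidate payload and the `executor_function` body are opaque here, matching the "type parameter" framing, and none of these names are Anoma's actual API:

```python
# Sketch of applying the executor function; illustrative only.
from typing import Callable, Dict, List, Tuple

TransactionCandidate = bytes  # opaque payload; a type parameter in the spec
IOCommand = str               # stand-in for messages to users, solvers, etc.

ExecutorFunction = Callable[
    [Dict[Key, Data], TransactionCandidate],
    Tuple[Dict[Key, Data], List[IOCommand]],
]

def apply_candidate(
    state: StateStore,
    label: TransactionLabel,
    candidate: TransactionCandidate,
    executor_function: ExecutorFunction,
) -> List[IOCommand]:
    # 1. Read exactly the Keys in the candidate's read set.
    reads = {k: state.read(k) for k in label.read_keys}
    # 2. Compute the (deterministic) new pairs and IO commands.
    writes, io_commands = executor_function(reads, candidate)
    # 3. Abort if the candidate writes outside its declared write set.
    if not set(writes) <= label.write_keys:
        raise ValueError("write outside declared write set; aborting")
    # 4. Apply the writes. IO effects are returned, not run here: running
    #    them is orthogonal and must not touch the key/value store.
    for key, data in writes.items():
        state.write(key, data)
    return io_commands
```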
Timestamps
Each Transaction Candidate has a logical Timestamp, which conveys ordering information relative to other transaction candidates. Timestamps, and thus TransactionCandidates, are partially ordered. As shards learn more information from consensus or the mempool, they can refine this partial order into a total order. Logical timestamps should include:
- a mempool worker id
- a TxFingerprint
- a hash of the transaction candidate
For each mempool worker, timestamp ordering should be consistent with the lexicographic order of (batch number, transaction number), where the transaction number is unique within each batch.
!!! todo make proper logical timestamp type description file. I think we have one somewhere, and I'm not sure where.
The state stored at each Timestamp must be the one that would result from applying the \(\mathsf{executor\_function}\) to each TransactionCandidate in the total order of their Timestamps.
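Purely as an illustration (the todo above tracks the authoritative type), a logical timestamp might look as follows, assuming the TxFingerprint decomposes into a batch number and an in-batch transaction number; all field names are hypothetical:

```python
# Hypothetical logical timestamp; field names are illustrative only.
from dataclasses import dataclass

@dataclass(frozen=True)
class LogicalTimestamp:
    worker_id: bytes   # mempool worker id
    batch_number: int  # from the TxFingerprint (assumed decomposition)
    tx_number: int     # unique within its batch
    tx_hash: bytes     # hash of the transaction candidate

    def precedes(self, other: "LogicalTimestamp") -> bool:
        """Per-worker order: lexicographic on (batch_number, tx_number).
        Across workers the order is only partial until the mempool (and,
        from V2, consensus) supply more information."""
        if self.worker_id != other.worker_id:
            raise ValueError("cross-worker order needs more information")
        return (self.batch_number, self.tx_number) < (
            other.batch_number,
            other.tx_number,
        )
```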
Engines Overview
Shards
Shards store and update the state of the RSM. Different shards may be on different machines.[^8] Within a Validator, Keys are partitioned across the Shards for load-balancing. Redistributing state between Shards is called re-sharding. For V1, we assume there will be no re-sharding.
For each Key, each Shard maintains a (partially ordered) timeline of the Timestamps of TransactionCandidates that read or write that Key.[^9] This requires Shards to receive and process ordering information from the mempool and consensus. Shards also store the data written by each TransactionCandidate that writes to that Key; this is multi-version concurrent storage.
When a Shard has determined the value a TransactionCandidate reads from a key, it sends that value to the corresponding Executor. On the Shard page, we detail optimizations for getting read values to Executors as quickly as possible.
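A simplified sketch of the per-key multi-version timeline follows; timestamps are collapsed to integers for brevity, whereas in the spec they are the partially ordered logical timestamps above:

```python
# Toy multi-version timeline for a single Key on a Shard.
from bisect import bisect_left
from typing import List, Tuple

class KeyTimeline:
    def __init__(self) -> None:
        # (timestamp, value) pairs, kept sorted by timestamp.
        self._versions: List[Tuple[int, bytes]] = []

    def record_write(self, ts: int, value: bytes) -> None:
        # Insert the version at its place in the (total) order.
        self._versions.insert(
            bisect_left(self._versions, (ts, b"")), (ts, value)
        )

    def read_at(self, ts: int) -> bytes:
        # A read at ts observes the latest write strictly before ts;
        # Keys with no earlier write read as the empty value.
        i = bisect_left(self._versions, (ts, b""))
        return self._versions[i - 1][1] if i > 0 else b""
```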
Executors aka Executor Processes
Executors are processes that compute the Executor Function and communicate the results to the Shards. Each TransactionCandidate is assigned its own Executor process, which receives the full TransactionExecutable from the mempool workers. The Execution Engines might keep a pool of Executors, or spin up a new one for each TransactionCandidate.
The Executor sends read requests to the relevant Shards to obtain the values the TransactionCandidate needs to read at runtime; the Executor then computes the Executor Function on the values received from the Shards, and finally sends any values to be written to the relevant Shards. To save on communication costs, Executors can be co-located with Shards, Worker Engines, or both.
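Collapsing message passing into direct calls for readability (real Executors and Shards exchange messages and may live on different machines), this round trip might look as follows, reusing the hypothetical types from the sketches above:

```python
# Sketch of an Executor's round trip with Shards; message passing is
# simplified to direct method calls on the owning Shard's timeline.
from typing import Dict, List

def run_executor(
    candidate: TransactionCandidate,
    ts: int,
    label: TransactionLabel,
    shards: Dict[Key, KeyTimeline],  # Key -> timeline on its owning Shard
    executor_function: ExecutorFunction,
) -> List[IOCommand]:
    # "Read requests": obtain each read Key's value at this timestamp.
    reads = {k: shards[k].read_at(ts) for k in label.read_keys}
    # Compute the Executor Function on the values received.
    writes, io_commands = executor_function(reads, candidate)
    # "Write messages": hand each written value to the Key's owning Shard.
    for key, data in writes.items():
        shards[key].record_write(ts, data)
    return io_commands  # IO effects, if this Executor is to run them
```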
Read Backend
Read-only Transaction Candidates generalize state reads: any time someone wants to read (or compute something from) the official state on the validator without changing it, that is a read-only transaction. We need a public-facing process that communicates with the shards and executors much like the mempool does. In terms of interface, the primary difference between the Read Backend and the mempool is that the Read Backend's transactions cannot write to state.
We detail the Read Backend engine here.
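In terms of the earlier label sketch, a read-only transaction is simply a candidate whose write set is empty, e.g. (with a purely illustrative key):

```python
# Hypothetical read-only label: the Read Backend only submits candidates
# whose write set is empty, so they can never modify state.
balance_query = TransactionLabel(
    read_keys=frozenset({b"/accounts/alice/balance"}),  # illustrative key
    write_keys=frozenset(),
)
```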
[^1]: For V1, the mempool already determines a total order, because there is only a single unique worker and transaction fingerprints already provide all the information of a total order. This is no longer true in V2.

[^2]: In fact, this could be made precise using abstract data types.

[^3]: In V2, we make the additional assumption that Keys can have structure. For example, we can arrange them in a forest, to allow us to easily express sub-trees using prefixes. This would allow labels to express potentially infinite sets of keys in a compact way. It will also require that we address the challenge of re-sharding state, i.e., re-partitioning the state, e.g., due to migration of validators around the globe.

[^4]: From V2 onwards, the mempool interacts with consensus to establish a total order.

[^5]: Again, note that from V2 onwards, this will be in collaboration with consensus.

[^6]: However, we also assume multi-version storage.

[^7]: Recall that we may know what this state is as soon as all relevant previous transactions have been executed, i.e., those that may have an effect on the keys read.

[^8]: For V2, the number of shards may be variable and the challenge of re-sharding needs to be addressed.

[^9]: The timeline is based on a shard-specific view of the mempool DAG.