D3log Design
A divergent variant of the distributed controller design is currently being implemented. This page will be updated once that is done.
D3log is an effort to distribute differential-datalog
computation among a set of compute nodes. This document explores
the design space.
We identified two possible architectures. In each, nodes are connected through TCP.
The first architecture has a global control plane that is assumed to be highly available and that orchestrates changes to the system. This entity is assumed to have a full view of the system. High availability is ensured through replication or similar means.
+----------------------+
| Global Control Plane |
| (highly available) |
+-/------/----\--------+
/----- /- -\
/------ /- -\
/----- / -\
+---------------+ /- -\
| Node 1 | / \
|---------------| /- -\
| Local Control --- /- -\
| Plane | \--- / -\
| | /---- -\
+---------------+ /- \---- \
/ \--- -\
/- \---- -\
/ \--- -\
/- \---- -\
/- \---- +---------------+
+---------------+ \--- | Node 3 |
| Node 2 | \-----------------|
|---------------| | Local Control |
| Local Control ------------------------------------------| Plane |
| Plane | | |
| | +---------------+
+---------------+
(https://textik.com/#89fae6998c0d415f)
It is imaginable that the global control plane is implemented using a replicated state machine that runs on actual compute nodes, i.e., that resources are shared between control plane and compute nodes.
An alternative approach uses a distributed key-value store (DKVS) as the source of truth for the system state. There would be no global controller in this model; rather, all nodes subscribe to updates on this store and react to them locally. A selective subscription mechanism, i.e., one that allows subscribing to a certain namespace or set of keys, would reduce the amount of chatter and unnecessary computation. E.g., certain nodes may only be interested in changes to certain keys, and with selective subscriptions they would receive updates only for those, as opposed to also receiving updates for unrelated changes which they would then have to identify as irrelevant.
The main difference from a central controller approach is that there is no central "active" entity taking care of the orchestration. Instead, that logic is split amongst the individual compute nodes.
Similar to the global control plane approach, the distributed key-value store itself could share resources with the actual computation (i.e., run on the same nodes).
For the sake of simplicity, we decided to go with the distributed controller approach. Perhaps most importantly, not having a global controller eliminates cases where the controller is partitioned off from the rest of the system. With a central controller, it is not clear how we could keep the computation running in such a case, because the controller would interpret the partition as a failure of those nodes and recreate the relevant part of the program.
It also eliminates the need to implement a "control channel", as we would really only need a connection to the DKVS. On the other hand, now we have to deal with creating a proper schema for how to represent our state in the DKVS. We probably would need something similar anyway at some point, but for an initial version that presumably could be more ad hoc.
This section lays out the state that our system will require and maintain.
The program defines what our computation looks like. It is not clear how we could easily represent the computation we want to perform and put it into our DKVS. At least in part the complication arises from the fact that we have a compilation step (producing Rust source code) involved.
If we were to hand over the vanilla Datalog program (which also does not yet support annotating parts of the computation for distribution; something that clearly requires much more thought), we would need to compile it. That's a lot of plumbing that we should be able to punt on until a later point in time.
So instead, in the first version we assume that each node has the produced Rust artifact for the program to compute present locally. That obviously limits us in that we can now only compute a single program (easily), but that seems to be a reasonable compromise to cut down on features for the initial version.
With that we gain the ability to directly reference relations (with their numerical ID) and can use that to describe input & output relationships between nodes (i.e., which relation is connected to which other).
Given the assumption that the program is present on each node already, everything we need to instantiate the computation is mostly information on how to connect relations on the individual nodes. That is, we need to describe:
1) How input relations receive their data (those that are not fed with deltas from other output relations).
2) Which output relations are connected to which input relations.
3) Presumably we may also need to describe how data is consumed or made available at the "final" output relation (one that does not feed data to another input relation).
Problems 1) and 3) we can probably ignore for now and just assume a certain mechanism. E.g., 1) could just be an adapter that reads data from a file. 3) could be output into a file.
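The file-based placeholders for mechanisms 1) and 3) could be sketched roughly as below. This is a minimal, stdlib-only illustration under assumed conventions (one record per line, records kept as plain strings); the actual delta encoding is still undecided.

```rust
use std::io::{BufRead, BufReader, Write};

// Input adapter (mechanism 1): read one record per line from a file.
// Record parsing is an assumption; records stay plain strings here.
fn read_input_deltas(path: &str) -> std::io::Result<Vec<String>> {
    let file = std::fs::File::open(path)?;
    BufReader::new(file).lines().collect()
}

// Output sink (mechanism 3): append records to a file, one per line.
fn write_output_deltas(path: &str, deltas: &[String]) -> std::io::Result<()> {
    let mut file = std::fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)?;
    for delta in deltas {
        writeln!(file, "{}", delta)?;
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    let inp = dir.join("d3log_in.txt");
    let out = dir.join("d3log_out.txt");
    std::fs::write(&inp, "a\nb\n")?;
    let deltas = read_input_deltas(inp.to_str().unwrap())?;
    write_output_deltas(out.to_str().unwrap(), &deltas)?;
    Ok(())
}
```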
To satisfy 2) we essentially need a mapping from input to output relations. This mapping will be stored in the DKVS. In its simplest form (and given that we have numerical IDs available), representation could be as simple as:
input_1 <- [output_1, output_n, ...]
input_2 <- [output_m, output_o, ...]
...
where input_x and output_y are just numeric IDs of the corresponding relations. That is, we describe the output relations (there can be multiple) that feed deltas to a particular input relation.
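The mapping above could be built and serialized roughly as follows. This is a stdlib-only sketch (no serde); the exact JSON shape, an array of (input rel, [output rels]) tuples, matches the schema described later but is still an assumption.

```rust
use std::collections::BTreeMap;

// Serialize the input <- [outputs] mapping (numeric relation IDs)
// into a JSON array of [input, [outputs]] tuples, as it would be
// stored in the DKVS.
fn encode_input_mapping(map: &BTreeMap<u32, Vec<u32>>) -> String {
    let entries: Vec<String> = map
        .iter()
        .map(|(input, outputs)| {
            let outs: Vec<String> = outputs.iter().map(|o| o.to_string()).collect();
            format!("[{},[{}]]", input, outs.join(","))
        })
        .collect();
    format!("[{}]", entries.join(","))
}

fn main() {
    let mut map = BTreeMap::new();
    // input_1 <- [output_1, output_2]
    map.insert(1, vec![10, 11]);
    // input_2 <- [output_3]
    map.insert(2, vec![12]);
    println!("{}", encode_input_mapping(&map));
    // prints [[1,[10,11]],[2,[12]]]
}
```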
TODO: Will we also need the redirection information? Probably...
TODO: How is topology different from the membership service that was brought up? Should those two things be different? In fact, why do we actually need topology information to begin with? If every node that becomes available just announces itself to the DKVS and picks some part of the computation that is available (if any), then perhaps we don't need this input at all?
We need some way to describe the topology that is available to us (i.e., the nodes we can use). This data should reside in the DKVS, where it can be updated as members come and go.
At this point it is not clear how we differentiate nodes from each other and decide what to use any individual one for (i.e., based on which criteria we would pick one node over another for a particular computation).
Until that is clear, the simplest way to represent a topology is just as a list of IP addresses.
The "configuration" (for lack of a better term) describes which node actually performs which part of the computation. This configuration could look as follows:
node_1 <- [input_1, input_n, output_1, output_m, ...]
node_2 <- [...]
Here, a node (represented by IP address/DNS name, presumably), has an assignment of relations (again, represented by numeric IDs) that reside on it.
Before a node takes over some part of the computation, it would compare-and-swap an updated configuration that includes itself as "managing" a certain set of relations.
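The compare-and-swap step could look roughly like the sketch below. The `Dkvs` type is a toy in-memory stand-in for the real store (a production DKVS such as an etcd-style store provides an equivalent versioned-CAS primitive); its API and the string-based configuration encoding are assumptions.

```rust
use std::sync::Mutex;

// Toy stand-in for the DKVS: a single versioned value supporting
// compare-and-swap. The API below is an assumption, not a real
// store's interface.
struct Dkvs {
    inner: Mutex<(u64, String)>, // (version, value)
}

impl Dkvs {
    fn new(value: &str) -> Self {
        Dkvs { inner: Mutex::new((0, value.to_string())) }
    }

    fn get(&self) -> (u64, String) {
        self.inner.lock().unwrap().clone()
    }

    // Succeeds only if the stored version still matches.
    fn compare_and_swap(&self, expected_version: u64, new_value: &str) -> bool {
        let mut guard = self.inner.lock().unwrap();
        if guard.0 == expected_version {
            *guard = (expected_version + 1, new_value.to_string());
            true
        } else {
            false
        }
    }
}

// A node claims relations by retrying CAS until its update applies
// on top of the latest configuration.
fn claim(store: &Dkvs, node: &str, rels: &str) -> String {
    loop {
        let (version, config) = store.get();
        let updated = format!("{}{} <- [{}]\n", config, node, rels);
        if store.compare_and_swap(version, &updated) {
            return updated;
        }
        // Another node updated the configuration first; re-read and retry.
    }
}

fn main() {
    let store = Dkvs::new("");
    claim(&store, "node_1", "input_1, output_1");
    let config = claim(&store, "node_2", "input_2, output_2");
    print!("{}", config);
}
```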
On the DKVS we have a hierarchical namespace we can use for storing our data. In there, data would be stored as follows:
<root>
└── d3log
├── input
│ <JSON serialized array of (input rel, [output rels]) tuples>
└── configuration
<JSON serialized array of (node, [rels]) tuples>
Going with JSON-encoded data (instead of some binary encoding) has the benefit of being human readable, and speed or size are unlikely to be an issue here, even for large computations.
Multiple phases:
- Once the global controller has booted, it waits for incoming connections from nodes, OR
- it just boots and then waits for the program to arrive -> strictly speaking there is no need to be informed about nodes having booted; the controller could just retry connecting to them once it needs to -> on the other hand, in order to connect to them they need to listen on a specific port; we could cut out this dependency by making nodes connect to the global control plane
- but that would mean local control planes need to know where to find the global one (i.e., they need some configuration), which is probably something we want to avoid unless we really require it
=> Design: local nodes boot up and listen on a specific port on which the local control plane accepts connections from the global one
- The program contains addresses of nodes (IP/DNS name) and the program to run on each node. TODO: Perhaps "actual" node names are unnecessary in the program itself?
- The global control plane connects to each node, reporting an error if that connection fails (it could also retry for a certain amount of time).
- This connection constitutes some form of control channel.
- The control channel comes with a heartbeat to detect failed nodes.
- List of supported messages on the control channel:
  - Ping & Pong // ping the other end and expect a pong within a certain deadline
  - Adjust(state) // set new desired state, i.e., how to connect to other nodes etc. (can be called before and after Run)
  - Run(prog) // start the computation running the given program
-> The state above essentially needs to describe the connections between nodes and what data moves from where to where. This data needs to be inferred from the program passed to the control plane.
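The control-channel messages could be written as a Rust enum as sketched below. The payload types are placeholders: `state` and `prog` are kept as opaque strings, since their concrete encoding is still open, and the handler merely names the action a local control plane would take.

```rust
// Sketch of the control-channel messages as a Rust enum; payload
// types are assumptions (opaque strings).
#[derive(Debug, PartialEq)]
enum ControlMsg {
    Ping,           // expect a Pong within a certain deadline
    Pong,
    Adjust(String), // set new desired state (connections to other nodes)
    Run(String),    // start the computation running the given program
}

// Dispatch stub: names the action a local control plane would take
// for each message.
fn handle(msg: &ControlMsg) -> &'static str {
    match msg {
        ControlMsg::Ping => "reply with Pong",
        ControlMsg::Pong => "reset heartbeat deadline",
        ControlMsg::Adjust(_) => "install new desired state",
        ControlMsg::Run(_) => "start computation",
    }
}

fn main() {
    println!("{}", handle(&ControlMsg::Ping));
    println!("{}", handle(&ControlMsg::Run("prog1".into())));
}
```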
- A program could look as follows:
  prog:
  - (prog1, node1, out1: [p1_1_out: [node2, ...], p1_2_out: [node3], ...])
  -> that is:
    - prog1 is the actual program
    - node1 is the DNS name of the node this program runs on
    - out1 is some description of the output relations and how they should connect to other nodes
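The program tuple above could be modeled as plain structs as sketched here; the struct and field names, and the use of strings for program and node references, are assumptions for illustration.

```rust
// One routed output relation, e.g. p1_1_out feeding node2.
struct OutputRoute {
    output_rel: String,        // output relation name, e.g. "p1_1_out"
    destinations: Vec<String>, // nodes this output feeds, e.g. ["node2"]
}

// One (prog, node, outputs) tuple from the program description.
struct ProgramSpec {
    prog: String,              // the actual program (artifact reference)
    node: String,              // DNS name of the node it runs on
    outputs: Vec<OutputRoute>, // how outputs connect to other nodes
}

fn main() {
    let spec = ProgramSpec {
        prog: "prog1".into(),
        node: "node1".into(),
        outputs: vec![OutputRoute {
            output_rel: "p1_1_out".into(),
            destinations: vec!["node2".into()],
        }],
    };
    println!("{} runs on {}", spec.prog, spec.node);
}
```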
- TODO: Do we want to fit all of that into ddlog relations?
2.1)
- The description of the desired local configuration is pushed to each node over the control channel (via the Adjust message).
- The program to run is pushed to each node and then started (Run).
- Node failure:
- detected via heartbeat timeout
- that mechanism can't detect malicious behavior or anything like that, which is out of scope
- heartbeat timeouts may actually also trigger on slow nodes
- or in case of network partition -> in both cases data may still be pushed to nodes; can that be a problem? I guess it should not be?
- Network partition:
- could be between global control plane and local ones, just between local ones, or anything in between
- unclear how we deal with that
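The heartbeat timeout mechanism discussed above can be sketched as follows. The concrete deadline value is an assumption, and, as the text warns, this detector can misfire on slow nodes or during a network partition.

```rust
use std::time::{Duration, Instant};

// Minimal sketch of heartbeat-based failure detection: a node is
// declared failed once no Pong has arrived within the deadline.
struct Heartbeat {
    deadline: Duration, // assumed timeout value, not fixed by the design
    last_pong: Instant,
}

impl Heartbeat {
    fn new(deadline: Duration) -> Self {
        Heartbeat { deadline, last_pong: Instant::now() }
    }

    // Called whenever a Pong arrives on the control channel.
    fn record_pong(&mut self) {
        self.last_pong = Instant::now();
    }

    // Note: this also triggers for slow nodes or partitions, not
    // only for crashed nodes.
    fn is_failed(&self, now: Instant) -> bool {
        now.duration_since(self.last_pong) > self.deadline
    }
}

fn main() {
    let mut hb = Heartbeat::new(Duration::from_secs(5));
    hb.record_pong();
    let later = hb.last_pong + Duration::from_secs(6);
    println!("failed after 6s of silence: {}", hb.is_failed(later));
}
```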