
Distributed Node Operation #175

Open
7 of 8 tasks
mormj opened this issue Dec 20, 2021 · 5 comments

Comments

@mormj
Contributor

mormj commented Dec 20, 2021

The goal of this feature is to set up flowgraphs that can span multiple nodes.

A first pass at this feature, or proof of concept, I think involves the following:

  • networked custom buffers for passing streams across nodes (see the sketch after this list)
  • networked message passing
  • description of the remote execution host that can be passed to the partition function via the domain objects
  • serialization of the graph (edges + blocks + custom buffers) to a string that can be passed to the remote execution host to set up that part of the flowgraph in the same manner
  • a remote execution daemon with some sort of RPC that can communicate with the main flowgraph
  • ability to pass callbacks
  • ability to pass tags
  • validation of flowgraph elements on remote host
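To make the first bullet concrete (the sketch referenced above), a networked custom buffer could start as little more than a ZMQ PUSH/PULL pair shipping raw sample bytes between nodes. This is a minimal sketch assuming pyzmq and numpy; the class and method names are invented for illustration and are not newsched interfaces.

# Hypothetical "networked custom buffer" halves: the writer end of a
# cross-node stream edge pushes raw samples over ZMQ, the reader end
# pulls them. Assumes pyzmq/numpy; names are illustrative only.
import zmq
import numpy as np

class zmq_push_buffer:
    """Writer end of a cross-node stream edge."""
    def __init__(self, endpoint="tcp://0.0.0.0:5555"):
        self._sock = zmq.Context.instance().socket(zmq.PUSH)
        self._sock.bind(endpoint)

    def post_write(self, samples: np.ndarray):
        # Ship raw bytes; a real buffer would also carry the item size,
        # tags, and flow-control state.
        self._sock.send(samples.tobytes())

class zmq_pull_buffer:
    """Reader end of a cross-node stream edge."""
    def __init__(self, endpoint, dtype=np.complex64):
        self._sock = zmq.Context.instance().socket(zmq.PULL)
        self._sock.connect(endpoint)
        self._dtype = dtype

    def read(self) -> np.ndarray:
        return np.frombuffer(self._sock.recv(), dtype=self._dtype)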

Other ideas for doing this have been proposed using containers. Not sure yet what this would mean for embedded host environments.

@jsallay

jsallay commented Dec 21, 2021

I would be one of those supporting containers. When you work across nodes there are so many chances for things to go wrong (the node goes down, that portion of the flowgraph crashes, network throughput issues, etc.) that I think it would be really hard to make something that "just works" without leveraging existing technologies.

I have been looking at Argo Dataflow (which is built on top of Kubernetes) https://github.com/argoproj-labs/argo-dataflow . It provides distributed processing across containers and nodes and checks many (but not all) of the boxes you have above. One feature of particular interest is that it can auto-scale the number of containers available based upon your workload. (Imagine a spectrum survey application where, once we detect a signal, it will start processing it. Argo could reduce/increase the processing resources available based upon the amount of traffic coming in.)

The biggest drawback to this right now is the amount of boilerplate that must be written. A flowgraph has to be split into smaller flowgraphs, and we have to create Dockerfiles and Kubernetes manifests. I think that most of the boilerplate could be automated. From the discussion at GRCon, I had in mind a kubernetes, argo, or docker-compose custom scheduler. Behind the scenes, it could split up the flowgraph, write the Dockerfiles and manifests, and launch them. To use the custom scheduler, you would have to install the underlying technology and add in some configuration, such as available nodes, image registry locations, etc. But with that info, I think it would be pretty straightforward to switch between launching the flowgraph with the normal scheduler and with the distributed scheduler.
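To sketch what that automated boilerplate step might look like, here is a minimal, hypothetical generator that turns already-partitioned subgraph descriptions into a docker-compose file. The image name, launch command, and helper function are all assumptions, not existing newsched tooling.

# Hypothetical boilerplate generator: turn partitioned subgraphs into a
# docker-compose file. Everything here (image name, launch command,
# subgraph descriptors) is illustrative, not an existing newsched API.
import yaml

def write_compose(subgraphs, image="newsched-runtime:latest",
                  path="docker-compose.yml"):
    services = {}
    for i, sg in enumerate(subgraphs):
        services[f"fg-part-{i}"] = {
            "image": image,
            # Each container runs one portion of the flowgraph,
            # passed in here as a serialized description.
            "command": ["newsched-run", "--graph", sg],
            "network_mode": "host",  # simplest way to share stream endpoints
        }
    with open(path, "w") as f:
        yaml.dump({"services": services}, f, sort_keys=False)

# e.g. write_compose(["subgraph0.json", "subgraph1.json"])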

@mormj
Contributor Author

mormj commented Dec 21, 2021

@jsallay - thank you for the comments. I agree that leveraging existing technologies will be absolutely necessary for anything beyond proving some toy example. Let's run with the container example - even with fixed resources. If I can build a Docker image that has the GR libraries installed and that will contain part of my flowgraph, I still need to get the container set up to instantiate the flowgraph portion.

Right now in newsched there is a graph_utils::partition() that is used to split up the flowgraph across domain boundaries. This could easily be extended to split across remote hosts. But then how to communicate the flowgraph object to the remote host that will be on the container - it has to be serialized somehow, and there has to be a control interface to pass it along. I haven't thought of a slicker way to do this than making all the objects involved in a flowgraph have a tostring() interface, so that subgraph.tostring() serializes a JSON string that can be loaded on the other side.
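For illustration, a hedged sketch of what such a tostring()/JSON approach might produce: a subgraph serialized as blocks (with parameters) plus edges, which the remote side could walk to rebuild its portion. The schema here is invented, not newsched's actual format.

# Hypothetical tostring()-style serialization of a partitioned subgraph.
# The schema (blocks, edges, buffer annotations) is invented for
# illustration; it is not newsched's actual format.
import json

def subgraph_tostring(blocks, edges):
    return json.dumps({
        "blocks": [
            {"id": b["id"], "kind": b["kind"], "params": b.get("params", {})}
            for b in blocks
        ],
        "edges": [
            # Cross-host edges would name a networked custom buffer.
            {"src": e["src"], "dst": e["dst"],
             "buffer": e.get("buffer", "default")}
            for e in edges
        ],
    })

# Example: two blocks connected over a ZMQ-backed buffer.
print(subgraph_tostring(
    blocks=[{"id": "src0", "kind": "random_source", "params": {"seed": 42}},
            {"id": "add0", "kind": "adder"}],
    edges=[{"src": ["src0", 0], "dst": ["add0", 0], "buffer": "zmq"}],
))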

@jsallay

jsallay commented Dec 21, 2021

I was thinking about the problem from a slightly different perspective. Let's take the simplest example of using docker-compose. The scheduler would convert the flowgraph into a docker-compose file. For the time being, let's assume that the images needed already exist. The docker-compose file would essentially say launch a container of image x and run command y, which would execute a portion of the flowgraph.

If we wanted to run with kubernetes, then the same principles apply, but it would create manifest files rather than docker-compose files. With Argo Dataflows, it would create a dataflow template.

So to now answer your question: The flowgraph portions would be communicated to the kubernetes instance. Kubernetes (or Argo) would then be told to launch a set of images. It would handle which portions get put on which nodes in the cluster and it would set up everything for them to be able to communicate.
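For comparison with the docker-compose case, a hypothetical generator targeting Kubernetes would emit a manifest per flowgraph portion along these lines; all field values (image, args, names) are illustrative assumptions.

# Hypothetical shape of a generated Kubernetes manifest for one flowgraph
# portion; values are illustrative, not a real newsched integration.
import yaml

manifest = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "fg-part-0"},
    "spec": {
        "containers": [{
            "name": "fg-part-0",
            "image": "newsched-runtime:latest",   # assumed image
            "args": ["newsched-run", "--graph", "subgraph0.json"],
        }],
    },
}
print(yaml.dump(manifest, sort_keys=False))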

@marcusmueller
Member

marcusmueller commented Dec 21, 2021

My gut feeling would be that you'd not serialize the graph partition going to a "worker" node on the "controller" node (which knows the full fg), but that the workers run a newschedd with an RPC interface for building a (partial) flowgraph.

For example, the workers' Firecracker VMs / containers / kubelets / pods described above have their own daemon waiting for instructions like

create_port("to_control_host", { "type": "accel/ethernet", "parameters": { "remote": 0x002091abcdef, "vlan": 0 }})
create_block("random source", { "name": "xoroshiro", "accel": "force", "parameters": {"seed": 0xdeadbeef}})
create_block("random source 2", { "name": "xoroshiro", "accel": "force", "parameters": {"seed": 0xcafe0x}})
create_block("adder", { "name": "saturating integer addition", "accel": "optional", "parameters": {"inputs": 2, "bounds": [0, 1000]}})
connect_blocks("my beautiful subgraph", [[["random source", 0], ["adder", 0]]])
connect_blocks("my beautiful subgraph", [[["random source 2", 0], ["adder", 1]]])
connect_blocks("my beautiful subgraph", [[["adder", 0], ["to_control_host"]]])

and answer synchronously with futures that already contain the name specified as the first argument, or simply answer asynchronously. (The thing is that, for example, creating "random source" might require the worker to trigger an FPGA image fragment synthesis, and connect_blocks might cause an FPGA to undergo partial reconfiguration. Or anything way less innocent/easier than that, but just as time-intense, so you don't want to block the control node by waiting for each node to finish setup before talking to other nodes.)
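A minimal sketch of that worker-side pattern, assuming Python's concurrent.futures: each instruction is accepted immediately and a future is handed back, so slow setup work never blocks the control node. The command names mirror the pseudocode above; nothing here is a real newsched interface.

# Hypothetical worker-side daemon loop: each instruction returns a future
# immediately, so slow setup (e.g. FPGA partial reconfig) doesn't block
# the control node. Names mirror the pseudocode above; nothing here is a
# real newsched interface.
from concurrent.futures import ThreadPoolExecutor, Future

class newschedd:
    def __init__(self):
        self._pool = ThreadPoolExecutor(max_workers=4)
        self._handlers = {
            "create_port": self._create_port,
            "create_block": self._create_block,
            "connect_blocks": self._connect_blocks,
        }

    def dispatch(self, cmd: str, name: str, args) -> Future:
        # Hand back a future right away; the controller can gather
        # results from many workers asynchronously.
        return self._pool.submit(self._handlers[cmd], name, args)

    def _create_port(self, name, args):
        return name  # real impl: set up a networked buffer endpoint

    def _create_block(self, name, args):
        return name  # real impl: instantiate block, maybe synthesize FPGA image

    def _connect_blocks(self, name, args):
        return name  # real impl: wire ports, maybe partial reconfig

# e.g.
# d = newschedd()
# fut = d.dispatch("create_block", "random source",
#                  {"name": "xoroshiro", "parameters": {"seed": 0xdeadbeef}})
# print(fut.result())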

The advantage is that this RPC framework would be good to have anyway, especially in case we'd want to reconfigure later on. Also, the fact that there are individual instructions that can go wrong, rather than just a whole FG description that "doesn't work", makes the logging a bit easier (not much, probably).

As a logical extension, the nodes themselves might decide to delegate things, like "hm, I don't have a hardware-accelerated random number generator, but one of these 20 FPGA-accelerator-equipped servers over there could have, so I just spin up a kubelet on one of them", forward the block creation to that, and play proxy for all control-plane RPC responses.

@mormj
Contributor Author

mormj commented Dec 21, 2021

So then from a practical perspective, getting this type of RPC set up seems like the logical next step. And it can be done apart from any containerization; though we will want that too, it seems to be a separable work item.
