learning-at-home / hivemind
Decentralized deep learning in PyTorch. Built to train models on thousands of volunteers across the world.
License: MIT License
(after we make a standardized way to run servers)
It would be great to add a basic tutorial instead of this stub. Ideally, training a simple DMoE-based model hosted on one or a few devices.
Write a quick-start guide that covers:
Currently, DecentralizedAverager supports only one compression option for all tensors.
That is problematic if we want to go beyond float16 compression.
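One possible direction is a per-tensor compression schema instead of a single flag. A rough sketch (the compression_schema argument is hypothetical, not an existing DecentralizedAverager parameter, and the CompressionType values/import path are illustrative):

```python
import hivemind
from hivemind.utils import CompressionType  # illustrative import path

# hypothetical API: one compression type per averaged tensor
# (embeddings, layer_weights, layer_norm_stats and dht are defined elsewhere)
averager = hivemind.DecentralizedAverager(
    averaged_tensors=[embeddings, layer_weights, layer_norm_stats],
    compression_schema=[
        CompressionType.UNIFORM_8BIT,  # large tensors: aggressive compression
        CompressionType.FLOAT16,       # medium tensors: half precision
        CompressionType.NONE,          # small or sensitive tensors: full precision
    ],
    dht=dht, prefix='my_averager', start=True,
)
```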
Extras:
It would be very useful to automatically test that a new feature hasn't broken model training.
Please implement a test that trains a simple model on a synthetic dataset or MNIST (preferred) and checks that it converges after a certain number of steps (i.e. the loss falls below some threshold).
Back in the day, Maxim implemented a basic MNIST training example that could come in handy.
You can find it here: https://gist.github.com/justheuristic/01d5ffe9c534d90e40badff35653ba7d
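A minimal sketch of such a test on synthetic data (plain PyTorch, without the hivemind server wiring; the step count and threshold are placeholders):

```python
import torch
import torch.nn.functional as F


def test_training_converges(num_steps: int = 500, threshold: float = 0.2):
    """Train a tiny model on a synthetic dataset and check that the loss drops below a threshold."""
    torch.manual_seed(42)
    xs = torch.randn(1024, 16)
    ys = (xs.sum(dim=-1, keepdim=True) > 0).float()  # simple separable target

    model = torch.nn.Sequential(torch.nn.Linear(16, 32), torch.nn.ReLU(), torch.nn.Linear(32, 1))
    opt = torch.optim.Adam(model.parameters(), lr=0.01)

    for _ in range(num_steps):
        loss = F.binary_cross_entropy_with_logits(model(xs), ys)
        opt.zero_grad()
        loss.backward()
        opt.step()

    assert loss.item() < threshold, f"loss did not converge: {loss.item():.3f}"
```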
Recommendations:
This issue contains a list of features that are nice to have but not required immediately. Feel free to add stuff, edit stuff or convert stuff into working code :)
If you're interested, please leave a comment first.
Update: this idea was generalized in https://openreview.net/forum?id=U1edbV4kNu_ , we will merge the extended version.
Right now every expert is served by at most one server. If we have 100500 GPUs but we don't want that many experts, we can implement asynchronous data parallelism within each expert.
Design idea: if an expert is already present in the network as an ExpertBackend, let others replicate it via some kind of ReplicaExpertBackend. The replica periodically requests the expert's parameters and sends accumulated gradients with some staleness information; in turn, the master ExpertBackend incorporates these gradients into training. This will improve performance as long as we load updates and send accumulated gradients infrequently enough.
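A rough sketch of what such a replica could look like (ReplicaExpertBackend, push_gradients_to_master and fetch_master_parameters are hypothetical names, not an existing API):

```python
import time

import torch


class ReplicaExpertBackend:
    """Hypothetical replica: serves a copy of a master expert and syncs with it periodically."""

    def __init__(self, expert: torch.nn.Module, sync_interval: float = 30.0):
        self.expert = expert
        self.sync_interval = sync_interval
        self.accumulated_grads = [torch.zeros_like(p) for p in expert.parameters()]
        self.last_sync = time.monotonic()

    def accumulate_gradients(self):
        # called after each local backward pass on this replica
        for acc, param in zip(self.accumulated_grads, self.expert.parameters()):
            if param.grad is not None:
                acc += param.grad

    def maybe_sync_with_master(self):
        staleness = time.monotonic() - self.last_sync
        if staleness < self.sync_interval:
            return
        # hypothetical RPCs: send stale gradients to the master, then pull fresh parameters
        push_gradients_to_master(self.accumulated_grads, staleness)
        for acc in self.accumulated_grads:
            acc.zero_()
        for param, fresh in zip(self.expert.parameters(), fetch_master_parameters()):
            param.data.copy_(fresh)
        self.last_sync = time.monotonic()
```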
Update: this idea was generalized in https://openreview.net/forum?id=U1edbV4kNu_ , we will merge the extended version.
Currently all hivemind servers start with a fixed number of experts and maintain these experts until the server is shut down. Ideally, we could dynamically load most promising experts from the network and accommodate them on the current server.
Can we pose it as some optimization procedure that is performed locally from every server? For instance, let's say that each gating function call "rewards" chosen experts proportionally to their weight in the mixture for that call's input - but only if the expert returned something.
In that case, we could design HivemindServer to optimize its total reward. For instance, a server can periodically replace low-reward experts with the ones that have highest expected reward per {second? per computation? depending on server status}. If replication is implemented, a server can choose between hosting a new expert or sharing a 1/N-th of a more popular expert.
Perhaps we can even re-think HivemindServer's behavior as optimization of the expected loss function?
Update: @mryab and @borzunov significantly improved the way we do CLI logs. It seems that implementing a visual dashboard is not a priority now.
Debugging decentralized systems is easier if you can view the history of what happened.
We could use some user-friendly way of logging server load, latency, the number of errors, etc.
Currently no such thing exists. Ideally, we should use something like TensorBoard/Grafana as a monitoring tool.
Suggested by Dmitry Afanasiev
Update: will attempt in sub-projects, summer 2022
Right now HivemindServer accepts requests and goes dark until they are processed. Instead, we could instantly communicate some status response, e.g. "received request, I will answer in approx. T seconds". Using this, a RemoteExpert could choose to wait for the response or cancel it right away (or choose a different replica).
Update: implemented a while ago.
Right now, calling DHT.get for a value with subkeys has two potential outcomes:
It would be convenient to also have an option to
Describe the bug
When installing hivemind from PyPI, we can't use the hivemind-server CLI command, as it tries to import from the folder scripts:
$ hivemind-server
Traceback (most recent call last):
File "/home/mryab/miniconda3/envs/hivemind_env_test/bin/hivemind-server", line 5, in <module>
from scripts.run_server import main
ModuleNotFoundError: No module named 'scripts'
We should probably change the directory name the way it's done in transformers or fairseq.
To Reproduce
Install hivemind from PyPI and run hivemind-server
Environment
Currently form_batch has a nasty problem: if the batch is nearly full and the next task has multiple examples, the pool adds this task to the batch even if the resulting batch size exceeds max_batch_size.
Can we change this to make sure that
Please use manage_technical_debt branch
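One possible way to enforce the max_batch_size limit above, sketched on a simplified version of the pooling loop (not the actual TaskPool code; peek/pop are illustrative):

```python
def form_batch(task_queue, max_batch_size):
    """Collect tasks into a batch without ever exceeding max_batch_size examples."""
    batch, batch_size = [], 0
    while batch_size < max_batch_size:
        task = task_queue.peek()  # illustrative: look at the next task without removing it
        if task is None:
            break
        task_size = len(task.args[0])  # number of examples in this task
        if batch and batch_size + task_size > max_batch_size:
            break  # leave the oversized task for the next batch instead of overflowing this one
        task_queue.pop()
        batch.append(task)
        batch_size += task_size
    return batch
```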
Since 0.8.15, RemoteExpert and RemoteMixtureOfExperts will automatically convert output tensors to the same device as inputs. This works fine in most cases, but one can imagine a niche use case where some of the tensors should stay on CPU.
Perhaps we should switch to respecting devices from RemoteExpert.info['output_schema']
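A sketch of what respecting the schema could look like on the client side (the device attribute on the descriptor is an assumption, not the current BatchTensorDescriptor API):

```python
def move_outputs_to_schema_devices(outputs, output_schema):
    """Place each output tensor on the device declared in its descriptor; fall back to the input device."""
    moved = []
    for tensor, descriptor in zip(outputs, output_schema):
        target_device = getattr(descriptor, 'device', None)  # assumption: descriptors may carry a device field
        moved.append(tensor.to(target_device) if target_device is not None else tensor)
    return moved
```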
Since we use one accelerator per hivemind.Server, we can support a soft typing as such:
A list of small issues to be solved in bulk before we bump the version
Clean up utils:
utils/__init__.py
Discussion: gRPC proto files
Etc:
On the surface, QUIC is praised for two features that we may need:
That said, there can be caveats in switching to QUIC, and we would appreciate opinions from those who have experience using it at a large scale. Feel free to comment below.
If everything goes well, it would be great to implement a QUIC-based prototype for hivemind.DHT and/or hivemind.Server and benchmark them against the current gRPC-based implementation.
There are several ways to use QUIC in python:
Based on a discussion with @UARTman on 09.02.2020: we can greatly improve TesseractNetwork by specializing Kademlia DHT to our needs. As of now, there are two issues with the default implementation:
Operating under NAT
At the time of writing, a RemoteExpert client always connects to a runtime via an IP and a port. This is impractical since participants may have no open ports. We could instead implement direct p2p connections via the DHT.
Faster lookup
The majority of keys stored in our DHT are expert prefixes used by gating function. These prefixes are not meant to live indefinitely: if no runtime sends prefix information in
However, Kademlia assumes that if you store a key, this key must still be there after some time. This causes many inefficiencies, such as wasting memory on stale keys or "refreshing" keys every 1 or 24 hours.
Subj. Reported by @unconst . Need to fix and add a test for that.
Currently, RemoteExpert uses the same compression type for forward and backward passes. This may not be optimal.
[based on @MaximKsh 's ongoing work on #149 ]
Implement functionality in hivemind.P2P for communicating with other P2P instances.
Ideally, we would like to support the following functionality:
### process 1 ###
peer1 = hivemind.P2P()
# runs p2pd in background, instantiates p2pclient, starts listening for incoming connections

async def my_function(x):
    return x ** 2

await peer1.add_handler('foo', my_function)
print(peer1.id)  # prints a long hash-like address

### process 2 ###
peer2 = hivemind.P2P()
x = 5
res = await peer2.call(PEER1_ID_HERE, 'foo', x)
assert res == 25
Functionality-wise:
P2P.__init__: add a docstring for what is relevant for hivemind
Create a simple setup with 3 nodes where
TODO:
As before ( #64 ), we have a number of things we may want to do right before the v0.9 release
RemoteMixtureOfExperts uses grid_size while the server side uses expert_pattern (e.g. ffn.[0:256].[0:256]), should we switch to expert_pattern everywhere? (@justheuristic )
Should we use __slots__ for data structures such as _IntermediateResult?
Should we switch to declaring objects bottom-up vs top-down? For instance, _IntermediateResult is used before it's declared (@mryab )
theguyshetoldyounottoworryabout: rename it? (@mryab )
Can we get rid of return_futures=True option in node.get_*
rename _IntermediateResult to SearchState, rationale: it is not necessarily a result, it also stores search data other than the result (@justheuristic )
rename LocalStorage to TimedStorage (or similar)? reason: it is not always used as node's local storage (@justheuristic )
make alias DHTID.from(x) = DHTID.generate(source=x)? (@justheuristic )
why are we using umsgpack instead of just msgpack? (@justheuristic )
we force the same vector compression rules during forward & backward pass #99 (@Vsevolod-pl )
update documentation on hivemind.dht: subkeys, beam_search, no first_k_active
update documentation on hivemind.server: compression
consider setting maxsize to DHTValueType, e.g. 10k
similar: consider forcing maxsize on any DHT record
rename hivemind.utils.grpc -> hivemind.utils.protobuf ?
tests still raise warning about non-copyable ndarray in (de)serialize_torch_tensor
remove receiver_threads in DHT and DecentralizedAverager, hard-code 1 thread (@justheuristic )
update grpc version, replace grpc.experimental.aio with grpc.aio ( grpc/grpc#23240 )
in hivemind.dht.DHT._get(endpoint) we're currently not caching channels at all. Should we?
Investigate segfault in our circleci env (@mryab @justheuristic )
perhaps we should move all heavy class definitions out of __init__.py modules (@mryab )
Allow different tensor compression schemes for backward pass? #99
rewrite test_dht_protocol, test_empty_table and test_dht_node to use await instead of loop.run_until_complete
what will the server do if it is given an input with incorrect requires_grad? (not as in schema)
proper channel caching: we could implement a centralized process-local channel cache
since we're using gRPC for all requests, we may be able to share one port for everything
should we replace prints in tests/benchmarks with logger.info? Do we need as many prints in dht tests? (@mryab , see #116 comments )
currently test_utils is a package containing a single function used on just one occasion, should we just move it there?
should we make our daemons resistant to KeyboardInterrupt when they are running in background?
naming collision: hivemind.utils.grpc actually points to the external package, not hivemind/utils/grpc.py. This is due to wildcard imports (from hivemind.utils.grpc import *). Should we switch away from them?
DHT.get_my_endpoint() implemented by pinging k random peers
currently we have no limit on gRPC message size in ConnectionHandler; this is a potential vulnerability for open infrastructure. Should we impose an upper limit and send large tensors in chunks?
TimedStorage: add .pop() method
DHT/DecentralizedAverager - rename return_future to something intuitive, e.g. sync=False
MPFuture: make sure await asyncio.create_task(mpfuture) works without extra wrappers
In DecentralizedAverager there is a lock_averaged_tensor which is not an asyncio-friendly lock. If we acquire it for long, the averager will be paralyzed! We should run it in an executor or make a special mp+aio lock.
Right now, if an ExpertBackend uses dropout or other non-determinism, it will use different random states during the forward and backward passes.
Torch has previously solved this problem via gradient checkpointing (see the source code). Can we implement something similar?
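A minimal sketch of the re-seeding trick, along the lines of how torch.utils.checkpoint preserves RNG state (the ExpertBackend integration itself is hypothetical):

```python
import torch


def forward_with_saved_rng(expert, *inputs):
    # remember the RNG state right before the forward pass
    cpu_state = torch.get_rng_state()
    cuda_states = torch.cuda.get_rng_state_all() if torch.cuda.is_available() else None
    outputs = expert(*inputs)
    return outputs, (cpu_state, cuda_states)


def replay_forward_for_backward(expert, saved_rng, *inputs):
    cpu_state, cuda_states = saved_rng
    devices = list(range(torch.cuda.device_count())) if cuda_states is not None else []
    # re-run the forward pass under the exact same RNG state so dropout masks match
    with torch.random.fork_rng(devices=devices):
        torch.set_rng_state(cpu_state)
        if cuda_states is not None:
            torch.cuda.set_rng_state_all(cuda_states)
        return expert(*inputs)  # backprop through this output with the grads received from the client
```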
Please use manage_technical_debt branch
(as reported by @mryab )
Currently, both hivemind.server.connection_handler and hivemind.client.averaging.DecentralizedAverager run serialize_torch_tensor and deserialize_torch_tensor directly in the asyncio loop.
This may increase latency, especially when using tensor compression. We should try running it in an executor and see whether it affects performance in any way.
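A sketch of the proposed change (assuming serialize_torch_tensor(tensor, compression) as exported by the current utils; the executor setup and wrapper name are illustrative):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

from hivemind.utils import serialize_torch_tensor

serialization_executor = ThreadPoolExecutor(max_workers=4)


async def serialize_async(tensor, compression):
    # offload serialization so that tensor compression does not block the event loop
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(serialization_executor, serialize_torch_tensor, tensor, compression)
```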
As of right now, hivemind.Server supports arbitrary expert classes with fixed input sizes, but it is impossible to deploy custom experts with the hivemind-server script. As a result, users who need custom experts must launch hivemind.Server manually from python.
We must refactor the server creation code to support custom experts:
add a register_expert decorator in hivemind/layers/register.py that adds a given expert:
@hivemind.register_expert('my_expert_name', sample_inputs=lambda hid_dim: torch.randn(3, 128, hid_dim))
class MyCustomExpert(torch.nn.Module):
    ...
modify hivemind/server/layers to use register_expert instead of manually adding experts to global dictionaries
add a get_expert method in hivemind/layers/register.py
in the hivemind-server script, add an option to import a custom module (that may or may not register experts inside)
add a test to hivemind/tests/test_moe.py that creates a server with a custom expert, then runs forward and backward for this expert

Take functionality from https://github.com/mhchia/py-libp2p-daemon-bindings and merge it inside hivemind.
First of all, we need base functionality:
Use asyncio instead of anyio.
CodeCov, while being pretty useless right now, will be essential at some point in the future, when our tests become numerous.
Implement quantile compression:
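A rough sketch of 8-bit quantile quantization in pure PyTorch (function names are illustrative, not the final API):

```python
import torch


def quantile_encode(tensor: torch.Tensor, n_bins: int = 256):
    """Quantize a tensor to uint8 codes using quantile-based bin centers."""
    flat = tensor.flatten().float()
    quantiles = torch.linspace(0.0, 1.0, n_bins, dtype=torch.float32)
    bin_centers = torch.quantile(flat, quantiles)  # one representative value per bin, sorted
    codes = torch.bucketize(flat, bin_centers).clamp_max_(n_bins - 1).to(torch.uint8)
    return codes.view(tensor.shape), bin_centers


def quantile_decode(codes: torch.Tensor, bin_centers: torch.Tensor) -> torch.Tensor:
    return bin_centers[codes.long()]
```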
In our preliminary experiments, all peers have independent gating functions and we can only synchronize them manually. It would be great to implement some sort of builtin averaging mechanism.
For instance, every T seconds, assemble peers into groups at random, then perform all-reduce within each group. In case of failure, rollback and repeat T seconds later.
Can we make tesseract work in python3.7? We used to require py3.8 features like multiprocessing.shared_memory, but they are gone now.
If we can run on py3.7 after minor modifications, please apply those modifications and update the compatibility list in setup.py.
Right now, hivemind works over the default TCP protocol with no security. If we are to run at a global scale, we need to:
1. make sure peers do not risk their personal data by running hivemind nodes
This is the boring, but necessary one: we need to make sure we use up-to-date security protocols in both hivemind.dht and hivemind.client/server interaction
2. figure out how to make hivemind model resistant to malicious peers
This part is trickier; there are several attack vectors:
2a. send NaN or wrong gradients to the expert in order to jeopardize its parameters
2b. overwrite existing keys in DHT with wrong information to prevent other peers from finding experts
There are intuitive ways to resist both of these vectors by e.g. dropping outlier gradients in 2a, but security usually works better with expertise, not intuition.
We would appreciate any advice on potential attack vectors and ways to mitigate them.
Problem: if many peers join at once, they will all pick one averager (the latest at the time) as a target for loading the initial state. This causes choke points, as one averager struggles to serve many newcomers.
Possible solution:
Currently we test commits manually via tests/benchmark_throughput.py. It would be way better to implement automatic tests that cover the current implementation.
At least, we should cover:
This is a global project roadmap that states our priorities for the near future. These priorities can and should be disputed here or elsewhere, after which we will update the roadmap.
Target scenario: 100 volunteers training 2xl-like over the internet
Target scenario: 500 peers training 1B+ over the internet
As of now, forward/backward_timeout arguments correspond only to timeouts for Server interactions. However, this is not the only possible cause of freezes: for example, beam search might take too long to finish during the forward pass. To fix this, we need to make both timeouts more general (that is, they should measure the execution time of the entire function).
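One way to generalize this is to wrap the whole client-side call, beam search included, in a single deadline. A sketch using asyncio.wait_for (forward_async is an illustrative name, not the current API):

```python
import asyncio


async def forward_with_deadline(moe, inputs, forward_timeout: float):
    # one overall deadline that covers beam search, expert selection and the expert calls themselves
    try:
        return await asyncio.wait_for(moe.forward_async(inputs), timeout=forward_timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"forward pass (including beam search) exceeded {forward_timeout}s")
```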
(@mryab 's suggestions)
Right now we have to create server configs manually in python. Let's create an option to bootstrap a server from a config file!
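A minimal sketch of such bootstrapping (the YAML layout and the Server.create arguments are assumptions):

```python
import yaml

import hivemind


def server_from_config(path: str) -> hivemind.Server:
    with open(path) as f:
        config = yaml.safe_load(f)  # e.g. {"num_experts": 16, "expert_cls": "ffn", "hidden_dim": 1024}
    return hivemind.Server.create(start=True, **config)
```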
Right now, our tests can take upwards of 10 minutes both in CircleCI and locally, which slows down the development workflow and leads to unnecessary context switches. We should find a way to reduce the time requirements and make sure it stays that way.
We need a simple training configuration where we can test the effects of different strategies for decentralized adaptive optimizers
Known strategies
Implement
Compare convergence with equal bandwidth load
(based on @mryab 's suggestion)
Add an option for hivemind.server.Server to save all experts regularly, probably as a background thread, ideally without reducing Runtime throughput.
Also consider saving experts on del or process exit.
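A sketch of such a background checkpointer (thread-based, with illustrative names; a real implementation would need to lock expert parameters while saving):

```python
import threading
from pathlib import Path

import torch


class CheckpointSaver(threading.Thread):
    """Periodically saves every expert's state dict from a background thread."""

    def __init__(self, expert_backends: dict, checkpoint_dir: Path, period: float = 300.0):
        super().__init__(daemon=True)
        self.expert_backends, self.checkpoint_dir, self.period = expert_backends, checkpoint_dir, period
        self.stop_event = threading.Event()

    def run(self):
        while not self.stop_event.wait(self.period):
            self.save_once()

    def save_once(self):
        self.checkpoint_dir.mkdir(parents=True, exist_ok=True)
        for name, backend in self.expert_backends.items():
            torch.save(backend.expert.state_dict(), self.checkpoint_dir / f"{name}.pt")
```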
It would be really useful if we had a simple Dockerfile containing the build instructions for the library. Then, we could create an image which would be easy to distribute and run for the volunteer participants without setting up the environment themselves.
Virtual batching and LR scheduling are popular techniques with many applications. It would be nice to have an example of how to implement them with hivemind.
As of 0.8.15, ExpertBackend supports two ways of doing so:
<edit>
The goal of this project is to implement pre-configured decentralized optimizers such as DADAM and decentralized Local AMSGrad. We intend to create a general interface that runs adaptive optimizers with periodic averaging of gradients, parameters and optimizer statistics on different time-frames.
The final version should support:
</edit>
Code draft:
# usage:
class LAMBWithAveraging(FusedLAMB, AdaptiveOptimizer):
    def get_averaged_statistics(self) -> Sequence[Tensor]:
        raise NotImplementedError  # TODO

opt = DecentralizedOptimizer(model, base_optimizer=LAMBWithAveraging(model.parameters()),
                             prefix='MY_UNIQUE_STRING')

### code
class AdaptiveOptimizer(abc.Mixin):
    def get_averaged_statistics(self):
        raise NotImplementedError()

class DecentralizedOptimizer(abc.meta):
    def __init__(self, parameters, base_optimizer, dht, prefix: str, ...):
        assert isinstance(base_optimizer, AdaptiveOptimizer), "TODO see example here"
        self.parameters = list(parameters)  # on GPU
        self.base_optimizer = base_optimizer
        self.global_step = ...

        zeros_like_grads = [torch.zeros_like(x.detach().cpu()) for x in self.parameters]
        self.grad_averager = DecentralizedAverager(
            averaged_tensors=zeros_like_grads,  # average gradients only
            prefix=prefix + '_grad_averager', **some_kwargs
        )

        parameters = [x.cpu().detach().float() for x in self.parameters]
        averaged_statistics = [x.cpu().detach().float() for x in base_optimizer.get_averaged_statistics()]
        self.almost_v_averager = DecentralizedAverager(
            averaged_tensors=parameters + averaged_statistics,  # average params and second momenta
            prefix=prefix + '_v_averager', **some_other_kwargs
        )
        self.v_averaging_job = None

    def get_model_params_and_v(self) -> Tuple[torch.Tensor, ...]:
        raise NotImplementedError  # TODO

    def step(self):
        accumulate_grads()
        report_progress_to_dht()

        if self.v_averaging_job.done():
            local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
            with self.almost_v_averager.get_tensors() as averaged_params_and_v:
                copy_from(averaged_params_and_v).to(local_params_and_v)

        if accumulated_enough():
            local_grads = [x.grad for x in model.parameters()]
            with self.grad_averager.get_tensors() as averaged_grads:
                copy_from(local_grads).to(averaged_grads)
            self.grad_averager.step()
            with self.grad_averager.get_tensors() as averaged_grads:
                copy_from(averaged_grads).to(local_grads)
            # TODO maybe average gradients continuously in background

            if not self.use_local_statistics:
                pass  # TODO: revert base_optimizer.get_averaged_statistics() to the previous averaged state
            self.base_optimizer.step()
            self.global_step += 1  # MAKE SURE IT IS in sync with collaboration
            if not self.use_local_statistics:
                pass  # TODO: revert optimizer statistics back to local ones AFTER the local update

            ### AVERAGE PARAMETERS AND STATISTICS! ###
            if self.global_step % self.K == 0:
                if not self.v_averaging_job.done():
                    if self.await_v_averager:
                        self.v_averaging_job.wait()
                        local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
                        with self.almost_v_averager.get_tensors() as averaged_params_and_v:
                            copy_from(averaged_params_and_v).to(local_params_and_v)
                    else:
                        warning("Param averager is taking too long, TODO explain why we skip")
                        return
                local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
                with self.almost_v_averager.get_tensors() as averaged_params_and_v:
                    copy_from(local_params_and_v).to(averaged_params_and_v)
                self.v_averaging_job = run_in_background(self.almost_v_averager.step)

    def zero_grad(self):
        self.base_optimizer.zero_grad()
        zero_grads_in_self_grad_averager()
        self.report_to_dht()
When creating DHT, calling run_in_background makes the process wait forever. So tests don't finish.
Right now, RemoteMixtureOfExperts.beam_search can take hundreds of milliseconds if at least some of the DHT peers are slow to respond. This is mainly due to two bottlenecks:
Suggestions:
Move beam search to hivemind.dht.DHT (as discussed with @mryab)
Right now, every beam search call invokes hundreds of requests to a DHT. Each request creates a separate MPFuture (pipe-based) which induces communication overhead. Let's instead implement beam search inside hivemind.dht.DHT and only make one request per search (or even per batch).
Asynchronous beam search (as proposed by @unconst)
A beam search for every batch happens in node's local cache. All experts that are not in cache are considered absent for this search, but there is a background process that constantly adds new experts.
Variant: specify a time budget, search across the DHT for up to this many seconds. On hitting the deadline, reuse what you can from cache.
Better DHT prefixes (by @me)
Right now, DHT prefixes are stored as binary entries. This means that if we have a key structure of [0..999].[0..999].[0..999], each beam search step requires 999 requests to a DHT in the worst case.
To avoid this complexity, we can support an additional DHT data type: unordered set.
A key of this type has multiple values with independent expirations.
With this data type, we can store prefixes as dictionaries {next_index -> expert}, e.g.
DHT["expert.110."] = {
"1.": "expert.110.1.15",
"20.": "expert.110.20.119",
...
"248.": "expert.110.248.119"
}
So, we no longer need first_k_active: we just retrieve the prefix and it automatically tells us all the active suffixes.
Variant: during traverse_dht, collect all matching dictionaries and combine their keys.
Negative caching (by Dmitry Afanasiev)
Every time a DHT key is not found, add it to a negative cache for T seconds. During that time, all subsequent GET requests for that key return None without searching. If a key is released from the negative cache and is not found again, add it to the negative cache for 2 * T seconds. When a key is found (or stored on this node), reset the negative cache timer to 0.
Variant: when a key is released from negative cache, search for it in background without stalling the beam search procedure?
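A sketch of such a negative cache with exponential backoff (standalone, illustrative names):

```python
import time


class NegativeCache:
    """Remembers keys that were recently not found and suppresses repeated lookups with exponential backoff."""

    def __init__(self, base_ttl: float = 5.0):
        self.base_ttl = base_ttl
        self.entries = {}  # key -> (expires_at, current_ttl)

    def is_blacklisted(self, key) -> bool:
        entry = self.entries.get(key)
        return entry is not None and time.monotonic() < entry[0]

    def report_missing(self, key):
        _, prev_ttl = self.entries.get(key, (0.0, self.base_ttl / 2))
        ttl = prev_ttl * 2  # double the timeout every time the key is missing again
        self.entries[key] = (time.monotonic() + ttl, ttl)

    def report_found(self, key):
        self.entries.pop(key, None)  # reset the negative cache once the key appears
```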
Right now, if a RemoteExpert sends incorrect data to a TesseractServer, the task will not return anything, making it hard to debug. The same problem occurs when runtime fails for any other reason (e.g. OOM).
We can do better than this:
in TaskPool.form_batch, weed out malformed tasks BEFORE forming a batch

Right now, if a peer leaves the network permanently, his experts are gone with him. Instead, it should be possible to save the best experts by using p2p storage similar to BitTorrent.
There are several ways to implement such storage and we should first figure out which one will best suit our needs:
If you're willing to contribute to such system, please leave a comment below.
In preliminary tests we observed
It would be great to enable quantization as a built-in feature for ExpertBackend, e.g.
expert = ExpertBackend(..., outputs_schema=[BatchTensorDescriptor(..., compression="int8_normalize")])
Here's AllReduceProtocol's inner logic: it implements Butterfly All-Reduce.
Note the part_sizes: these are the fractions of the averaged vector assigned to each respective peer.
Request: support averaging with client-mode peers:
A client-mode peer always has part_sizes[i] == 0 (see the sketch after this list).
Such peers can call aggregate_part on other peers, but no one should call aggregate_part on a client-mode peer.
Implement a basic test case similar to this one, but with 1-2 client-only peers.
Update AllreduceRunner to work with the updated AllreduceProtocol.
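A sketch of how part sizes could be assigned so that client-mode peers receive empty parts (the helper and its arguments are illustrative, not the existing AllReduceProtocol code):

```python
from typing import List, Sequence


def compute_part_sizes(total_size: int, throughputs: Sequence[float], is_client: Sequence[bool]) -> List[int]:
    """Split a flat vector of total_size elements proportionally to throughput; client-mode peers get 0."""
    weights = [0.0 if client else max(throughput, 0.0) for throughput, client in zip(throughputs, is_client)]
    denominator = sum(weights) or 1.0
    part_sizes = [int(total_size * weight / denominator) for weight in weights]
    remainder = total_size - sum(part_sizes)
    if remainder:  # give rounding leftovers to the first full (non-client) peer
        part_sizes[is_client.index(False)] += remainder
    return part_sizes


# example: the third peer runs in client mode and aggregates nothing
sizes = compute_part_sizes(total_size=10, throughputs=[1.0, 1.0, 1.0], is_client=[False, False, True])
assert sizes == [5, 5, 0]
```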
Currently, for testing purposes, we create hivemind.Server in a background thread for each test. It seems that CUDA initialization happens somewhere in this thread, so we can't have two tests using server communication in a single file (otherwise we get a CUDA initialization error).
A better testing strategy would be to start the server in a separate process, because that is the intended usage of a server and the entire backend logic is built around process (or network) communication. Moreover, given the previous paragraph, it should enable us to run tests on GPU.
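A sketch of such a process-based test fixture (the Server.create arguments are assumptions):

```python
import multiprocessing as mp

import pytest


def _run_server(ready, shutdown):
    import hivemind  # import inside the child process so CUDA is initialized only there
    server = hivemind.Server.create(num_experts=2, expert_cls='ffn', hidden_dim=64, start=True)
    ready.set()
    shutdown.wait()
    server.shutdown()


@pytest.fixture
def server_process():
    ready, shutdown = mp.Event(), mp.Event()
    proc = mp.Process(target=_run_server, args=(ready, shutdown), daemon=True)
    proc.start()
    assert ready.wait(timeout=60), "server did not start in time"
    yield
    shutdown.set()
    proc.join(timeout=30)
```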
figure out a way to install hivemind in Google Colab
Test connectivity: