learning-at-home / hivemind

Decentralized deep learning in PyTorch. Built to train models on thousands of volunteers across the world.

License: MIT License

Python 99.85% Dockerfile 0.15%
asynchronous-programming asyncio deep-learning dht distributed-systems distributed-training hivemind machine-learning mixture-of-experts neural-networks pytorch volunteer-computing

hivemind's People

Contributors

artek0chumak, borzunov, cirquit, dbaranchuk, dependabot[bot], dpirad007, dvmazur, eltociear, foksly, greenfatguy, ial32, ikmckenz, justheuristic, leshanbog, maximksh, mponty, mryab, muxaujl11110, nevec, ploshkin, rapixar, restyled-commits, romakail, srogatch, uartman, vahe1994, vsevolod-pl, xtinkt, yhn112

hivemind's Issues

Basic tutorial

(after we make a standardized way to run servers)

It would be great to add a basic tutorial instead of this stub. Ideally, training a simple DMoE-based model hosted on one or a few devices.

Write a quick-start guide that covers:

  • Installation
  • Toy example
    • single trainer + local server + MNIST
  • Setting up distributed training
  • Writing a custom expert
  • your suggestions :)

DecentralizedAverager per-tensor compression

Currently, DecentralizedAverager supports a single compression option for all tensors.
This is problematic if we want to go beyond float16 compression.

  • allow setting different compression schemes for different tensors
  • implement some general "best practice" compression scheme that adapts to tensor dimensions
  • support tensor compression in load_state_from_peers
  • [optional] natively support lower precision in DecentralizedAverager
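
As a sketch of how a per-tensor configuration and a dimension-based "best practice" heuristic could look (the argument and scheme names below are assumptions for illustration, not the current DecentralizedAverager API):

import torch

# Hypothetical per-tensor configuration: one compression name per averaged tensor.
# The actual argument/enum names in hivemind may differ - this only illustrates the proposal.
tensors = [torch.zeros(1024, 1024), torch.zeros(1024)]  # e.g. a weight matrix and a bias vector

def pick_compression(tensor: torch.Tensor) -> str:
    """A simple 'best practice' heuristic: compress big matrices, keep small vectors exact."""
    if tensor.numel() >= 2 ** 16 and tensor.ndim >= 2:
        return "float16"
    return "none"

per_tensor_compression = [pick_compression(t) for t in tensors]
# averager = DecentralizedAverager(tensors, dht, ..., compression=per_tensor_compression)  # proposed, not implemented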

hivemind.dht nice-to-haves

  • bulk traverse_dht - implement a search algorithm that looks for several queries in parallel, reusing search metadata ( #53 )
  • channel reuse - DHTProtocol creates a new channel on every run. This is inefficient for consecutive requests (e.g. find -> store)
  • congestion - #51 broke max_concurrent rpc limit, @justheuristic has sworn to fix that ( #55 )
  • proactive caching - add an option for DHTNode to automatically re-get k most recently used keys that approach expiration
  • use uvloop ( #55 )
  • efficient routing table structure for get_nearest_neighbors ( #60 )
  • update benchmark_dht
    • tune channel params on the benchmark
    • tune bucket size and caching

Extras:

  • add fault tolerance test
  • notify peers on shutdown

Basic Docs

  • update README.md
  • move documentation to readthedocs.org
  • issue templates
  • add tutorial

Implement / revive convergence test

It would be very useful to automatically test that a new feature hasn't broken model training.

Please implement a test that trains a simple model on a synthetic dataset or MNIST (preferred) and checks that it converges after a certain number of steps (i.e. the loss drops below some threshold).

Back in the day, Maxim implemented a basic MNIST training example that could come in handy.
You can find it here: https://gist.github.com/justheuristic/01d5ffe9c534d90e40badff35653ba7d

Recommendations:

  • Test should be easy to run, preferably a single command like this test;
  • Make sure tests can be run without GPU so that we can run them automatically for pull-requests;
  • When applicable, use test_utils module;

Assorted feature requests

This issue contains a list of features that are nice to have but not required immediately. Feel free to add stuff, edit stuff or convert stuff into working code :)
If you're interested, please leave a comment first.

Runtime: ReplicaExpertBackend

Update: this idea was generalized in https://openreview.net/forum?id=U1edbV4kNu_ , we will merge the extended version.

Right now every expert is served by at most one server. If we have 100500 GPUs but we don't want that many experts, we can implement asynchronous data parallelism within each expert.

Design idea: if an expert is already present in the network as an ExpertBackend, let others replicate it via some kind of ReplicaExpertBackend. Replica periodically requests expert's parameters and sends accumulated gradients with some staleness information. In turn, master ExpertBackend incorporates these gradients into training. This will improve performance if we load updates and send accumulated gradients infrequently enough.
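
A rough sketch of the replica loop described above; every class and method name here (ReplicaExpertBackend, incorporate_gradients, get_parameters) is hypothetical and only illustrates the proposed flow:

import time
import torch

class ReplicaExpertBackend:
    """Hypothetical replica that mirrors a master ExpertBackend (all names are illustrative)."""

    def __init__(self, expert: torch.nn.Module, sync_interval: float = 60.0):
        self.expert = expert
        self.sync_interval = sync_interval
        self.grad_accumulators = [torch.zeros_like(p) for p in expert.parameters()]
        self.last_sync = time.monotonic()
        self.steps_since_sync = 0  # staleness counter sent along with the gradients

    def accumulate(self):
        # called after each local backward pass
        for acc, param in zip(self.grad_accumulators, self.expert.parameters()):
            if param.grad is not None:
                acc.add_(param.grad)
        self.steps_since_sync += 1

    def maybe_sync(self, master):
        # infrequent sync: push accumulated grads with staleness info, pull fresh parameters
        if time.monotonic() - self.last_sync < self.sync_interval:
            return
        master.incorporate_gradients(self.grad_accumulators, staleness=self.steps_since_sync)
        with torch.no_grad():
            for param, fresh in zip(self.expert.parameters(), master.get_parameters()):
                param.copy_(fresh)
        for acc in self.grad_accumulators:
            acc.zero_()
        self.steps_since_sync, self.last_sync = 0, time.monotonic()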

Server: Dynamic expert loading

Update: this idea was generalized in https://openreview.net/forum?id=U1edbV4kNu_ , we will merge the extended version.

Currently, all hivemind servers start with a fixed set of experts and maintain them until the server is shut down. Ideally, we could dynamically load the most promising experts from the network and host them on the current server.

Can we pose this as an optimization procedure performed locally by every server? For instance, let's say that each gating function call "rewards" the chosen experts proportionally to their weight in the mixture for that call's input - but only if the expert returned something.

In that case, we could design HivemindServer to optimize its total reward. For instance, a server can periodically replace low-reward experts with the ones that have the highest expected reward per {second? computation? depending on server status}. If replication is implemented, a server can choose between hosting a new expert or sharing 1/N-th of a more popular expert.

Perhaps we can even re-think HivemindServer's behavior as optimization of the expected loss function?
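
A toy sketch of the reward bookkeeping this could use; the reward-per-second criterion and all names here are assumptions, not an existing hivemind interface:

import time
from collections import defaultdict

class ExpertRewardTracker:
    """Tracks per-expert reward, e.g. the sum of gating weights for successfully served calls."""

    def __init__(self):
        self.rewards = defaultdict(float)
        self.started = time.monotonic()

    def record_call(self, expert_name: str, gating_weight: float, succeeded: bool):
        # reward only if the expert actually returned something, proportionally to its mixture weight
        if succeeded:
            self.rewards[expert_name] += gating_weight

    def reward_per_second(self, expert_name: str) -> float:
        elapsed = max(time.monotonic() - self.started, 1e-9)
        return self.rewards[expert_name] / elapsed

    def lowest_reward_experts(self, k: int):
        # candidates for replacement with higher-expected-reward experts from the network
        return sorted(self.rewards, key=self.reward_per_second)[:k]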

Server: Dashboard

Update: @mryab and @borzunov significantly improved the way we do CLI logs. It seems that implementing a visual dashboard is not a priority now.
Debugging decentralized systems is easier if you can view the history of what happened.

We could use a user-friendly way of logging server load, latency, the number of errors, etc.
Currently, no such thing exists. Ideally, we would use something like TensorBoard/Grafana as a monitoring tool.

More responsive communication between RemoteExpert and HivemindServer

Suggested by Dmitriy Afanasiev
Update: will attempt in sub-projects, summer 2022

Right now, HivemindServer accepts requests and goes dark until they are processed. Instead, we could instantly communicate a status response, e.g. "received request, I will answer in approx. T seconds". Using this, a RemoteExpert could choose to wait for the response or cancel it right away (or choose a different replica).
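
A minimal, self-contained sketch of that two-phase exchange; the message fields, ETA estimate and handler names are assumptions, not the actual RPC protocol:

import asyncio, time

def process_request(request):
    time.sleep(0.1)  # stand-in for the actual expert computation
    return request * 2

async def rpc_forward_with_eta(request, queued_batches: int):
    """Hypothetical server-side handler: first yield an ETA, then the real result."""
    eta = queued_batches * 0.05  # made-up estimate: 50 ms per queued batch
    yield {"status": "accepted", "eta": eta}  # instant acknowledgement ("I will answer in ~T seconds")
    result = await asyncio.get_running_loop().run_in_executor(None, process_request, request)
    yield {"status": "done", "outputs": result}

async def client_call(request, queued_batches: int, timeout_budget: float):
    """Hypothetical client: read the ETA first, then decide to wait or switch replicas."""
    stream = rpc_forward_with_eta(request, queued_batches)
    ack = await stream.__anext__()
    if ack["eta"] > timeout_budget:
        return None  # give up early and pick a different replica
    return (await stream.__anext__())["outputs"]

print(asyncio.run(client_call(21, queued_batches=2, timeout_budget=1.0)))  # 42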

Richer DHT.get with subkeys

Update: implemented a while ago.
Right now, calling DHT.get for a value with subkeys has two potential outcomes:

  • latest=True -> fetch all subkeys with latest expiration time, take your time
  • latest=False -> gimme something - maybe just one subkey, maybe someone else's cached subkey

It would be convenient to also have an option to

  • required_subkeys - do not finish search until you find all of these subkeys
  • min_subkeys - do not finish until you find a certain number of subkeys with sufficient expiration time
  • while we're at it, DHTNode cache refresh should also re-request the existing subkeys

hivemind-server unavailable when installing from PyPI

Describe the bug
When installing hivemind from PyPI, we can't use the hivemind-server CLI command, as it tries to import from the folder scripts:

$ hivemind-server
Traceback (most recent call last):
  File "/home/mryab/miniconda3/envs/hivemind_env_test/bin/hivemind-server", line 5, in <module>
    from scripts.run_server import main
ModuleNotFoundError: No module named 'scripts'

We should probably change the directory name the way it's done in transformers or fairseq.
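
For reference, the usual setuptools mechanism looks like the sketch below: the CLI module lives inside the installed package and the console script points at it. The package path and layout here are assumptions for illustration, not necessarily the fix that was eventually merged.

# setup.py (sketch): ship run_server.py inside the installed package, e.g. hivemind/hivemind_cli/,
# so the entry point resolves after `pip install hivemind` (module path here is illustrative).
from setuptools import setup, find_packages

setup(
    name="hivemind",
    packages=find_packages(),
    entry_points={
        "console_scripts": ["hivemind-server = hivemind.hivemind_cli.run_server:main"],
    },
)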

To Reproduce
Install hivemind from PyPI and run hivemind-server

Environment

  • Python 3.8.8
  • hivemind 0.9.0

Enforce max_batch_size

Currently, form_batch has a nasty problem: if the batch is nearly full and the next task has multiple examples, the pool adds this task to the batch even if the batch size then exceeds max_batch_size.

Can we change this to make sure that

  • if adding another task to the batch would exceed max_batch_size, we leave that task for the next batch
  • if a task exceeds max_batch_size by itself, we consider this task failed
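
A sketch of the proposed batching rule, assuming each task carries the number of examples it contains (the task structure and field names are made up for illustration):

from collections import deque

def form_batch(pending_tasks: deque, max_batch_size: int) -> list:
    """Greedy batching that never exceeds max_batch_size (sketch; the task structure is illustrative)."""
    batch, batch_size = [], 0
    while pending_tasks:
        task_size = pending_tasks[0]["num_examples"]      # assumed field: examples in the next task
        if task_size > max_batch_size:
            failed = pending_tasks.popleft()              # can never fit: consider this task failed
            failed["error"] = f"{task_size} examples exceed max_batch_size={max_batch_size}"
            continue
        if batch_size + task_size > max_batch_size:
            break                                         # leave this task for the next batch
        batch.append(pending_tasks.popleft())
        batch_size += task_size
    return batch

tasks = deque([{"num_examples": 3}, {"num_examples": 5}, {"num_examples": 2}])
print(len(form_batch(tasks, max_batch_size=6)))  # 1: the 5-example task waits for the next batch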

Please use manage_technical_debt branch

_RemoteCallMany output devices

Since 0.8.15, RemoteExpert and RemoteMixtureOfExperts will automatically convert output tensors to the same device as inputs. This works fine in most cases, but one can imagine a niche use case where some of the tensors should stay on CPU.

Perhaps we should switch to respecting devices from RemoteExpert.info['output_schema']

Since we use one accelerator per hivemind.Server, we can support soft typing as follows (see the sketch after this list):

  • if output_schema[i].device == cpu, always use cpu
  • if output_schema[i].device != cpu, allocate data on nearest device: prefer same index as input, else torch default device
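
A sketch of that device-resolution rule as a hypothetical helper (not existing hivemind code):

import torch

def resolve_output_device(schema_device: torch.device, input_device: torch.device) -> torch.device:
    """Pick the device for an output tensor from its schema entry and the inputs' device (sketch)."""
    if schema_device.type == "cpu":
        return torch.device("cpu")                      # schema says CPU: always keep it on CPU
    if input_device.type == schema_device.type:
        return input_device                             # prefer the same accelerator index as the inputs
    if torch.cuda.is_available():
        return torch.device("cuda", torch.cuda.current_device())  # else fall back to the default device
    return torch.device("cpu")

print(resolve_output_device(torch.device("cpu"), torch.device("cuda", 1)))   # cpu
print(resolve_output_device(torch.device("cuda"), torch.device("cuda", 1)))  # cuda:1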

pre-v0.8 refactoring

A list of small issues to be solved in bulk before we bump the version

Clean up utils:

  • Remove Connection after we switch to grpc-based connections ( #45 )
  • Do we really need this many serializers? (or serializers.py at all)
  • relative vs absolute imports: discuss
  • SharedFuture:
    • decided to update and keep it
    • rename to MPFuture?
  • utils.data - maybe rename or merge into utils/__init__.py?
  • Check if concurrent autograd is still necessary in pytorch 1.5 (maybe bump version)
    • no, but we only need it for _RemoteModuleCall.backward
      • Lets implement custom code in moe.py and ditch autograd.py!
  • (delayed) remove find_open_port in favor of builtin port picker or reserve_port (reason: find_open_port fails ~1-3% of the time)

Discussion: gRPC proto files

  • switch from the compile_grpc function to pre-compiling in setup.py? (if so: add pb2 files to .gitignore)
  • use a dedicated folder to store .proto files

Etc:

  • maybe merge hivemind.runtime back into hivemind.server?
  • move all "Thanks to X" notes to acknowledgements section in the readme
  • update PyPI package (0.8.0)
  • create a test case for python -m test_utils.run_server (rationale: we broke it on more than one occasion)
  • msgpackserializer - use strict type checking
  • allow setting max_send/receive_message_length when creating a server
  • conn_handler_processes => num_connection_handlers to avoid confusion?
  • which branches do we still need?

Discussion: use QUIC?

On the surface, QUIC is praised for two features that we may need:

  • establishing connections faster
  • simpler NAT traversal (quic uses UDP)

That said, there can be caveats in switching to quic, and we would appreciate opinions from those who have experience using it on a large scale. Feel free to comment below.

If everything goes well, it would be great to implement a QUIC-based prototype for hivemind.DHT and/or hivemind.Server and benchmark them against current gRPC-based implementation.

There are several ways to use QUIC in python:

Specialized DHT

Based on discussion with @UARTman on 09.02.2020 - we can greatly improve TesseractNetwork by specializing Kademlia DHT to our needs. As of now, there are two issues with the default implementation:

Operating under NAT
At the time of writing, a RemoteExpert client always connects to a runtime via an IP and a port. This is impractical since participants may have no open ports. We could instead implement direct p2p connections via dht.

Faster lookup
The majority of keys stored in our DHT are expert prefixes used by the gating function. These prefixes are not meant to live indefinitely: if no runtime sends prefix information in $t$ seconds, we can safely delete it.

However, Kademlia assumes that if you store a key, this key must still be there after some time. This causes many inefficiencies, such as wasting memory on stale keys or "refreshing" keys every 1 or 24 hours.
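
A minimal sketch of expiration-aware storage for such prefix keys (a toy structure for illustration, not the actual DHT code):

import heapq, time

class ExpiringStorage:
    """Toy key-value store where every entry expires after a deadline, like DHT expert prefixes."""

    def __init__(self):
        self.data = {}         # key -> (value, expiration_time)
        self.expirations = []  # min-heap of (expiration_time, key) for cheap cleanup

    def store(self, key, value, expiration_time: float):
        self.data[key] = (value, expiration_time)
        heapq.heappush(self.expirations, (expiration_time, key))

    def get(self, key):
        self._evict_stale()
        entry = self.data.get(key)
        return entry[0] if entry is not None else None

    def _evict_stale(self):
        # drop keys whose runtime stopped re-announcing them: no refresh loops, no stale memory
        now = time.monotonic()
        while self.expirations and self.expirations[0][0] <= now:
            expiration, key = heapq.heappop(self.expirations)
            if key in self.data and self.data[key][1] <= now:
                del self.data[key]

storage = ExpiringStorage()
storage.store("expert.110.", "alive", expiration_time=time.monotonic() + 30)
print(storage.get("expert.110."))  # "alive" until the runtime stops refreshing it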

Wrap P2P stream_open

[based on @MaximKsh 's ongoing work on #149 ]

Implement functionality in hivemind.P2P for communicating with other P2P instances.

Ideally, we would like to support the following functionality:

### process 1 ###
peer1 = hivemind.P2P()
# runs p2pd in background, instantiates p2pclient, starts listening for incoming connections

async def my_function(x):
   return x ** 2
await peer1.add_handler('foo', my_function)

print(peer1.id) # prints long hash-like address


### process 2 ###
peer2 = hivemind.P2P()
x = 5
res = await peer2.call(PEER1_ID_HERE, 'foo', x)

assert res == 25

Functionality-wise,

  • start p2pd with recommended parameters
    • add explicit parameters to P2P.__init__, add docstring for what is relevant for hivemind
    • support two recommended settings: full node (white IP) and client-only (behind NAT or firewall)
    • allow bootstrapping with initial_peers or IPFS peers (default)
  • Instantiate p2pclient inside p2pd
    • install p2pclient as a part of requirements
    • check for successful start using client.identify() instead of sleep(0.2)
    • store peer ID in hivemind.P2P.id
  • allow user-defined protocol handlers
    • [async] def add_handler(peer, protocol, *args, **kwargs)
    • async def P2P.call(peer, protocol, *args, **kwargs)
    • make sure that call works correctly with large messages (e.g. 100mb)
    • make sure that call can be cancelled in asyncio (and the recipient is properly notified)
    • add test for call between two P2P instances (normal call, unknown peer/protocol, cancelled call, custom initial_peers)

p2pd test nat traversal

Create a simple setup with 3 nodes where

  • node S1 starts first; it is available publicly; it is a full DHT node
  • nodes A and B are bootstrapped from S1
  • all nodes use QUIC with secio tls/noise

TODO:

  • check that nodes A and B can communicate directly if they are not behind NAT (localhost-only)
  • check that nodes A and B can communicate if they are behind NAT (ping me if you need access to a VM for S1)

v0.9 refactoring concerns

As before ( #64 ), we have a number of things we may want to do right before the v0.9 release

  • RemoteMixtureOfExperts uses grid_size while server side uses expert_pattern (e.g. ffn.[0:256].[0:256]), should we switch to expert_pattern everywhere? (@justheuristic )

  • Should we use slots for data structures such as _IntermediateResult

  • Should we switch to declaring objects bottom-up vs top-down? For instance, _IntermediateResult is used before it's declared (@mryab )

  • theguyshetoldyounottoworryabout - rename it? (@mryab )

  • Can we get rid of return_futures=True option in node.get_*

  • rename _IntermediateResult to SearchState, rationale: it is not necessarily a result, it also stores search data other than the result (@justheuristic )

    • await SearchState.future -> await SearchState?
    • SearchState.future.cancel() -> SearchState.finish_search()?
  • rename LocalStorage to TimedStorage (or similar)? reason: it is not always used as node's local storage (@justheuristic )

  • make alias DHTID.from(x) = DHTID.generate(source=x)? (@justheuristic )

  • why are we using umsgpack instead of just msgpack? (@justheuristic )

  • we force the same vector compression rules during forward & backward pass #99 (@Vsevolod-pl )

  • update documentation on hivemind.dht: subkeys, beam_search, no first_k_active

  • update documentation on hivemind.server: compression

  • consider setting maxsize to DHTValueType, e.g. 10k

  • similar: consider forcing maxsize on any DHT record

  • rename hivemind.utils.grpc -> hivemind.utils.protobuf ?

  • tests still raise warning about non-copyable ndarray in (de)serialize_torch_tensor

  • remove receiver_threads in DHT and DecentralizedAverager, hard-code 1 thread (@justheuristic )

  • update grpc version, replace grpc.experimental.aio with grpc.aio ( grpc/grpc#23240 )

  • in hivemind.dht.DHT._get(endpoint) we're currently not caching channels at all. Should we?

  • Investigate segfault in our circleci env (@mryab @justheuristic )

  • perhaps we should move all heavy class definitions out of __init__.py modules (@mryab )

  • Allow different tensor compression schemes for backward pass? #99

  • rewrite test_dht_protocol, test_empty_table and test_dht_node to use await instead of loop.run_until_complete

  • what will the server do if it is given an input with incorrect requires_grad? (not as in schema)

    • it will actually ignore requires_grad completely and override it in expert_backend.backward.
    • Should we respect schema's requires_grad in expert_backend.backward()?
  • proper channel caching: we could implement a centralized process-local channel cache

  • since we're using gRPC for all requests, we may be able to share one port for everything

  • should we replace prints in tests/benchmarks with logger.info? Do we need as many prints in dht tests? (@mryab , see #116 comments )

  • currently test_utils is a package containing a single function used on just one occasion, should we just move it there?

  • should we make our daemons resistant to KeyboardInterrupt when they are running in background?

    • problem symptom: you're running in jupyter, you press ctrl+C and it kills all dht daemons
    • applies to: DHT, DecentralizedAverager, Server
  • naming collision: hivemind.utils.grpc actually points to the external package, not hivemind/utils/grpc.py. This is due to wildcard imports (from hivemind.utils.grpc import *). Should we switch away from them?

  • DHT.get_my_endpoint() implemented by pinging k random peers

  • currently we have no limit on gRPC message size in ConnectionHandler , this is a potential vulnerability for open infrastructure. Should we impose an upper limit and send large tensors in chunks?

  • TimedStorage: add .pop() method

  • DHT/DecentralizedAverager - rename return_future to something intuitive, e.g. sync=False

  • MPFuture: make sure await asyncio.create_task(mpfuture) works without extra wrappers

  • In DecentralizedAverager there is a lock_averaged_tensor which is not an asyncio-friendly lock. If we acquire it for long, the averager will be paralyzed! We should run it in an executor or make a special mp+aio lock.

Technical debt: RemoteMixtureOfExperts (v0.8)

  • beam search uses tuple endpoints (i.e. address, port), while dht switched to string endpoints
  • beam search needs one extra step because prefix.123.321 != expert.123.321
  • we may no longer need parallel autograd if it is implemented in pytorch (not the case)
    • remove hivemind.utils.autograd in favor of _RemoteExpertCallMany
  • add a more feature-rich test for moe.py (with several DHT nodes and experts)
  • cancel unused queries in first_k_active?
  • when declaring experts, introduce some kind of "grace period" - only "declare" prefixes that have not been updated for that period. (rationale: first prefixes are likely to be already updated by other peers)

Allow non-deterministic layers in DMoE

Right now, if an ExpertBackend uses dropout or other non-determinism, it will use different random states during the forward and backward passes.

Torch has previously solved this problem via gradient checkpointing - see the source code. Can we implement something similar?

  • find out a way to enforce the same random state on forward and backward pass
  • implement some basic test to verify that backprop through stochastic layers is computed correctly
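
One way to enforce identical randomness on both passes is to snapshot and restore the RNG state, similar in spirit to torch.utils.checkpoint; a CPU-only sketch for brevity (function names and the wiring into ExpertBackend are assumptions):

import torch

def forward_with_saved_rng(expert: torch.nn.Module, x: torch.Tensor):
    """Run forward and remember the RNG state so backward can replay the same dropout masks."""
    rng_state = torch.get_rng_state()  # for CUDA experts, also save torch.cuda.get_rng_state()
    with torch.no_grad():
        output = expert(x)
    return output, rng_state

def recompute_backward(expert: torch.nn.Module, x: torch.Tensor, grad_output: torch.Tensor, rng_state):
    """Recompute forward under the saved RNG state, then backprop through it."""
    with torch.random.fork_rng():      # keep the global RNG untouched outside this block
        torch.set_rng_state(rng_state)
        x = x.detach().requires_grad_(True)
        output = expert(x)             # same dropout masks as in forward_with_saved_rng
        output.backward(grad_output)
    return x.grad

expert = torch.nn.Sequential(torch.nn.Linear(8, 8), torch.nn.Dropout(p=0.5))
x = torch.randn(2, 8)
out, state = forward_with_saved_rng(expert, x)
grad_in = recompute_backward(expert, x, torch.ones_like(out), state)
print(grad_in.shape)  # torch.Size([2, 8])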

Please use manage_technical_debt branch

Run tensor (de)serialization in executor

(as reported by @mryab )
Currently, both hivemind.server.connection_handler and hivemind.client.averaging.DecentralizedAverager run serialize_torch_tensor and deserialize_torch_tensor directly in the asyncio event loop.

This may increase latency, especially when using tensor compression. We should try running it in an executor and see if it affects performance in any way.
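
A sketch of how the serialization call could be moved off the event loop with a thread pool executor; the serializer below is a stand-in, not hivemind's actual implementation:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import torch

def serialize_torch_tensor(tensor: torch.Tensor) -> bytes:
    """Stand-in for hivemind's serializer - the real one also applies compression."""
    return tensor.numpy().tobytes()

executor = ThreadPoolExecutor(max_workers=2)

async def serialize_off_loop(tensor: torch.Tensor) -> bytes:
    # heavy (de)serialization runs in a worker thread, so the asyncio loop keeps serving other RPCs
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, serialize_torch_tensor, tensor)

print(len(asyncio.run(serialize_off_loop(torch.randn(1024)))))  # 4096 bytes for float32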

Better support for custom experts in Server

As of right now, hivemind.Server supports arbitrary expert classes with fixed input sizes, but it is impossible to deploy custom experts with the hivemind-server script. As a result, users who need custom experts must launch hivemind.Server manually from Python.

We must refactor the server creator to support custom experts:

  • Implement a register_expert decorator in hivemind/layers/register.py that registers a given expert
@hivemind.register_expert('my_expert_name', sample_inputs=lambda hid_dim: torch.randn(3, 128, hid_dim))
class MyCustomExpert(torch.nn.Module):
   ...
  • make sure that the decorator returns an unmodified expert module
  • add an assertion that forbids registering multiple experts with the same name
  • re-write hivemind/server/layers to use register_expert instead of manually adding experts to global dictionaries
  • implement get_expert method in hivemind/layers/register.py
  • in hivemind-server script, add an option to import a custom module (that may or may not register experts inside)
  • write a unit test in hivemind/tests/test_moe.py that creates a server with a custom expert, then runs forward and backward for this expert
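
A sketch of what the decorator and registry could look like; the signature follows this issue's proposal, while the registry internals are assumptions:

import torch

_EXPERT_REGISTRY = {}  # name -> (expert_class, sample_inputs)

def register_expert(name: str, sample_inputs):
    """Register an expert class under a unique name; returns the class unmodified."""
    def decorator(expert_cls):
        assert name not in _EXPERT_REGISTRY, f"expert {name!r} is already registered"
        _EXPERT_REGISTRY[name] = (expert_cls, sample_inputs)
        return expert_cls  # the decorated module is returned as-is
    return decorator

def get_expert(name: str):
    return _EXPERT_REGISTRY[name]

@register_expert("my_expert_name", sample_inputs=lambda hid_dim: torch.randn(3, 128, hid_dim))
class MyCustomExpert(torch.nn.Module):
    def __init__(self, hid_dim: int = 64):
        super().__init__()
        self.ffn = torch.nn.Linear(hid_dim, hid_dim)

    def forward(self, x):
        return torch.relu(self.ffn(x))

expert_cls, sample_inputs = get_expert("my_expert_name")
print(expert_cls(64)(sample_inputs(64)).shape)  # torch.Size([3, 128, 64])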

Quantile compression [benchmark performance]

Implement quantile compression:

  • given a tensor and a set of 255 quantiles ([0, 1] at uniform intervals), pack the tensor into an int8 buffer
  • unpack back into float32/float16 on decoding
  • benchmark performance (compression + decompression with 1M / 100M parameters)
  • Use @mponty 's implementation
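
A compact reference sketch of the quantile scheme described above, useful as a baseline for the benchmark; this is not @mponty's implementation:

import torch

def quantile_encode(tensor: torch.Tensor, n_bins: int = 255):
    """Map each value to the index of its quantile bin; keep the quantile table for decoding."""
    flat = tensor.flatten().float()
    quantiles = torch.quantile(flat, torch.linspace(0.0, 1.0, n_bins))  # 255 uniform quantiles of the data
    codes = torch.bucketize(flat, quantiles).clamp_(0, n_bins - 1).to(torch.uint8)
    return codes, quantiles  # int8-sized payload + a small float lookup table

def quantile_decode(codes: torch.Tensor, quantiles: torch.Tensor, shape, dtype=torch.float32):
    return quantiles[codes.long()].to(dtype).reshape(shape)

x = torch.randn(1_000_000)
codes, table = quantile_encode(x)
x_hat = quantile_decode(codes, table, x.shape)
print((x - x_hat).abs().mean())  # small reconstruction error at ~4x compression vs float32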

Gating function averaging

In our preliminary experiments, all peers have independent gating functions and we can only synchronize them manually. It would be great to implement some sort of builtin averaging mechanism.

For instance, every T seconds, assemble peers into groups at random, then perform all-reduce within each group. In case of failure, roll back and retry T seconds later.

python 3.7 support

Can we make tesseract work in Python 3.7? We used to require py3.8 features like multiprocessing.shared_memory, but they are gone now.

If we can run on py3.7 after minor modifications, please apply those modifications and update the compatibility list in setup.py.

Discussion: p2p security

Right now, hivemind works over the default TCP protocol with no security. If we are to run at a global scale, we need to:

1. make sure peers do not risk their personal data by running hivemind nodes
This is the boring, but necessary one: we need to make sure we use up-to-date security protocols in both hivemind.dht and hivemind.client/server interaction

2. figure out how to make hivemind model resistant to malicious peers
This part is more tricky, there are several attack vectors:
2a. send NaN or wrong gradients to the expert in order to jeopardize its parameters
2b. overwrite existing keys in DHT with wrong information to prevent other peers from finding experts

There are intuitive ways to resist both of these vectors by e.g. dropping outlier gradients in 2a, but security usually works better with expertise, not intuition.

We would appreciate any advice on potential attack vectors and ways to mitigate them.

optimize load_state_from_peers

problem: if many peers join at once, they will all pick one averager (the latest at the time) as a target for loading the initial state. This causes choke points, as one averager struggles to serve many newcomers.

possible solution:

  • modify the load-state RPC so that each averager only serves T clients at a time; all subsequent clients get enqueued and wait (see the sketch after this list)
  • modify DecentralizedAverager.load_state_from_peers to dynamically switch away to alternative donor averagers after the first one sends you away
  • [optional] also negotiate for bandwidth: donors reveal their free bandwidth and clients can pick the best host
  • remove split_for_streaming/combine_from_streaming
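
A sketch of the server-side throttle using an asyncio semaphore; the handler name, streaming shape and the value of T are placeholders:

import asyncio

class ThrottledStateProvider:
    """Hypothetical server-side throttle: serve at most max_clients state downloads at a time."""

    def __init__(self, state_chunks, max_clients: int = 4):   # max_clients is the "T" from above
        self.state_chunks = state_chunks
        self.slots = asyncio.Semaphore(max_clients)

    async def rpc_download_state(self):
        async with self.slots:            # extra newcomers queue here instead of choking the averager
            for chunk in self.state_chunks:
                yield chunk               # stream the state chunk by chunk

async def demo():
    provider = ThrottledStateProvider([b"chunk0", b"chunk1"], max_clients=2)
    return [chunk async for chunk in provider.rpc_download_state()]

print(asyncio.run(demo()))  # [b'chunk0', b'chunk1']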

Add test coverage, run tests on commit

Currently, we test commits manually via tests/benchmark_throughput.py. It would be way better to implement automatic tests that cover the current implementation.

At least, we should cover:

  • set up automatic tests on commit (thx UARTMan)
  • Creating server and running a request via RemoteExpert
  • Training a small network on a synthetic dataset - test that it converges (on CPU)
  • Gating function: run DMoE gating function with several servers
  • Network: figure out a way to test network fault-tolerance (after we refactor it)
  • Massive utilities test (after #54 )
  • Measure code coverage via CodeCov
  • version matrix: py3.{7,8,9}

Roadmap

This is a global project roadmap that states our priorities for the near future. These priorities can and should be disputed here or elsewhere, after which we will update the roadmap.

v0.7 "It runs something" (released)

  • convert internal hivemind code to an open-source library
  • switch from bmuller/kademlia to an internal DHT implementation
  • run proof-of-concept experiments

v0.8 "It runs at scale" (released)

  • Ensure DHT scales to 1000+ nodes
    • Switch from rpcudp to gRPC (due to scalability issues)
  • Optimize hivemind.Server for a large number of small tensors
  • Implement parallel backward in RemoteMixtureOfExperts
  • Add benchmarks for MoE and DHT performance
  • Publish to PyPI

v0.9 "It trains something" (released)

  • Averaging gating function over peers #95
  • Speed up beam search #92
  • Optional tensor compression / quantization for RemoteExpert #88
  • Refactoring concerns #98
  • Open-source experiments outside our test infrastructure ( e.g. this )

v0.10 "You can train with us" (released)

  • Extended tutorials:
    • step by step tutorial on model training with DecentralizedAverager (#219 )
    • tutorial on defining and training custom experts (postponed)
  • Make it easier to contribute compute to hivemind:
    • NAT traversal for household PCs ( #165 )
    • Support running hivemind in google colab / from behind firewalls (#146 #147 )
  • Further optimizations for DecentralizedAverager
    • Elastic scaling of moshpit averaging with the number of active trainers (found workaround for now)
    • Advanced compression strategies to reduce communication throughput ( #170 #195 )
  • Distributed training security
    • audit security issues #93
    • make it difficult for a malicious peer to jeopardize training ( #219 , TBC)
  • Refactoring concerns #98

v1.0 "most of the code makes sense without reading the source" (nov-dec)

  • overhaul optimizers
    • must work decently with default parameters for both examples
    • see optimizer roadmap in #398
  • update quickstart.md
    • use the new optimizer instead of DecentralizedSGD
  • overhaul DHT benchmark
  • add Optimizer benchmark

v1.1 "You can set up collaborative training easily"

Target scenario: 100 volunteers training 2xl-like over the internet

  • add more examples
    • at least one should include a setup guide
  • additional tutorial with computer vision (dino, imagenet, dalle?)
  • Do something about the number of open files
    • investigate what contributes to # open files
    • is there a (cheap) way to reduce that to 4096 or fewer without compromising performance?
  • Support training with only client and aux peers
    • (A) ensure that aux peers can download state from clients or
    • (B) add an option for an aux peer to act as a normal peer with batch size = 0
  • more extreme compression: powerSGD variant(s)
  • investigate QUIC at scale
    • test hole punching
    • make sure our config fully supports relays
  • Remove duplicate CI runs
  • Add warnings to typical failure modes
  • Deprecate CollaborativeOptimizer & co

1.2 Decentralized Model-parallelism

Target scenario: 500 peers training 1B+ over the internet

  • libp2p in hivemind.server
  • proper LoadBalancedExpert
  • hivemind.Optimizer in hivemind.server
  • FP16 in hivemind.server

Important, but not urgent

  • more extreme compression: some way to integrate BNB directly
  • Security: option to use CenteredClip
  • Some means of saving expert snapshot in a fault-tolerant way #94
  • Some means for storing the training data (a-la scientific torrents)
  • enhanced API of hivemind.Optimizer (extract all necessary methods of StateAverager/ProgressTracker)
  • moshpit + elasticity
  • alternative linear programming variants

Make RemoteMixtureOfExperts timeouts more general

As of now, forward/backward_timeout arguments correspond only to timeouts for Server interactions. However, this is not the only possible cause of freezes: for example, beam search might take too long to finish during the forward pass. To fix this, we need to make both timeouts more general (that is, they should measure the execution time of the entire function)

Technical Debt

tesseract.client

  • MoE does not support failures during backward
  • RemoteExpert doesn't check if inputs are valid (should check via info)

tesseract.runtime

  • non-deterministic layers use different random during forward and backward passes
  • layers with batchnorm update stats twice. Update during backward only? upd: we now assume that forward is a pure deterministic function; moved discussion to a separate thread
  • form_batch: when batch is nearly full and the next task has multiple examples, pool adds this task to a batch even if batch size exceeds max_batch_size
  • we probably can shutdown TesseractRuntime without using a dedicated mp.Pipe, see also: eventfd

tesseract.utils

  • SharedFuture: cancel, set_running_or_notify_cancel

tesseract.tests

  • sometimes interrupting tests/benchmark_throughput.py leaves process hanging (only responds to sigterm/sigkill), find & eliminate the cause

Server configuration scripts

(@mryab 's suggestions)

Right now we have to create server configs manually in Python. Let's create an option to bootstrap a server from a config file!

  • we made some progress towards this in make_dummy_server, but it needs refactoring out of test_utils
  • we may (or may not) benefit from existing config engines such as https://hydra.cc

Optimize the tests

Right now, our tests can take upwards of 10 minutes both in CircleCI and locally, which slows down the development workflow and leads to unnecessary context switches. We should find a way to reduce the time requirements and make sure it stays that way.

  • Identify and speed up the slow tests. Main culprits: multiple iterations in tests using random samples, large model sizes for DMoE tests.
  • All the tests in our CI pipelines run sequentially, which increases the runtime with the number of tests. It is possible to use pytest-xdist, since the default executor has 2 cores.
  • Add a global timeout to ensure that future tests don't introduce any regressions

Decentralized adaptive optimizers - build a testbed

We need a simple training configuration where we can test the effects of different strategies for decentralized adaptive optimizers

Known strategies

Implement

  • a basic training setup for CIFAR10 or wikitext2 (ping me @friday, we'll figure it out)
  • naive strategy with averaging parameters and optimizers
  • alternative: paper2 from above

Compare convergence with equal bandwidth load

Regular checkpointing by Server

(based on @mryab 's suggestion)

Add an option for hivemind.server.Server to save all experts regularly, probably as a background thread, and hopefully without reducing Runtime throughput.

basic p2pd wrapper

  • create a hivemind/hivemind/p2p package (__init__.py and all)
  • implement class P2P that manages p2pd process
    • start one in background with kwargs
    • kill it on del or process exit
    • one solution is to use a subprocess
  • create a test that makes sure that the p2pd process is killed on del

Create Dockerfile, publish images

It would be really useful if we had a simple Dockerfile containing the build instructions for the library. Then, we could create an image which would be easy to distribute and run for the volunteer participants without setting up the environment themselves.

Sample experts with virtual batching, learning rate schedule, etc

proposed by @mryab from #126

Virtual batching and LR scheduling are popular techniques with many applications. It would be nice to have an example of how to implement them with hivemind.

As of 0.8.15, ExpertBackend supports two ways of doing so:

  • hacky way: by wrapping the optimizer and implementing the LR schedule in opt.step
  • orthodox way: by subclassing ExpertBackend and implementing apply_gradients however we want (see the sketch after this list)
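
A sketch of the "orthodox way": subclass the backend and override apply_gradients to add gradient accumulation (virtual batching) and an LR schedule. The base class below is a simplified stand-in, not hivemind's actual ExpertBackend interface:

import torch

class SimpleExpertBackend:
    """Simplified stand-in for hivemind's ExpertBackend (the real interface differs)."""

    def __init__(self, expert: torch.nn.Module, optimizer: torch.optim.Optimizer):
        self.expert, self.optimizer = expert, optimizer

    def apply_gradients(self):
        self.optimizer.step()
        self.optimizer.zero_grad()

class ScheduledExpertBackend(SimpleExpertBackend):
    """Adds virtual batching (gradient accumulation) and an LR schedule on top of apply_gradients."""

    def __init__(self, expert, optimizer, scheduler, accumulation_steps: int = 4):
        super().__init__(expert, optimizer)
        self.scheduler = scheduler                  # e.g. torch.optim.lr_scheduler.LambdaLR
        self.accumulation_steps = accumulation_steps
        self.backward_calls = 0

    def apply_gradients(self):
        self.backward_calls += 1
        if self.backward_calls % self.accumulation_steps != 0:
            return                                  # keep accumulating: this is the "virtual batch"
        super().apply_gradients()                   # real optimizer step once per virtual batch
        self.scheduler.step()                       # advance the learning-rate schedule

expert = torch.nn.Linear(16, 16)
opt = torch.optim.SGD(expert.parameters(), lr=0.1)
sched = torch.optim.lr_scheduler.LambdaLR(opt, lambda step: 1.0 / (1 + step))
backend = ScheduledExpertBackend(expert, opt, sched, accumulation_steps=4)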

Support communication-efficient optimizers

<edit>
The goal of this project is to implement pre-configured decentralized optimizers such as DADAM and decentralized Local AMSGrad. We intend to create a general interface that runs adaptive optimizers with periodic averaging of gradients, parameters and optimizer statistics on different time frames.

The final version should support:

  • wrapping any adam-like optimizers such as Adam, AMSGrad, LAMB(FusedLAMB), Radam, etc
  • full torch.optim.Optimizer interface including state_dict/load_state_dict
    • for parameter groups, either support it or explicitly throw an error
  • averaging optimizer statistics in background while the model is training, incorporating local updates after the averaging is done
  • have a default configuration that converges :)
    • test convergence locally with a toy task
    • test in standard ALBERT-base setup for wikitext103
    • test with ALBERT-large on OpenBookCorpus

</edit>

Code draft:

# usage:
class LAMBWithAveraging(FusedLAMB, AdaptiveOptimizer):
    def get_averaged_statistics(self) -> Sequence[Tensor]:
        return TODO()

opt = DecentralizedOptimizer(model, base_optimizer=LAMBWithAveraging(model.parameters()),
                             prefix='MY_UNIQUE_STRING')


### code

class AdaptiveOptimizer(abc.ABC):
    @abc.abstractmethod
    def get_averaged_statistics(self):
        raise NotImplementedError()

class DecentralizedOptimizer:
    def __init__(self, parameters, base_optimizer, dht, prefix: str, ...):
        assert isinstance(base_optimizer, AdaptiveOptimizer), "TODO see example here"

        self.parameters = list(parameters)  # on GPU
        self.base_optimizer = base_optimizer
        self.global_step = ...

        zeros_like_grads = [torch.zeros_like(x.detach().cpu()) for x in self.parameters]
        self.grad_averager = DecentralizedAverager(
            averaged_tensors=zeros_like_grads,  # average gradients only
            prefix=prefix + '_grad_averager', **some_kwargs
        )

        parameters = [x.cpu().detach().float() for x in self.parameters]
        averaged_statistics = [x.cpu().detach().float() for x in base_optimizer.get_averaged_statistics()]

        self.almost_v_averager = DecentralizedAverager(
            averaged_tensors=parameters + averaged_statistics,  # average params and second momenta
            prefix=prefix + '_v_averager', **some_other_kwargs
        )
        self.v_averaging_job = None
       
    def get_model_params_and_v(self) -> Tuple[torch.Tensor, ...]:
        TODO()
              
    def step(self):
       accumulate_grads()
       report_progress_to_dht()
       
       if self.v_averaging_job.done():
          local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
          with self.almost_v_averager.get_tensors() as averaged_params_and_v:
             copy_from(averaged_params_and_v).to(local_params_and_v)

       
       if accumulated_enough():
       
            local_grads = [x.grad for x in self.parameters]
           with self.grad_averager.get_tensors() as averaged_grads:
               copy_from(local_grads).to(averaged_grads)
           self.grad_averager.step()
           with self.grad_averager.get_tensors() as averaged_grads:
               copy_from(averaged_grads).to(local_grads)
           # TODO maybe average gradients continuously in background
             
           if not self.use_local_statistics:
               TODO(revert base_optimizer.get_averaged_statistics() to prev averaged state)
           self.base_optimizer.step()
           self.global_step += 1  # MAKE SURE IT IS in sync with collaboration
           
           if not self.use_local_statistics:
               TODO(revert optimizer statistics back to local ones AFTER local update)
           
           
           ### AVERAGE PARAMETERS AND STATISTICS! ###
           if self.global_step % self.K == 0:
                if not self.v_averaging_job.done():
                    if self.await_v_averager:
                        self.v_averaging_job.wait()
                        local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
                        with self.almost_v_averager.get_tensors() as averaged_params_and_v:
                            copy_from(averaged_params_and_v).to(local_params_and_v)
                    else:
                        warning("Param averager is taking too long, TODO explain why we skip")
                        return

                local_params_and_v = self.parameters + self.base_optimizer.get_averaged_statistics()
                with self.almost_v_averager.get_tensors() as averaged_params_and_v:
                    copy_from(local_params_and_v).to(averaged_params_and_v)

                self.v_averaging_job = run_in_background(self.almost_v_averager.step)
           
       
    def zero_grad(self):
       self.base_optimizer.zero_grad()
       zero_grads_in_self_grad_averager()
       self.report_to_dht()

Proper macOS support

When creating a DHT, calling run_in_background makes the process wait forever, so tests don't finish.

Faster RemoteMixtureOfExperts beam_search

Right now, RemoteMixtureOfExperts.beam_search can take hundreds of milliseconds if at least some of the DHT peers are slow to respond. This is mainly due to two bottlenecks:

  • beam search needs a lot of communication between server and local dht node
  • the dht itself waits for all peers to respond

Suggestions:

Move beam search to hivemind.dht.DHT (as discussed with @mryab)
Right now, every beam search call invokes hundreds of requests to a DHT. Each request creates a separate MPFuture (pipe-based) which induces communication overhead. Let's instead implement beam search inside hivemind.dht.DHT and only make one request per search (or even per batch).

Asynchronous beam search (as proposed by @unconst)
A beam search for every batch happens in node's local cache. All experts that are not in cache are considered absent for this search, but there is a background process that constantly adds new experts.

Variant: specify a time budget, search across the DHT for up to this many seconds. On hitting the deadline, reuse what you can from cache.

Better DHT prefixes (by @me)
Right now, DHT prefixes are stored as binary entries. This means that if we have a key structure of [0..999].[0..999].[0..999], each beam search step requires 999 requests to a DHT in the worst case.

To avoid this complexity, we can support an additional DHT data type: unordered set.
A key of this type has multiple values with independent expirations.

  • on STORE(key, subkey, value, expiration), add (subkey, value) to this set until expiration
  • on GET(key), retrieve all entries currently associated with this set

With this data type, we can store prefixes as dictionaries {next_index -> expert}, e.g.

DHT["expert.110."] = {
   "1.": "expert.110.1.15",
   "20.": "expert.110.20.119",
   ...
   "248.": "expert.110.248.119"
}

So, we no longer need first_k_active: we just retrieve the prefix and it automatically tells us all the active suffixes.

Variant: during traverse_dht, collect all matching dictionaries and combine their keys.

Negative caching (by Dmitry Afanasiev)
Every time a DHT key is not found, add it to a negative cache for T seconds. During that time, all subsequent GET requests return None without a search. If a key is released from the negative cache and is not found again, add it back for 2 * T seconds. When a key is found (or stored on this node), reset the negative cache timer to 0.

Variant: when a key is released from negative cache, search for it in background without stalling the beam search procedure?
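
A toy sketch of the negative cache with exponential backoff described above (class and method names are illustrative):

import time

class NegativeCache:
    """Remembers keys that were recently *not* found, with exponentially growing timeouts."""

    def __init__(self, base_timeout: float = 5.0):
        self.base_timeout = base_timeout
        self.entries = {}  # key -> (blocked_until, current_timeout)

    def is_blocked(self, key) -> bool:
        entry = self.entries.get(key)
        return entry is not None and entry[0] > time.monotonic()

    def report_not_found(self, key):
        # first miss blocks for T seconds, repeated misses for 2*T, 4*T, ...
        _, timeout = self.entries.get(key, (0.0, self.base_timeout / 2))
        timeout = timeout * 2
        self.entries[key] = (time.monotonic() + timeout, timeout)

    def report_found(self, key):
        self.entries.pop(key, None)  # key exists again: reset the negative cache timer

cache = NegativeCache(base_timeout=5.0)
cache.report_not_found("expert.999.")
print(cache.is_blocked("expert.999."))  # True: GETs return None without a DHT search for ~5 seconds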

Handle errors in client-runtime interaction

Right now, if a RemoteExpert sends incorrect data to a TesseractServer, the task will not return anything, making it hard to debug. The same problem occurs when runtime fails for any other reason (e.g. OOM).

We can do better than this:

  • add option to check input correctness when forming a batch (TaskPool.form_batch), weed out malformed tasks BEFORE forming a batch
  • in case of an exception, propagate some exception to RemoteExpert
  • make sure runtime/server does NOT stop after one failed batch

Distributed expert snapshots

Right now, if a peer leaves the network permanently, its experts are gone with it. Instead, it should be possible to save the best experts using p2p storage similar to BitTorrent.

There are several ways to implement such storage and we should first figure out which one will best suit our needs:

  • the storage must be fault-tolerant
  • transferring snapshots must not cause too much strain on network bandwidth
  • there should be a mechanism by which peers can decide which of these abandoned experts they should take up first

If you're willing to contribute to such a system, please leave a comment below.

Support tensor compression (fp16/int8)

In preliminary tests, we observed:

  • no quality degradation from converting tensors to fp16 and back before sending them across
  • negligible quality drawdown when tensors are encoded as fp32 mean/std plus int8 data

It would be great to enable quantization as a builtin feature for ExpertBackend, e.g.

expert = ExpertBackend(..., outputs_schema=[BatchTensorDescriptor(..., compression="int8_normalize")])
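
For reference, a minimal sketch of the fp32 mean/std + int8 encoding mentioned above (not hivemind's actual implementation; the clipping range is an assumption):

import torch

def encode_int8_normalize(tensor: torch.Tensor):
    """Store fp32 mean/std plus int8 codes of the standardized values (clipped to ~±4 std)."""
    mean, std = tensor.float().mean(), tensor.float().std().clamp_min(1e-8)
    normalized = (tensor.float() - mean) / std
    codes = normalized.clamp(-4.0, 4.0).mul(127.0 / 4.0).round().to(torch.int8)
    return codes, mean, std

def decode_int8_normalize(codes, mean, std, dtype=torch.float32):
    return (codes.float() * (4.0 / 127.0) * std + mean).to(dtype)

x = torch.randn(1024, 1024)
codes, mean, std = encode_int8_normalize(x)
print((x - decode_int8_normalize(codes, mean, std)).abs().mean())  # small error at ~4x compression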

Support client-only participants in AllReduceProtocol

AllReduceProtocol's inner logic implements Butterfly All-Reduce.

Note the part_sizes - these are fractions of the averaged vector assigned to each respective peer.

Request:

  • support averaging with client-mode peers:

  • Client-mode peer always has part_sizes[i]==0

  • Such peers can call aggregate_part from others, but no one should call aggregate_part on the client-mode peer

  • implement a basic test case similar to this, but with 1-2 client-only peers

  • update AllreduceRunner to work with the updated AllreduceProtocol

Move dummy Server to a separate process in tests

Currently, for testing purposes we create hivemind.Server in a background thread for each test. It seems that CUDA initialization happens somewhere in this thread, so we can't have two tests using server communication in a single file (otherwise we get a CUDA initialization error).

A better testing strategy would be to start a server in a separate process, because that is the intended usage of a server and the entire backend logic is built around process (or network) communication. Moreover, given the previous paragraph, it should enable us to run tests on GPU.

Setup hivemind.DHT in google colab

  • figure out a way to install hivemind in google colab

    • at the minimum, it must be able to run hivemind.DHT with listen=False
    • if you are able to install full hivemind in colab's python 3.7, please also add CI tests for python3.7 here
  • Test connectivity:

    • two DHT instances: one in colab with listen=False, another on a dedicated server with listen_on="[::]:1337"
    • check that colab-based DHT node can interact with an external DHT node (get/store value)
