google-deepmind / reverb

Reverb is an efficient and easy-to-use data storage and transport system designed for machine learning research

License: Apache License 2.0

Starlark 5.29% Python 19.29% Dockerfile 0.23% Shell 0.63% C++ 74.30% C 0.24%

reverb's Introduction

Reverb

Reverb is an efficient and easy-to-use data storage and transport system designed for machine learning research. Reverb is primarily used as an experience replay system for distributed reinforcement learning algorithms but the system also supports multiple data structure representations such as FIFO, LIFO, and priority queues.

Installation

Please keep in mind that Reverb is not hardened for production use, and while we do our best to keep things in working order, things may break or segfault.

⚠️ Reverb currently only supports Linux-based OSes.

The recommended way to install Reverb is with pip. We also provide instructions to build from source using the same docker images we use for releases.

TensorFlow can be installed separately or as part of the pip install. Installing TensorFlow as part of the install ensures compatibility.

$ pip install dm-reverb[tensorflow]

# Without installing TensorFlow or checking the TensorFlow version dependency.
$ pip install dm-reverb

Nightly builds

$ pip install dm-reverb-nightly[tensorflow]

# Without installing TensorFlow or checking the TensorFlow version dependency.
$ pip install dm-reverb-nightly

Debug builds

Starting with version 0.6.0, debug builds of Reverb are uploaded to Google Cloud Storage. The builds can be downloaded or installed directly via pip following the pattern below. gsutil can be used to navigate the directory structure and confirm that the files are there, e.g. gsutil ls gs://rl-infra-builds/dm_reverb/builds/dbg. To build your own debug binary, see the build instructions.

For Python 3.8 and 3.9, follow this pattern:

$ export reverb_version=0.8.0
# Python 3.9
$ export python_version=39
$ pip install https://storage.googleapis.com/rl-infra-builds/dm_reverb/builds/dbg/$reverb_version/dm_reverb-$reverb_version-cp$python_version-cp$python_version-manylinux2010_x86_64.whl
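
The URL pattern above can also be assembled programmatically, which may help when scripting installs for several Python versions. This is a minimal sketch: the helper name debug_wheel_url is hypothetical, and it only reproduces the string pattern shown above without checking that the wheel exists.

```python
def debug_wheel_url(reverb_version: str, python_tag: str) -> str:
    """Build a debug-wheel URL following the pattern shown above.

    `python_tag` is the compact interpreter tag, e.g. "39" for Python 3.9.
    """
    base = "https://storage.googleapis.com/rl-infra-builds/dm_reverb/builds/dbg"
    wheel = (
        f"dm_reverb-{reverb_version}-cp{python_tag}-cp{python_tag}"
        "-manylinux2010_x86_64.whl"
    )
    return f"{base}/{reverb_version}/{wheel}"


url = debug_wheel_url("0.8.0", "39")
print(url)
```

The printed URL can be passed to pip install as in the shell example.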

Build from source

This guide details how to build Reverb from source.

Reverb Releases

Because of underlying libraries such as protoc and absl, Reverb has to be paired with a specific version of TensorFlow. If Reverb is installed with pip install dm-reverb[tensorflow], the correct version of TensorFlow will be installed. The table below lists the version of TensorFlow that each release of Reverb is associated with. Some versions of interest:

  • 0.13.0 dropped Python 3.8 support.
  • 0.11.0 first version to support Python 3.11.
  • 0.10.0 last version to support Python 3.7.

Release    Branch / Tag    TensorFlow Version
Nightly    master          tf-nightly
0.14.0     v0.14.0         2.14.0
0.13.0     v0.13.0         2.14.0
0.12.0     v0.12.0         2.13.0
0.11.0     v0.11.0         2.12.0
0.10.0     v0.10.0         2.11.0
0.9.0      v0.9.0          2.10.0
0.8.0      v0.8.0          2.9.0
0.7.x      v0.7.0          2.8.0
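
When pinning dependencies by hand, the release pairings can be kept in a small lookup. The sketch below copies the pairings from the table; the names REVERB_TO_TF and tensorflow_version_for are hypothetical and not part of Reverb.

```python
# Pairings copied from the release table, keyed by "major.minor" of Reverb.
REVERB_TO_TF = {
    "0.14": "2.14.0",
    "0.13": "2.14.0",
    "0.12": "2.13.0",
    "0.11": "2.12.0",
    "0.10": "2.11.0",
    "0.9": "2.10.0",
    "0.8": "2.9.0",
    "0.7": "2.8.0",
}


def tensorflow_version_for(reverb_release: str) -> str:
    """Return the TensorFlow version paired with a Reverb release string."""
    major, minor = reverb_release.split(".")[:2]
    return REVERB_TO_TF[f"{major}.{minor}"]


print(tensorflow_version_for("0.12.0"))
```

An unknown release raises KeyError, which is usually preferable to silently guessing a version.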

Quick Start

Starting a Reverb server is as simple as:

import reverb

server = reverb.Server(tables=[
    reverb.Table(
        name='my_table',
        sampler=reverb.selectors.Uniform(),
        remover=reverb.selectors.Fifo(),
        max_size=100,
        rate_limiter=reverb.rate_limiters.MinSize(1)),
    ],
)

Create a client to communicate with the server:

client = reverb.Client(f'localhost:{server.port}')
print(client.server_info())

Write some data to the table:

# Creates a single item and data element [0, 1].
client.insert([0, 1], priorities={'my_table': 1.0})

An item can also reference multiple data elements:

# Appends three data elements and inserts a single item which references all
# of them as {'a': [2, 3, 4], 'b': [12, 13, 14]}.
with client.trajectory_writer(num_keep_alive_refs=3) as writer:
  writer.append({'a': 2, 'b': 12})
  writer.append({'a': 3, 'b': 13})
  writer.append({'a': 4, 'b': 14})

  # Create an item referencing all the data.
  writer.create_item(
      table='my_table',
      priority=1.0,
      trajectory={
          'a': writer.history['a'][:],
          'b': writer.history['b'][:],
      })

  # Block until the item has been inserted and confirmed by the server.
  writer.flush()

The items we have added to Reverb can be read by sampling them:

# client.sample() returns a generator.
print(list(client.sample('my_table', num_samples=2)))

Continue with the Reverb Tutorial for an interactive tutorial.

Detailed overview

Experience replay has become an important tool for training off-policy reinforcement learning policies. It is used by algorithms such as Deep Q-Networks (DQN), Soft Actor-Critic (SAC), Deep Deterministic Policy Gradients (DDPG), and Hindsight Experience Replay. However, building an efficient, easy-to-use, and scalable replay system can be challenging. For good performance, Reverb is implemented in C++, and to enable distributed usage it provides a gRPC service for adding, sampling, and updating the contents of the tables. Python clients expose the full functionality of the service in an easy-to-use fashion. Furthermore, native TensorFlow ops are available for performant integration with TensorFlow and tf.data.

Although originally designed for off-policy reinforcement learning, Reverb's flexibility makes it just as useful for on-policy reinforcement learning, or even (un)supervised learning. Creative users have even used Reverb to store and distribute frequently updated data (such as model weights), acting as a lightweight in-memory alternative to a distributed file system where each table represents a file.

Tables

A Reverb Server consists of one or more tables. A table holds items, and each item references one or more data elements. Tables also define sample and removal selection strategies, a maximum item capacity, and a rate limiter.

Multiple items can reference the same data element, even if these items exist in different tables. This is because items only contain references to data elements (as opposed to a copy of the data itself). This also means that a data element is only removed when there exists no item that contains a reference to it.

For example, it is possible to set up one Table as a Prioritized Experience Replay (PER) for transitions (sequences of length 2), and another Table as a (FIFO) queue of sequences of length 3. In this case the PER data could be used to train DQN, and the FIFO data to train a transition model for the environment.

Using multiple tables

Items are automatically removed from the Table when one of two conditions is met:

  1. Inserting a new item would cause the number of items in the Table to exceed its maximum capacity. The Table's removal strategy is used to determine which item to remove.

  2. An item has been sampled the maximum number of times permitted by the Table's max_times_sampled setting. Such an item is deleted.

Data elements not referenced anymore by any item are also deleted.
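
The removal rules and the reference-based sharing of data elements can be illustrated with a toy model in plain Python. This is only a sketch of the bookkeeping described above, not Reverb's implementation: ToyTable, data, and refs are hypothetical names, the sampler and remover are both FIFO for simplicity, and an item is dropped as soon as it reaches its sampling limit.

```python
from collections import Counter, OrderedDict

data = {}         # data element id -> payload, shared by all tables
refs = Counter()  # data element id -> number of items referencing it


class ToyTable:
    def __init__(self, max_size, max_times_sampled=0):
        self.max_size = max_size
        self.max_times_sampled = max_times_sampled  # 0 means unlimited
        self.items = OrderedDict()  # item key -> [element ids, times sampled]

    def insert(self, key, element_ids):
        # Condition 1: the table is full, so the remover (FIFO) makes room.
        if len(self.items) >= self.max_size:
            self._remove(next(iter(self.items)))
        self.items[key] = [list(element_ids), 0]
        refs.update(element_ids)

    def sample(self):
        key = next(iter(self.items))  # FIFO sampler
        entry = self.items[key]
        entry[1] += 1
        elements = entry[0]
        # Condition 2: the item has reached its sampling limit.
        if self.max_times_sampled and entry[1] >= self.max_times_sampled:
            self._remove(key)
        return key, elements

    def _remove(self, key):
        element_ids, _ = self.items.pop(key)
        for element in element_ids:
            refs[element] -= 1
            if refs[element] == 0:  # no item references this element any more
                del refs[element]
                data.pop(element, None)


data.update({"s0": 0, "s1": 1, "s2": 2})
replay = ToyTable(max_size=2)
queue_table = ToyTable(max_size=10, max_times_sampled=1)

replay.insert("t0", ["s0", "s1"])
replay.insert("t1", ["s1", "s2"])
queue_table.insert("q0", ["s0", "s1", "s2"])

replay.insert("t2", ["s2"])   # evicts t0; s0 survives because q0 references it
after_evict = sorted(data)
queue_table.sample()          # q0 reaches its limit and is removed; s0 is freed
after_sample = sorted(data)
print(after_evict, after_sample)
```

The two tables share the same data elements, and an element disappears only once the last item referencing it is gone.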

Users have full control over how data is sampled and removed from Reverb tables. The behavior is primarily controlled by the item selection strategies provided to the Table as the sampler and remover. In combination with the rate_limiter and max_times_sampled, a wide range of behaviors can be achieved. Some commonly used configurations include:

Uniform Experience Replay

The N=1000 most recently inserted items are maintained. By setting sampler=reverb.selectors.Uniform(), the probability of selecting an item is the same for all items. Due to reverb.rate_limiters.MinSize(100), sampling requests will block until 100 items have been inserted. By setting remover=reverb.selectors.Fifo(), the oldest item is removed first when an item needs to be removed.

reverb.Table(
     name='my_uniform_experience_replay_buffer',
     sampler=reverb.selectors.Uniform(),
     remover=reverb.selectors.Fifo(),
     max_size=1000,
     rate_limiter=reverb.rate_limiters.MinSize(100),
)

Examples of algorithms that make use of uniform experience replay include SAC and DDPG.

Prioritized Experience Replay

The N=1000 most recently inserted items are maintained. By setting sampler=reverb.selectors.Prioritized(priority_exponent=0.8), the probability of selecting an item is proportional to the item's priority raised to the power 0.8.

Note: See Schaul, Tom, et al. for the algorithm used in this implementation of Prioritized Experience Replay.

reverb.Table(
     name='my_prioritized_experience_replay_buffer',
     sampler=reverb.selectors.Prioritized(0.8),
     remover=reverb.selectors.Fifo(),
     max_size=1000,
     rate_limiter=reverb.rate_limiters.MinSize(100),
)

Examples of algorithms that make use of Prioritized Experience Replay are DQN (and its variants), and Distributed Distributional Deterministic Policy Gradients.
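
To make the effect of priority_exponent concrete, the sketch below computes selection probabilities in plain Python, assuming the usual Prioritized Experience Replay form in which each item's weight is its priority raised to the exponent. The function name sampling_probabilities is hypothetical and is not part of Reverb.

```python
def sampling_probabilities(priorities, priority_exponent=0.8):
    """Return P(i) = p_i**C / sum_k p_k**C for each priority p_i."""
    weights = [p ** priority_exponent for p in priorities]
    total = sum(weights)
    return [w / total for w in weights]


probs = sampling_probabilities([1.0, 2.0, 4.0])
uniform_probs = sampling_probabilities([1.0, 2.0, 4.0], priority_exponent=0.0)
print(probs, uniform_probs)
```

An exponent of 0 makes all items equally likely, while an exponent of 1 makes the probability directly proportional to the priority.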

Queue

A collection of up to N=1000 items where the oldest item is selected and removed in the same operation. If the collection contains 1000 items, insert calls block until it is no longer full; if the collection is empty, sample calls block until there is at least one item.

reverb.Table(
    name='my_queue',
    sampler=reverb.selectors.Fifo(),
    remover=reverb.selectors.Fifo(),
    max_size=1000,
    max_times_sampled=1,
    rate_limiter=reverb.rate_limiters.Queue(size=1000),
)

# Or use the helper classmethod `.queue`.
reverb.Table.queue(name='my_queue', max_size=1000)

Examples of algorithms that make use of Queues are IMPALA and asynchronous implementations of Proximal Policy Optimization.
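
The blocking behavior of a queue table resembles that of a bounded queue. As a loose analogy only, the sketch below uses Python's standard queue.Queue as a stand-in (it does not use Reverb), with the non-blocking calls raising exceptions at the points where a Reverb insert or sample call would block.

```python
import queue

q = queue.Queue(maxsize=3)  # stand-in for a queue table with max_size=3
for step in range(3):
    q.put_nowait(step)

try:
    q.put_nowait(3)             # the queue is full
    insert_would_block = False
except queue.Full:
    insert_would_block = True   # a Reverb insert would block here

first = q.get_nowait()          # the oldest item is read and removed at once
q.put_nowait(3)                 # there is room again
drained = [q.get_nowait() for _ in range(3)]

try:
    q.get_nowait()              # the queue is empty
    sample_would_block = False
except queue.Empty:
    sample_would_block = True   # a Reverb sample would block here

print(first, drained, insert_would_block, sample_would_block)
```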

Item selection strategies

Reverb defines several selectors that can be used for item sampling or removal:

  • Uniform: Sample uniformly among all items.
  • Prioritized: Samples proportional to stored priorities.
  • FIFO: Selects the oldest data.
  • LIFO: Selects the newest data.
  • MinHeap: Selects data with the lowest priority.
  • MaxHeap: Selects data with the highest priority.

Any of these strategies can be used for sampling or removing items from a Table. This gives users the flexibility to create customized Tables that best fit their needs.
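
The difference between the strategies is easiest to see side by side. The toy functions below mimic five of the selectors on a plain dict of {item key: priority}; they are illustrative stand-ins with hypothetical names, not Reverb's selectors, and Prioritized is left out to keep the sketch short.

```python
import random

# Python dicts preserve insertion order, so the first key is the oldest item
# and the last key is the newest.


def uniform(items, rng=random):
    return rng.choice(list(items))


def fifo(items):
    return next(iter(items))


def lifo(items):
    return list(items)[-1]


def min_heap(items):
    return min(items, key=items.get)


def max_heap(items):
    return max(items, key=items.get)


items = {"a": 3.0, "b": 1.0, "c": 2.0}

# Any selector can play either role, e.g. sample the highest-priority item
# and evict the lowest-priority one.
sampler, remover = max_heap, min_heap
print(sampler(items), remover(items), fifo(items), lifo(items))
```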

Rate Limiting

Rate limiters allow users to enforce conditions on when items can be inserted and/or sampled from a Table. Here is a list of the rate limiters that are currently available in Reverb:

  • MinSize: Sets a minimum number of items that must be in the Table before anything can be sampled.
  • SampleToInsertRatio: Enforces an average ratio of samples to inserts by blocking insert and/or sample requests. This is useful for controlling the number of times each item is sampled before being removed.
  • Queue: Items are sampled exactly once before being removed.
  • Stack: Items are sampled exactly once before being removed.
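
A rate limiter can be thought of as a pair of yes/no checks on running counters. The sketch below is a simplified model under stated assumptions: the class ToyRateLimiter is hypothetical, it ignores deletions, and its bookkeeping (comparing inserts times samples_per_insert, minus samples, against min_diff and max_diff) is an assumption about how the four parameters interact rather than Reverb's actual code.

```python
import sys


class ToyRateLimiter:
    def __init__(self, samples_per_insert, min_size_to_sample, min_diff, max_diff):
        self.samples_per_insert = samples_per_insert
        self.min_size_to_sample = min_size_to_sample
        self.min_diff = min_diff
        self.max_diff = max_diff
        self.inserts = 0
        self.samples = 0

    def can_insert(self):
        # Inserts are always allowed until the minimum size is reached.
        if self.inserts + 1 <= self.min_size_to_sample:
            return True
        diff = (self.inserts + 1) * self.samples_per_insert - self.samples
        return diff <= self.max_diff

    def can_sample(self):
        # Sampling is never allowed before the minimum size is reached.
        if self.inserts < self.min_size_to_sample:
            return False
        diff = self.inserts * self.samples_per_insert - (self.samples + 1)
        return diff >= self.min_diff


# With unbounded diffs only the minimum size matters (MinSize-like behavior).
big = sys.float_info.max
min_size = ToyRateLimiter(1.0, 1, -big, big)
blocked_when_empty = not min_size.can_sample()
min_size.inserts += 1
allowed_after_insert = min_size.can_sample()

# With tight bounds, inserts stall until enough samples have been taken.
ratio = ToyRateLimiter(2.0, 1, -2.0, 2.0)
ratio.inserts += 1
insert_blocked = not ratio.can_insert()   # 2 * 2.0 - 0 = 4.0 exceeds max_diff
ratio.samples += 2
insert_allowed = ratio.can_insert()       # 2 * 2.0 - 2 = 2.0 is within max_diff
print(blocked_when_empty, allowed_after_insert, insert_blocked, insert_allowed)
```

In this model, min_diff limits how far sampling may run ahead of inserting, and max_diff limits how far inserting may run ahead of sampling.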

Sharding

Reverb servers are unaware of each other, and when a system is scaled up to a multi-server setup, data is not replicated across more than one node. This makes Reverb unsuitable as a traditional database but has the benefit of making it trivial to scale up systems where some level of data loss is acceptable.

Distributed systems can be horizontally scaled by simply increasing the number of Reverb servers. When used in combination with a gRPC compatible load balancer, the address of the load balanced target can simply be provided to a Reverb client and operations will automatically be distributed across the different nodes. You'll find details about the specific behaviors in the documentation of the relevant methods and classes.

If a load balancer is not available in your setup or if more control is required then systems can still be scaled in almost the same way. Simply increase the number of Reverb servers and create separate clients for each server.
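
Without a load balancer, the client side has to decide which server receives each write. One simple policy is round-robin, sketched below. RoundRobinWriter and FakeClient are hypothetical names; FakeClient merely stands in for a real client and mirrors the insert(data, priorities=...) call from the Quick Start so that the example runs without a server.

```python
import itertools


class FakeClient:
    """Stand-in for a per-server client; records what it was asked to insert."""

    def __init__(self, address):
        self.address = address
        self.inserted = []

    def insert(self, data, priorities):
        self.inserted.append((data, priorities))


class RoundRobinWriter:
    """Spreads inserts evenly across one client per server."""

    def __init__(self, clients):
        self._clients = list(clients)
        self._cycle = itertools.cycle(self._clients)

    def insert(self, data, priorities):
        client = next(self._cycle)
        client.insert(data, priorities=priorities)
        return client


clients = [FakeClient(f"host{i}:8000") for i in range(3)]  # hypothetical addresses
writer = RoundRobinWriter(clients)
for step in range(7):
    writer.insert([step], priorities={"my_table": 1.0})

counts = [len(c.inserted) for c in clients]
print(counts)
```

Sampling can be spread the same way, keeping in mind that each server only holds the items that were written to it.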

Checkpointing

Reverb supports checkpointing; the state and content of Reverb servers can be stored to permanent storage. While checkpointing, the Server serializes all of its data and metadata needed to reconstruct it. During this process the Server blocks all incoming insert, sample, update, and delete requests.

Checkpointing is done with a call from the Reverb Client:

# client.checkpoint() returns the path the checkpoint was written to.
checkpoint_path = client.checkpoint()

To restore the reverb.Server from a checkpoint:

# The checkpointer accepts the path of the root directory in which checkpoints
# are written. If we pass the root directory of the checkpoints written above
# then the new server will load the most recent checkpoint written from the old
# server.
checkpointer = reverb.platform.checkpointers_lib.DefaultCheckpointer(
  path=checkpoint_path.rsplit('/', 1)[0])

# The arguments passed to `tables=` must be the same as those used by the
# `Server` that wrote the checkpoint.
server = reverb.Server(tables=[...], checkpointer=checkpointer)

Refer to tfrecord_checkpointer.h for details on the implementation of checkpointing in Reverb.

Starting Reverb using reverb_server (beta)

Installing dm-reverb using pip will install a reverb_server script, which accepts its config as a textproto. For example:

$ reverb_server --config="
port: 8000
tables: {
  table_name: \"my_table\"
  sampler: {
    fifo: true
  }
  remover: {
    fifo: true
  }
  max_size: 200 max_times_sampled: 5
  rate_limiter: {
    min_size_to_sample: 1
    samples_per_insert: 1
    min_diff: $(python3 -c "import sys; print(-sys.float_info.max)")
    max_diff: $(python3 -c "import sys; print(sys.float_info.max)")
  }
}"

The rate_limiter config is equivalent to the Python expression MinSize(1), see rate_limiters.py.

Citation

If you use this code, please cite the Reverb paper as

@misc{cassirer2021reverb,
      title={Reverb: A Framework For Experience Replay},
      author={Albin Cassirer and Gabriel Barth-Maron and Eugene Brevdo and Sabela Ramos and Toby Boyd and Thibault Sottiaux and Manuel Kroiss},
      year={2021},
      eprint={2102.04736},
      archivePrefix={arXiv},
      primaryClass={cs.LG}
}

reverb's People

Contributors

acassirer, ayzaan, chungshan, derekmauro, ebrevdo, eltociear, ethanluoyc, fastturtle, gribozavr, hawkinsp, hbq1, jblespiau, jharmsen, lifeiteng, mjwillson, nimrod-gileadi, ormandi, pcish, ppwwyyxx, pwohlhart, qstanczyk, rchen152, sabelaraga, sguada, ssingh19, tfboyd, tkoeppe, tomhennigan, uduse, yilei

reverb's Issues

Table.queue() rate limiter not working

In the reverb.rate_limiters.Queue() docstring it's written

NOTE: Do not use this RateLimiter directly. Use Table.queue instead.

However, if I try to instantiate the following

    server = reverb.Server(tables=[
        reverb.Table(
            name='my_table',
            sampler=reverb.selectors.Fifo(),
            remover=reverb.selectors.Fifo(),
            max_size=100,
            rate_limiter=reverb.Table.queue('my_queue', 100)),
    ],
        port=8000
    )

The following is thrown

File "/home/fedetask/Desktop/python-venv/lib/python3.6/site-packages/reverb/server.py", line 160, in __init__
rate_limiter=rate_limiter.internal_limiter,
AttributeError: 'Table' object has no attribute 'internal_limiter'

ImportError: libpython3.7m.so.1.0: cannot open shared object file: No such file or directory

I used pip to install reverb (using the exact commands in the Installation section), but got the following error when trying to import reverb.

In [1]: import reverb
2020-08-14 14:31:15.200537: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-1-a0cab28380f6> in <module>
----> 1 import reverb

~/anaconda3/lib/python3.7/site-packages/reverb/__init__.py in <module>
     25 # pylint: enable=g-bad-import-order
     26
---> 27 from reverb import item_selectors as selectors
     28 from reverb import rate_limiters
     29

~/anaconda3/lib/python3.7/site-packages/reverb/item_selectors.py in <module>
     17 import functools
     18
---> 19 from reverb import pybind
     20
     21 Fifo = pybind.FifoSelector

~/anaconda3/lib/python3.7/site-packages/reverb/pybind.py in <module>
----> 1 import tensorflow as _tf; from .libpybind import *; del _tf

ImportError: libpython3.7m.so.1.0: cannot open shared object file: No such file or directory

I use anaconda and my Python version is 3.7.6.

Any suggestions? Thanks in advance.

A question about 'sample_stats' in a client.server_info() output

Hey,

I have some issues understanding a client.server_info() output regarding sample_stats.
For example, I run "Example 1: Overlapping Trajectories" from the Reverb Tutorial. Executing client.server_info() afterwards I got the output:

{'my_table': TableInfo(name='my_table', sampler_options=prioritized {
priority_exponent: 0.8
}
, remover_options=fifo: true
is_deterministic: true
, max_size=1000000, max_times_sampled=0, rate_limiter_info=samples_per_insert: 1.0
min_diff: -1.7976931348623157e+308
max_diff: 1.7976931348623157e+308
min_size_to_sample: 2
insert_stats {
completed: 2
completed_wait_time {
}
pending_wait_time {
}
}
sample_stats {
completed: 64
completed_wait_time {
}
pending_wait_time {
}
}
, signature=None, current_size=2, num_episodes=1, num_deleted_episodes=0)}

It says that 64 items were sampled. But we sampled only once: a batch of 2 items consisting of 3 timesteps each. What does the number 64 mean?

Also, is it possible to disable Reverb's warning messages? For example, on every sampling call Reverb logs

[reverb/cc/client.cc:159] Sampler and server are owned by the same process so Table uniform_table is accessed directly without gRPC.

which is sometimes inconvenient.

Thank you!

SIGABRT when writing arrays with varying sizes

Reverb will crash and raise signal 6 SIGABRT when writing arrays with varying sizes. Here is a short notebook for reproducing this issue. I've reproduced it on a standard colab runtime and on my local system, on both version 0.1.0 and nightly.

Here is the core code and the resulting error I get when running on my local system (this kills the Colab runtime, so it provides limited feedback there).

dummy_data = [
    (np.array([0, 1]), np.array([0, 1]), np.array([0, 1])),          
    (np.array([0, 1]), np.array([0, 1]), np.array([0, 1])),          
    (np.array([0, 1, 2, 3]), np.array([0, 1, 2, 3]), np.array([0, 1, 2, 3])),          
]

with client.writer(max_sequence_length=3) as writer:
    for step, d in enumerate(dummy_data):
        writer.append(d)
        if step >= 2:
            # In this example, the item consists of the 3 most recent timesteps that
            # were added to the writer and has a priority of 1.5.
            writer.create_item(
                table='my_table', num_timesteps=3, priority=1.5)

2020-09-20 22:02:00.793610: F tensorflow/core/framework/tensor_util.cc:94] Check failed: offset + from_data.size() <= to_data.size() (104 vs. 96)

Process finished with exit code 134 (interrupted by signal 6: SIGABRT)

I'm not sure if this use case is meant to be supported by reverb, but, if not, some more informative errors might be warranted. Is there any documentation that covers reverb's assumptions with regards to the shape and dtype of data being written to a given table (if there are any)?

CPP api

Hi, is there a plan to provide a C++ API? I'm using the TensorFlow C++ API because my project has real-time constraints, and it would be nice to have a C++ implementation of a replay buffer.

What's the best way to deal with variable episode length?

A solution provided in Acme pads all episodes to the maximum episode length, but I'm afraid that this will drastically reduce performance if the episode length has a high variance. For example, if the maximum episode length is 300 while most episodes end at 15 steps, this could be a 20-fold decrease in data throughput. For my tasks, it's OK to always have a batch size of one (sampling one episode at a time). Would this remove the requirement that all items must have the same shape?

What are the best ways to deal with this situation?

Here are some ideas that came to my mind but I'm not sure if they can solve the problem:

  • create multiple tables corresponding to various padding lengths, and insert into different tables based on the actual episode lengths
  • use delta encoding
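
The first idea can be sized up with a little arithmetic before committing to it. The sketch below is hypothetical throughout: the bucket boundaries, the function names, and the sample episode lengths are made up for illustration, and it only counts stored timesteps rather than touching Reverb.

```python
import bisect

BUCKETS = [16, 64, 300]  # hypothetical padded lengths, one table per bucket


def bucket_for(episode_length):
    """Return the smallest padded length that fits the episode."""
    index = bisect.bisect_left(BUCKETS, episode_length)
    if index == len(BUCKETS):
        raise ValueError(f"episode of length {episode_length} exceeds all buckets")
    return BUCKETS[index]


def stored_steps(lengths, padded_length_fn):
    """Total timesteps stored once every episode is padded."""
    return sum(padded_length_fn(n) for n in lengths)


lengths = [15] * 9 + [300]  # mostly short episodes, one long one
real = sum(lengths)
single_table = stored_steps(lengths, lambda n: 300)
bucketed = stored_steps(lengths, bucket_for)
print(real, single_table, bucketed)
```

For this made-up distribution, padding everything to 300 stores 3000 steps for 435 real ones, while three buckets store 444.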

Related: #19 #47

Controlling the number of spawned threads

Hi all, thanks for open-sourcing this software!

I'm wondering if it's possible to somehow limit the number of threads spawned by the reverb service.

As an example, gRPC allows customizing the number of threads used to process incoming requests (server_builder.cc):

https://github.com/grpc/grpc/blob/6a477958c06ef6a09e7628fd5b1bf4c174ca78b8/src/cpp/server/server_cc.cc#L59-L64
https://github.com/grpc/grpc/blob/88af358f9ec9b2a84f82b3f4e795df62c920c29b/src/cpp/server/server_builder.cc#L152-L169

However, it looks like this kind of configuration is not exposed to the user in Reverb, i.e. the user is unable to tweak these parameters:

https://github.com/deepmind/reverb/blob/da75a95c4a0aa96af797a6bed22cac3c73bc0448/reverb/cc/platform/default/server.cc#L34-L57

Am I missing something? Is it possible to somehow tweak these thread-related parameters, especially the threadpool size of the gRPC server?

Thanks!

Update priority

Since I followed PER as described in this paper, I believe there must be a way to update priorities every time we compute the TD error on a minibatch. However, I looked carefully at all your examples, and there seems to be no function for this.

Expose C++ Usage

Given that Reverb is written in C++, it would be convenient to have instructions for building and using Reverb directly in C++ without Python. This would work well with the TensorFlow C++ API or the PyTorch C++ API.

This worked for me to build the C++ libraries:

bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc:client
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc:table
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc:sampler
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc/selectors:uniform
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc/selectors:fifo
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc/table_extensions:base
bazel build --verbose_failures --copt="-Wno-error=stringop-truncation" --cxxopt='-std=c++17' //reverb/cc/platform/default:server

And then to include it in my project's CMakeLists.txt:

include_directories(SYSTEM ../reverb/)
# This includes the build headers such as the protobuf headers *.pb.h:
include_directories(SYSTEM ../reverb/bazel-bin/)
# To find the right version of GRPC:
include_directories(SYSTEM ../reverb/bazel-reverb/external/com_github_grpc_grpc/include/)
# To find the right version of abseil:
include_directories(SYSTEM ../reverb/bazel-reverb/external/com_google_absl/)
# the below is so that it finds protobuf
include_directories(SYSTEM /home/xander/anaconda3/envs/my_model/lib/python3.7/site-packages/tensorflow/include/)
target_link_libraries(MyModel PRIVATE /home/xander/dev/reverb/bazel-bin/reverb/cc/libclient.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/checkpointing/libcheckpoint_cc_proto.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/selectors/libuniform.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/selectors/libfifo.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/platform/default/libserver.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/random/internal/libranden.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/status/libstatus.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/random/internal/libranden_hwaes.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/random/internal/libranden_hwaes_impl.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/random/internal/libranden_slow.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/random/internal/libpool_urbg.so
/home/xander/dev/reverb/bazel-bin/external/com_google_absl/absl/synchronization/libsynchronization.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/table_extensions/libbase.so
/home/xander/tensorflow-2.5.0/bazel-bin/tensorflow/libtensorflow_cc.so
/home/xander/tensorflow-2.5.0/bazel-bin/tensorflow/libtensorflow_framework.so.2
/home/xander/dev/reverb/bazel-bin/reverb/cc/libreverb_service_async_impl.so
/home/xander/dev/reverb/bazel-bin/reverb/cc/libtable.so)

And this C++:

#include <absl/types/optional.h>
#include <tensorflow/core/protobuf/struct.pb.h>
#include <reverb/cc/platform/server.h>
#include <reverb/cc/table.h>
#include <reverb/cc/selectors/uniform.h>
#include <reverb/cc/selectors/fifo.h>
#include <reverb/cc/rate_limiter.h>
#include <reverb/cc/table_extensions/interface.h>

namespace myspace {

    void create_reverb_server() {
        auto rate_limiter = std::make_shared<deepmind::reverb::RateLimiter>(
                /*sample_to_insert_ratio=*/0.5,
                /*min_size=*/0,
                /*min_diff=*/0.0, // TODO: What are min_diff and max_diff for?
                /*max_diff=*/0.0);
        auto table = deepmind::reverb::Table(
                                  /*name=*/"test_table",
                                  /*sampler=*/std::make_shared<deepmind::reverb::UniformSelector>(),
                                  /*remover=*/std::make_shared<deepmind::reverb::FifoSelector>(),
                                  /*max_size=*/(int64_t)100000,
                                  /*max_times_sampled=*/(int32_t)0,
                                  rate_limiter,
                                  std::vector<std::shared_ptr<deepmind::reverb::TableExtension>>{},
                                  absl::optional<tensorflow::StructuredValue>{}
                                  );
    }

} // namespace myspace

However, building this produces a linker error:

/home/xander/dev/my_model/pyplutusdata/pyplutusdata/Reverb.h:30: error: undefined reference to 'deepmind::reverb::Table::Table(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, std::shared_ptr<deepmind::reverb::ItemSelector>, std::shared_ptr<deepmind::reverb::ItemSelector>, long, int, std::shared_ptr<deepmind::reverb::RateLimiter>, std::vector<std::shared_ptr<deepmind::reverb::TableExtension>, std::allocator<std::shared_ptr<deepmind::reverb::TableExtension> > >, std::optional<tensorflow::StructuredValue>)'
  • I can't figure out why the symbol isn't found. It's declared here, defined here, and built into libtable.so here. You can see above that I'm linking against libtable.so and indeed the linker successfully finds ~Table() and RateLimiter(double, long, double, double). The Table constructor is the only symbol missing for some reason.
  • The ability to build reverb as a monolithic .so would be very convenient for this endeavor.

Segmentation fault on python 3.8 following instruction from acme

Hello all,

I am trying to install dm-acme on python 3.8.

Following the instructions on the GitHub page, I installed

pip install dm-acme
pip install dm-acme[reverb]
pip install dm-acme[tf]

However I am getting a segmentation fault error when importing reverb.

python3.8 crashed with SIGSEGV in tensorflow::OpDefBuilder::~OpDefBuilder()

I am using the following package versions:

dm_acme-0.2.1-py3-none-any.whl
dm_reverb_nightly-0.3.0.dev20210617-cp38-cp38-manylinux2010_x86_64.whl
tfp_nightly-0.14.0.dev20210618-py2.py3-none-any.whl
tf_nightly-2.6.0.dev20210618-cp38-cp38-manylinux2010_x86_64.whl
tensorflow_metadata-1.0.0-py3-none-any.whl

Do you have any idea of the source of the issue?
Thanks.

Kind regards,
Michelangelo

sampling without replacement

There doesn't seem to be a way of doing sampling without replacement.
Is this true?
Are there any plans to provide this option in future releases?

SIGSEGV: in process client segfaults when server is closed while waiting for samples

The in-process client seems to cause a segfault when the server stops while the client is waiting (colab notebook). I wasn't able to reproduce this with the standard client (I haven't tried the tf-client).

Code to reproduce:

import reverb
import tensorflow as tf
import threading


# Initialize the reverb server.
simple_server = reverb.Server(
    tables=[
        reverb.Table(
            name='my_table',
            sampler=reverb.selectors.Fifo(),
            remover=reverb.selectors.Fifo(),
            max_size=int(1e6),
            rate_limiter=reverb.rate_limiters.MinSize(5),
            signature=(tf.TensorSpec((None,), tf.int64),) * 3,
            ),
    ],
    # Sets the port to None to make the server pick one automatically.
    port=None)

# NOTE: this uses the in-process client
client = simple_server.in_process_client()

thread = threading.Thread(target=lambda: list(client.sample("my_table", 3)))
thread.start()

# Causes a segfault (caused by the waiting client thread?)
simple_server.stop()

Error message:

Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)

The tutorial uses deprecated methods

Hi, I ran the tutorial on both colab and the local machine, but it seems that some methods used in the tutorial are not supported in the current version of Reverb. Can you update the tutorial?

Torch support

Can we use reverb to store and retrieve PyTorch tensors? If yes, are there any performance implications?

python3.9 support

Installing reverb for python3.9 failed
ERROR: Could not find a version that satisfies the requirement dm-reverb>=0.2.0; extra == "reverb" (from tf-agents[reverb]) (from versions: none)
ERROR: No matching distribution found for dm-reverb>=0.2.0; extra == "reverb" (from tf-agents[reverb])

I also noticed that this package is marked for python3.{6,7,8} only. Is there any plan for supporting python3.9?

Thanks.

Acme's r2d2 fails due to reverb error

Hi, I am trying to run the r2d2 agent from Acme, and it throws an error just after the first episode is run: [reverb/cc/client.cc:159] Sampler and server are owned by the same process (21400) so Table priority_table is accessed directly without gRPC.
The program is killed later.

Any idea why this is occurring and how can I get it to work?

Thanks!

Reverb error while running DQN

I am training a DQN agent in acme but after a while in the training I get the following error (or info??) that gets repeated over and over

[reverb/cc/selectors/prioritized.cc:213] Tree needs to be initialized because node with index 0 has approximation error 0.000137329, which exceeds the threshold of 0.0001

I searched in the code but could not find much. Any idea what's the cause of this? What's the approximation error of a node?
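
One plausible reading of the message, offered as an assumption rather than a confirmed description of Reverb's internals: a prioritized sampler typically keeps a sum tree whose node sums are updated incrementally, by adding the difference between the new and the old priority, instead of being recomputed from scratch. Floating-point rounding then lets a stored sum drift slowly away from the exact sum of the priorities beneath it, and that drift would be the node's "approximation error". The sketch below uses a flat running total as a stand-in for a single tree node (all names are hypothetical) to show how such drift can accumulate.

```python
import math
import random


def incremental_sum_drift(num_items=1000, num_updates=100_000, seed=0):
    """Returns |running total - exact total| after many priority updates."""
    rng = random.Random(seed)
    priorities = [rng.random() for _ in range(num_items)]
    running_total = math.fsum(priorities)  # starts out exact

    for _ in range(num_updates):
        i = rng.randrange(num_items)
        # Occasionally use a very large priority to mix magnitudes, which
        # makes rounding error in the running total more visible.
        scale = 1e6 if rng.random() < 0.01 else 1.0
        new_priority = rng.random() * scale
        # Incremental update: apply only the difference, as a sum tree would
        # do for every ancestor of the changed leaf.
        running_total += new_priority - priorities[i]
        priorities[i] = new_priority

    # math.fsum recomputes the total without accumulated rounding error.
    return abs(running_total - math.fsum(priorities))


print(incremental_sum_drift())
```

If this reading is right, the log line would be informational rather than fatal: once the drift passes the threshold, the sums can simply be rebuilt from the stored priorities.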

Storing tables on disk (vs. in-memory)

In the current implementation, I understand that all tables are stored in memory, which can be problematic on low-memory machines or with a large amount of experience. Is there some way to avoid keeping the full experience buffer in memory? I had hoped that .ckpt files would do that, but they don't seem to solve my problem.

What is the best way to write trajectories from multiple running actors into reverb ?

We're looking for example code showing how to write full trajectories from multiple actors into Reverb.
The setting is:

  • N environments running in parallel (different process)
  • variable episode length
  • Use all trajectories once they are available

Our first idea is to create N different tables, one for each running actor. But it is not clear how to retrieve all the trajectories from Reverb and use them as a single dataset of trajectories.

Provide Windows binaries

I use Windows. I wanted to use Acme to do RL research. Acme requires Reverb, and Reverb doesn't have Windows binaries for download. I don't know how to compile packages on my own. This means that now I can't use either Reverb or Acme.

Memory Leak caused during benchmarking reverb's performance?

Hi @ebrevdo,

I ran into some issues while benchmarking Reverb's performance. The setup is as follows:

  • One Consumer samples data from Reverb, and multiple Producers generate data and add it to the Reverb server through the recommended TrajectoryWriter.
  • Each data element is a nested numpy array with data_item = {'obs': np.array((84, 84, 4), dtype=np.uint8), 'reward': np.array((), dtype=np.int32)}. Each data item contains 28236 bytes, or ~28 KB.
  • As more workers (Producers) are added, throughput saturates at around 1.4Gb/s, as shown in the following figure.

image

My first question: does the above performance look normal to you?
The second: after the program runs for a while, it crashes the machine. Checking memory, we found that usage grows very quickly when there are 64 workers. Could this be a memory leak?

We also put the code below for your information.

Command to run it:

CUDA_VISIBLE_DEVICES='' python3 code_below.py --lp_launch_type=local_mp --num_workers 64

We are using a DGX-A100 machine to run the experiments, which has 256 cpu cores and 1024G memory.

"""Benchmarking reverb performance.  """
import itertools
import time

import numpy as np
import launchpad as lp
import reverb
import tensorflow as tf
import tree
from absl import app, flags

TEST_TABLE = 'queue'
FLAGS = flags.FLAGS
flags.DEFINE_integer('seq_length', 20, 'length of a trajectory')
flags.DEFINE_integer('batch_size', 64, 'the batch size to sample trajs')
flags.DEFINE_integer('num_workers', 64, 'number of producers')


class Spec:

  def __init__(self, shape, dtype):
    self.shape = shape
    self.dtype = dtype


spec = {'obs': Spec((84, 84, 4), np.uint8), 'action': Spec((), np.int32)}


def get_queue_size(client: reverb.Client) -> int:
  table_info = client.server_info()[TEST_TABLE]
  return (table_info.rate_limiter_info.insert_stats.completed -
          table_info.rate_limiter_info.sample_stats.completed)


class Producer:
  """A producer that generates fake data."""

  def __init__(self, client, seq_length=1):
    self._client = client
    self._seq_length = seq_length

  def run(self):
    writer = self._client.trajectory_writer(
        num_keep_alive_refs=self._seq_length,
        get_signature_timeout_ms=300_000,
    )

    def randint(x):
      return np.random.randint(255, size=x.shape, dtype=x.dtype)

    for cnt in itertools.count(1):
      data = tree.map_structure(randint, spec)
      writer.append(data)
      if cnt % self._seq_length == 0:
        traj = tree.map_structure(lambda x: x[-self._seq_length:],
                                  writer.history)
        writer.create_item(TEST_TABLE, priority=1, trajectory=traj)


class Consumer:
  """A consumer to read data from the reverb server."""

  def __init__(self, client, seq_length, batch_size=1) -> None:
    self._client = client
    self._seq_length = seq_length
    self._batch_size = batch_size

  def _make_dataset(self):

    def _inner_make_dataset(unused_idx):
      ds = reverb.TrajectoryDataset.from_table_signature(
          server_address=self._client.server_address,
          table=TEST_TABLE,
          max_in_flight_samples_per_worker=100,
      )
      return ds

    num_parallel_calls = 16
    ds = tf.data.Dataset.range(num_parallel_calls)
    ds = ds.interleave(
        map_func=_inner_make_dataset,
        cycle_length=num_parallel_calls,
        num_parallel_calls=num_parallel_calls,
        deterministic=False,
    )

    ds = ds.batch(self._batch_size)
    ds = ds.prefetch(-1)  # -1 means autotune; prefetch returns a new dataset
    ds = iter(ds)

    return ds

  def run(self):
    ds = self._make_dataset()

    log_interval = 20
    step_size = self._batch_size * self._seq_length * 4

    t0 = time.time()
    for i in itertools.count(1):
      next(ds)
      if i % log_interval == 0:
        fps = int(i * step_size / (time.time() - t0))
        print('| FPS: {}, QSize: {}'.format(fps, get_queue_size(self._client)))


def main(_):

  seqlen, bs = FLAGS.seq_length, FLAGS.batch_size
  sig = tree.map_structure(
      lambda x: tf.TensorSpec(shape=(seqlen,) + x.shape, dtype=x.dtype),
      spec,
  )

  def make_queue():
    return [reverb.Table.queue(name=TEST_TABLE, max_size=10000, signature=sig)]

  program = lp.Program('BenchRerverb')

  with program.group('reverb'):
    client = program.add_node(lp.ReverbNode(make_queue))

  with program.group('Consumer'):
    program.add_node(
        lp.CourierNode(
            Consumer, client=client, seq_length=seqlen, batch_size=bs))

  with program.group('Producer'):
    for _ in range(FLAGS.num_workers):
      program.add_node(
          lp.CourierNode(Producer, client=client, seq_length=seqlen))

  lp.launch(program)


if __name__ == '__main__':
  app.run(main)
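
As a rough sanity check for the memory question, the raw payload that a completely full queue could hold can be estimated from the values in the script above (the spec shapes, the default seq_length of 20 and max_size=10000). This is only a back-of-the-envelope sketch: it counts uncompressed array bytes and ignores compression, chunk bookkeeping, in-flight samples and the buffers held by each producer process.

```python
# Values taken from the benchmark script above.
OBS_BYTES = 84 * 84 * 4 * 1   # uint8 observation, 1 byte per element
ACTION_BYTES = 4              # scalar int32
SEQ_LENGTH = 20               # default value of --seq_length
MAX_SIZE = 10_000             # max_size passed to reverb.Table.queue

step_bytes = OBS_BYTES + ACTION_BYTES   # one appended step
item_bytes = step_bytes * SEQ_LENGTH    # one trajectory item
full_queue_bytes = item_bytes * MAX_SIZE

print(f"per step:   {step_bytes} B")
print(f"per item:   {item_bytes} B")
print(f"full queue: {full_queue_bytes / 2**30:.2f} GiB")
```

Under these assumptions a full queue accounts for only a few GiB, so comparing this figure with the observed growth may help separate expected buffering from a genuine leak.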

Building from Source: libpybind.so undefined symbol

I'm building reverb from source, but am running into an undefined symbol error when trying to import the built reverb.

To build from source I do as the instructions say:
bazel build -c opt //reverb/pip_package:build_pip_package
and
./bazel-bin/reverb/pip_package/build_pip_package --dst /tmp/reverb_build/dist/

(Note that I did not do this within a docker container as I have bazel installed)

The build process completes successfully.

However, when I do import reverb I run into the following error:

  File "/.../conda/lib/python3.6/site-packages/reverb/pybind.py", line 1, in <module>
    import tensorflow as _tf; from .libpybind import *; del _tf
ImportError: /.../conda/lib/python3.6/site-packages/reverb/libpybind.so: undefined symbol: _ZNSaIlEC1Ev

Is there any reason why this might happen? I'm on a Linux machine and used gcc-7 installed from conda to build Reverb.

Support for macOS

I'm using macOS (Catalina Version 10.15.6) as my main development environment, and it would be great to use Reverb on this OS.

Client calls in separate process hanging on WSL2

I'm trying to run a Reverb client and server in separate processes using multiprocessing. I've tried both dm-reverb 0.2.0 and dm-reverb-nightly 0.3.0.dev20210402 on Python 3.8, but I get the same result with the following piece of code. One key detail is that I'm running this on a Windows machine, under WSL2 with Ubuntu 20.04.

The main-process client works fine, but the side-process client hangs at the call to client.server_info(). I also tried calling client.sample() in the side process after writing some data from the main process, with the same result.

import reverb
import multiprocessing

def main():
    server = reverb.Server(tables=[
        reverb.Table(
            name='cartpole',
            sampler=reverb.selectors.Uniform(),
            remover=reverb.selectors.Fifo(),
            max_size=100,
            rate_limiter=reverb.rate_limiters.MinSize(1))
        ],
        port=8000
    )
    
    client = reverb.Client('localhost:8000')
    print(f"Main process : {client.server_info()}")

    side_proc = multiprocessing.Process(target=side_process)
    side_proc.start()

    server.wait()

def side_process():
    print("Side process started")
    client = reverb.Client('localhost:8000')
    print(f"Side process : {client.server_info()}")
    print("Side process terminated")

if __name__ == '__main__':
    main()
$ /bin/python3 /home/devin/test/reverbtest.py
2021-04-03 19:21:45.471910: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
[reverb/cc/platform/tfrecord_checkpointer.cc:144] Initializing TFRecordCheckpointer in /tmp/tmpon4xcimy
[reverb/cc/platform/tfrecord_checkpointer.cc:338] Loading latest checkpoint from /tmp/tmpon4xcimy
[reverb/cc/platform/default/server.cc:55] Started replay server on port 8000
Main process : {'cartpole': TableInfo(name='cartpole', sampler_options=uniform: true
, remover_options=fifo: true
is_deterministic: true
, max_size=100, max_times_sampled=0, rate_limiter_info=samples_per_insert: 1.0
min_diff: -1.7976931348623157e+308
max_diff: 1.7976931348623157e+308
min_size_to_sample: 1
insert_stats {
  completed_wait_time {
  }
  pending_wait_time {
  }
}
sample_stats {
  completed_wait_time {
  }
  pending_wait_time {
  }
}
, signature=None, current_size=0, num_episodes=0, num_deleted_episodes=0)}
Side process started
^Z

Removing absl's http_archive

Hi,

I noticed that https://github.com/deepmind/reverb/blob/master/WORKSPACE#L31 hard codes upstream tensorflow's absl version as a http_archive.

Does this mean that reverb will only support the latest TF version? Currently if I try to use TF 2.4 with the latest master branch, I get very lousy errors like:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/cyang/anaconda3/envs/myenv/lib/python3.8/site-packages/reverb/__init__.py", line 27, in <module>
    from reverb import item_selectors as selectors
  File "/Users/cyang/anaconda3/envs/myenv/lib/python3.8/site-packages/reverb/item_selectors.py", line 19, in <module>
    from reverb import pybind
  File "/Users/cyang/anaconda3/envs/myenv/lib/python3.8/site-packages/reverb/pybind.py", line 1, in <module>
    import tensorflow as _tf; from .libpybind import *; del _tf
ImportError: dlopen(/Users/cyang/anaconda3/envs/myenv/lib/python3.8/site-packages/reverb/libpybind.so, 2): Symbol not found: __ZN10tensorflow15TensorShapeBaseINS_11TensorShapeEEC2EN4absl12lts_202103244SpanIKxEE
  Referenced from: /Users/cyang/anaconda3/envs/myenv/lib/python3.8/site-packages/reverb/libpybind.so
  Expected in: flat namespace

The error is caused by a mismatch between the absl version in the WORKSPACE file and the absl version used by my pip-installed TensorFlow.
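
The absl release involved can be read off the missing symbol itself, because Abseil LTS releases put an inline namespace such as lts_20210324 or lts_2020_02_25 into every mangled name. The following is a minimal sketch for pulling that tag out of an error message; the helper name is hypothetical and the regular expression only covers the two spellings seen in the errors on this page.

```python
import re
from typing import Optional

# Matches the LTS tag embedded in mangled Abseil symbols, with or without
# underscores between the year, month and day.
_ABSL_LTS = re.compile(r"lts_(\d{4})_?(\d{2})_?(\d{2})")


def absl_lts_version(symbol: str) -> Optional[str]:
    """Returns the Abseil LTS date (YYYYMMDD) found in a symbol, if any."""
    match = _ABSL_LTS.search(symbol)
    return "".join(match.groups()) if match else None


# Symbol copied from the ImportError above.
missing_symbol = ("__ZN10tensorflow15TensorShapeBaseINS_11TensorShapeEEC2EN4absl"
                  "12lts_202103244SpanIKxEE")
print(absl_lts_version(missing_symbol))
```

Comparing the tag that libpybind.so asks for with the one exported by the installed TensorFlow libraries shows at a glance whether the two were built against the same absl release.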

A fix to this is probably to eliminate the use of that http_archive, then add the include rules for absl manually similar to how you did it for TF: https://github.com/deepmind/reverb/blob/master/reverb/cc/platform/default/repo.bzl#L186.
But since absl is actually right under tensorflow/include/absl, perhaps using tensorflow's includes everywhere is already sufficient?
I do see that probably internally you would still want to use those @com_google_absl targets for consistency. So I'm not sure what's the best resolution.

Another possible fix is to make tensorflow an external dependency, similar to how tensorflow_text did it: https://github.com/tensorflow/text/blob/master/WORKSPACE#L53. This should also avoid the absl inconsistency.

Feature request: timeout argument for `client.sample`

Similarly to #4, it would be useful to be able to back out of sampling without needing to wrap things in a thread or use an executor. I agree that in many cases you'd want to sample asynchronously to maximize throughput, but there are cases where predictability and simplicity are preferable even if they come at the expense of efficiency (e.g., in research). A timeout argument would simplify the synchronous setting without sacrificing safeguards against indefinite waiting.
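
For context, the executor-based workaround that this request wants to avoid looks roughly like the sketch below. It uses only the standard library; call_with_timeout and slow_sample are hypothetical names, and slow_sample merely stands in for a blocking call such as client.sample.

```python
import concurrent.futures
import time


def call_with_timeout(fn, timeout_s, *args, **kwargs):
    """Runs fn in a worker thread.

    Raises concurrent.futures.TimeoutError if no result arrives in time.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_s)
    finally:
        # Do not wait for the worker here: a call that is still blocked keeps
        # running in the background, which is the main drawback of this
        # workaround.
        executor.shutdown(wait=False)


def slow_sample():
    """Hypothetical stand-in for a blocking sampling call."""
    time.sleep(0.5)
    return "sample"


try:
    call_with_timeout(slow_sample, timeout_s=0.1)
except concurrent.futures.TimeoutError:
    print("sampling timed out")
```

The caller regains control after the deadline, but the blocked call itself is not cancelled, which is why a timeout handled inside the library would be cleaner.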

Error import reverb

Hello,

I cannot import a self-built release version of dm_reverb-0.1.0-cp36-cp36m-linux_aarch64.whl.

System information

Host OS: Ubuntu 18.04.5 LTS
Tensorflow: 2.3.1
GCC: 7.5.0
Python: 3.6.9

Result

>>> import tensorflow as tf
2021-07-15 17:54:33.353847: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.10.2
>>> tf.__version__
'2.3.1'
>>> import reverb
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ubuntu/.local/lib/python3.6/site-packages/reverb/__init__.py", line 27, in <module>
    from reverb import item_selectors as selectors
  File "/home/ubuntu/.local/lib/python3.6/site-packages/reverb/item_selectors.py", line 19, in <module>
    from reverb import pybind
  File "/home/ubuntu/.local/lib/python3.6/site-packages/reverb/pybind.py", line 1, in <module>
    import tensorflow as _tf; from .libpybind import *; del _tf
ImportError: /home/ubuntu/.local/lib/python3.6/site-packages/reverb/libschema_cc_proto.so: undefined symbol: _ZNK6google8protobuf7Message25InitializationErrorStringEv
>>> 

Building wheel on Apple M1

I tried to compile the wheel directly on macOS Big Sur, but I get an error:

Starting local Bazel server and connecting to it...
INFO: Repository python_includes instantiated at:
/Users/martin/Projects/reverb/WORKSPACE:82:16: in
/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl:300:25: in cc_tf_configure
Repository rule unexported__python_includes_repo_impl defined at:
/Users/martin/Projects/reverb/WORKSPACE:82:16: in
/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl:297:43: in cc_tf_configure
ERROR: An error occurred during the fetch of repository 'python_includes':
Traceback (most recent call last):
File "/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl", line 237, column 43, in _python_includes_repo_impl
python_solib = _find_python_solib_path(repo_ctx)
File "/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl", line 98, column 13, in _find_python_solib_path
fail("Could not locate python shared library path:\n{}"
Error in fail: Could not locate python shared library path:
src/main/tools/process-wrapper-legacy.cc:80: "execvp(python3.8-config, ...)": No such file or directory
ERROR: Error fetching repository: Traceback (most recent call last):
File "/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl", line 237, column 43, in _python_includes_repo_impl
python_solib = _find_python_solib_path(repo_ctx)
File "/Users/martin/Projects/reverb/reverb/cc/platform/default/repo.bzl", line 98, column 13, in _find_python_solib_path
fail("Could not locate python shared library path:\n{}"
Error in fail: Could not locate python shared library path:
src/main/tools/process-wrapper-legacy.cc:80: "execvp(python3.8-config, ...)": No such file or directory
INFO: Repository bazel_skylib instantiated at:
/Users/martin/Projects/reverb/WORKSPACE:58:10: in
/private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/com_github_grpc_grpc/bazel/grpc_deps.bzl:211:21: in grpc_deps
Repository rule http_archive defined at:
/private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/bazel_tools/tools/build_defs/repo/http.bzl:336:31: in
INFO: Repository build_bazel_rules_swift instantiated at:
/Users/martin/Projects/reverb/WORKSPACE:68:25: in
/private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/build_bazel_rules_apple/apple/repositories.bzl:117:11: in apple_rules_dependencies
/private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/build_bazel_rules_apple/apple/repositories.bzl:84:14: in _maybe
Repository rule http_archive defined at:
/private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/bazel_tools/tools/build_defs/repo/http.bzl:336:31: in
ERROR: /private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/tensorflow_solib/BUILD:2:11: @tensorflow_solib//:framework_lib depends on @python_includes//:python_includes in repository @python_includes which failed to fetch. no such package '@python_includes//': Could not locate python shared library path:
src/main/tools/process-wrapper-legacy.cc:80: "execvp(python3.8-config, ...)": No such file or directory
ERROR: /private/var/tmp/_bazel_martin/79641e1633fab9ab92513b178e080b75/external/tensorflow_solib/BUILD:2:11: @tensorflow_solib//:framework_lib depends on @python_includes//:numpy_includes in repository @python_includes which failed to fetch. no such package '@python_includes//': Could not locate python shared library path:
src/main/tools/process-wrapper-legacy.cc:80: "execvp(python3.8-config, ...)": No such file or directory
ERROR: Analysis of target '//reverb/pip_package:build_pip_package' failed; build aborted: Analysis failed
INFO: Elapsed time: 29.788s
INFO: 0 processes.
FAILED: Build did NOT complete successfully (46 packages loaded, 253 targets configured)
currently loading: @com_github_grpc_grpc//
Fetching @protobuf_archive; fetching
Fetching @eigen_archive; fetching

Using

Xcode CLI
Python 3.8.2
Numpy 1.18.5
Mac-optimized TensorFlow r2.4rc0
bazel 4.0.0-homebrew (installed by HomeBrew)

I need a native wheel for the Apple M1 chip. Thanks.

Restoring a checkpoint does not restore a checkpoint

Following the readme guidelines, when you trigger a checkpoint, you can provide the path to a DefaultCheckpointer and give that to the server to have it restore the state.

This does not actually seem to work. I'm using Reverb version 0.1.0.dev20200716 because later versions crash for me (as I detailed in issue #13).

Here's a script I wrote to try and get it to work.

import reverb
import tensorflow as tf

OBSERVATION_TABLE = 'OBSERVATION'
PORT = 5000

def start_server(checkpoint_path = None):

    if checkpoint_path:
        checkpointer = reverb.checkpointers.DefaultCheckpointer(checkpoint_path)
    else:
        checkpointer = None

    server = reverb.Server(  # noqa
        tables=[
            reverb.Table(
                name=OBSERVATION_TABLE,
                sampler=reverb.selectors.Uniform(),
                remover=reverb.selectors.Fifo(),
                max_size=1000,
                rate_limiter=reverb.rate_limiters.MinSize(1),
            )
        ],
        port=PORT,
        checkpointer=checkpointer
    )

    return server, reverb.client.Client(f'localhost:{PORT}'), reverb.TFClient(f'localhost:{PORT}')


def do_writes(client):
    print('writing data')
    writer = client.writer(1)
    for i in range(10):
        writer.append(tf.constant([i], dtype=tf.float32))
        writer.create_item(OBSERVATION_TABLE, 1, 0.8)


def do_reads():
    print('reading data')
    dataset = reverb.ReplayDataset(f'localhost:{PORT}', OBSERVATION_TABLE, (tf.float32,), (1,), 4)
    for item in dataset.take(20):
        print(item.data)


def insert_run():
    server, client, tf_client = start_server()
    do_writes(client)
    do_reads()

    checkpoint_path = client.checkpoint()
    print(checkpoint_path)
    print('done')


def checkpoint_run(path):
    server, client, tf_client = start_server(path)
    do_reads()
    print('done')


def main():
    # first run this script with this line
    insert_run() 

    # Then, once you have a checkpoint path from the run above, comment out the line above and uncomment
    # the line below, passing it the path you observed in the previous run. I have an example path here.
    # checkpoint_run('/tmp/tmpqv10edqk/2020-08-11T15:16:41.996914817+00:00')


if __name__ == '__main__':
    main()

When you run the script using insert_run() everything works and you get print outs of data from the dataset as well as a checkpoint path.

When you run the script instead using checkpoint_run and providing it the final path reported from the previous run of the script, it blocks indefinitely on the first iteration of the dataset, presumably because it's waiting for the first data to be inserted since it seems to not have actually loaded the table checkpoint.

I can confirm that the file path reported exists with the following contents

DONE  chunks.tfrecord  tables.tfrecord

I can also confirm that reverb reports using the checkpoint in its output because you can see the following log.

[reverb/cc/platform/tfrecord_checkpointer.cc:143] Initializing TFRecordCheckpointer in /tmp/tmpqv10edqk/2020-08-11T15:16:41.996914817+00:00
[reverb/cc/platform/tfrecord_checkpointer.cc:320] Loading latest checkpoint from /tmp/tmpqv10edqk/2020-08-11T15:16:41.996914817+00:00

Any thoughts?

UPDATE After trying one more idea, I found if I pass the parent directory of the checkpoint path, it will load the data, which presumably is the latest only.

That solves some of my problems, but it means two things

  1. I think the readme is wrong (the returned path from the checkpoint won't work)
  2. It means I cannot specify a specific checkpoint, only the latest.

So I think the revised issue is you can't seem to load a specific checkpoint, only the latest.
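
The workaround from the update can be sketched as follows: take the path returned by client.checkpoint() and hand its parent directory to the checkpointer. The helper name is hypothetical, and the snippet only does the path handling; that the parent directory is what DefaultCheckpointer expects is the behaviour observed in this report, not something confirmed elsewhere.

```python
import posixpath


def checkpoint_root(checkpoint_path: str) -> str:
    """Returns the directory that contains a timestamped checkpoint directory."""
    # normpath drops a trailing slash so that dirname really yields the parent.
    return posixpath.dirname(posixpath.normpath(checkpoint_path))


# Path copied from the log output above.
returned_path = "/tmp/tmpqv10edqk/2020-08-11T15:16:41.996914817+00:00"
root = checkpoint_root(returned_path)
print(root)

# Assumption based on the update above: `root`, not `returned_path`, is then
# passed to reverb.checkpointers.DefaultCheckpointer(...).
```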

How to use the checkpointer to save replay buffer in the disk?

I found that there are no files in the checkpointer's directory, neither in /tmp/xxxxx nor in the directory I specified.
I also noticed that the TFCheckpointer class has interfaces for saving and loading the replay buffer, but when I call replay_checkpointer.save() I get an error saying "Don't have the attribute 'save'".
So how do I use the checkpointer class or Reverb's saver class to save the replay buffer to disk?
Thanks~

Cannot import reverb

After installing reverb for tf-agents using

pip3 install tf-agents[reverb]

I cannot import reverb because of this NotFoundError:

[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import reverb
2020-11-01 13:34:47.428913: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: :/home/silver/.mujoco/mujoco200/bin
2020-11-01 13:34:47.428939: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/silver/.local/lib/python3.6/site-packages/reverb/__init__.py", line 33, in <module>
    from reverb.dataset import ReplayDataset
  File "/home/silver/.local/lib/python3.6/site-packages/reverb/dataset.py", line 25, in <module>
    from reverb.cc.ops import gen_dataset_op
  File "/home/silver/.local/lib/python3.6/site-packages/reverb/cc/ops/gen_dataset_op.py", line 4, in <module>
    "libgen_dataset_op_gen_op.so"))
  File "/home/silver/.local/lib/python3.6/site-packages/tensorflow/python/framework/load_library.py", line 57, in load_op_library
    lib_handle = py_tf.TF_LoadLibrary(library_filename)
tensorflow.python.framework.errors_impl.NotFoundError: /home/silver/.local/lib/python3.6/site-packages/reverb/cc/ops/libgen_dataset_op_gen_op.so: undefined symbol: _ZN10tensorflow4data15DatasetOpKernel11TraceStringEPNS_15OpKernelContextEb

When I subsequently try to exit the Python shell using Ctrl+D, it prints this before exiting:

Exception ignored in: <bound method Buckets.__del__ of <tensorflow.python.eager.monitoring.ExponentialBuckets object at 0x7f6234677438>>
Traceback (most recent call last):
  File "/home/silver/.local/lib/python3.6/site-packages/tensorflow/python/eager/monitoring.py", line 407, in __del__
AttributeError: 'NoneType' object has no attribute 'TFE_MonitoringDeleteBuckets'
$ pip3 freeze | grep reverb
dm-reverb==0.1.0
dm-reverb-nightly==0.2.0.dev20201101
reverb==2.0.1

It appears I also have some other package named reverb installed (https://pypi.org/project/reverb/), could this be the problem? It also cannot be uninstalled:

Found existing installation: reverb 2.0.1
Uninstalling reverb-2.0.1:
  Would remove:
    /home/silver/.local/lib/python3.6/site-packages/*
  Would not remove (might be manually added):

After printing this, it doesn't do anything further.
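
A quick way to check for this kind of clash is to list every installed distribution whose name suggests it could provide a top-level reverb module. The sketch below uses importlib.metadata (Python 3.8+); the function names and the candidate list, taken from the pip freeze output above, are illustrative assumptions.

```python
from importlib import metadata


def reverb_like_distributions(installed_names):
    """Returns the sorted names that could provide a `reverb` module."""
    candidates = {"reverb", "dm-reverb", "dm-reverb-nightly"}
    # Normalise case and underscores so dm_reverb and dm-reverb compare equal.
    normalised = {name.lower().replace("_", "-") for name in installed_names}
    return sorted(normalised & candidates)


def installed_distribution_names():
    """Lists the names of all distributions visible to this interpreter."""
    names = (dist.metadata["Name"] for dist in metadata.distributions())
    return [name for name in names if name]


found = reverb_like_distributions(installed_distribution_names())
if len(found) > 1:
    print("Possible conflict between:", ", ".join(found))
```

More than one hit means several packages may be competing for the same import name, in which case uninstalling all of them and reinstalling only the intended one is a reasonable first step.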

TFClient.insert/sample: InvalidArgumentError: Trying to access resource using the wrong type. Expected localhost got N8deepmind6reverb12_GLOBAL__N_114ClientResourceE

After upgrading from dm-reverb-nightly==0.1.0.dev20200616 to dm-reverb-nightly==0.1.0.dev20200708, TFClient fails to connect to the server.

The minimal code, tested on Google Colab, is as follows:

#!pip install tf-nightly==2.3.0.dev20200604 dm-reverb-nightly==0.1.0.dev20200616 # OK
!pip install tf-nightly==2.3.0.dev20200604 dm-reverb-nightly==0.1.0.dev20200708 # NG

# Reset runtime once here!

import tensorflow as tf
import reverb
server = reverb.Server(tables =[
    reverb.Table(name='ReplayBuffer',
                 sampler=reverb.selectors.Uniform(),
                 remover=reverb.selectors.Fifo(),
                 max_size=256,
                 rate_limiter=reverb.rate_limiters.MinSize(1))])
tc = reverb.TFClient(f"localhost:{server.port}")

tc.insert([tf.constant(1.0)],tf.constant(["ReplayBuffer"]),tf.constant([1.0],dtype=tf.float64)) # -> Error1
tc.sample(tf.constant(["ReplayBuffer"]),[tf.float32]) # -> Error2

The test code works fine with dm-reverb-nightly==0.1.0.dev20200616 and fails with dm-reverb-nightly==0.1.0.dev20200708.

Error1

---------------------------------------------------------------------------
InvalidArgumentError                      Traceback (most recent call last)
<ipython-input-2-1467e2131a0c> in <module>()
----> 1 tc.insert([tf.constant(1.0)],tf.constant(["ReplayBuffer"]),tf.constant([1.0],dtype=tf.float64))

/usr/local/lib/python3.6/dist-packages/reverb/tf_client.py in insert(self, data, tables, priorities, name)
    112     with tf.name_scope(name, f'{self._name}_insert', ['insert']) as scope:
    113       return gen_client_ops.reverb_client_insert(
--> 114           self._handle, data, tables, priorities, name=scope)
    115 
    116   def update_priorities(self,

<string> in reverb_client_insert(handle, data, tables, priorities, name)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in raise_from_not_ok_status(e, name)
   6849   message = e.message + (" name: " + name if name is not None else "")
   6850   # pylint: disable=protected-access
-> 6851   six.raise_from(core._status_to_exception(e.code, message), None)
   6852   # pylint: enable=protected-access
   6853 

/usr/local/lib/python3.6/dist-packages/six.py in raise_from(value, from_value)

InvalidArgumentError: Trying to access resource using the wrong type. Expected localhost got N8deepmind6reverb12_GLOBAL__N_114ClientResourceE [Op:ReverbClientInsert] name: reverb_insert/

Error2

---------------------------------------------------------------------------
InvalidArgumentError                      Traceback (most recent call last)
<ipython-input-3-680461c93a17> in <module>()
----> 1 tc.sample(tf.constant(["ReplayBuffer"]),[tf.float32])


/usr/local/lib/python3.6/dist-packages/reverb/tf_client.py in sample(self, table, data_dtypes, name)
     69     with tf.name_scope(name, f'{self._name}_sample', ['sample']) as scope:
     70       key, probability, table_size, priority, data = gen_client_ops.reverb_client_sample(
---> 71           self._handle, table, tree.flatten(data_dtypes), name=scope)
     72       return replay_sample.ReplaySample(
     73           replay_sample.SampleInfo(

<string> in reverb_client_sample(handle, table, Toutput_list, name)

/usr/local/lib/python3.6/dist-packages/tensorflow/python/framework/ops.py in raise_from_not_ok_status(e, name)
   6849   message = e.message + (" name: " + name if name is not None else "")
   6850   # pylint: disable=protected-access
-> 6851   six.raise_from(core._status_to_exception(e.code, message), None)
   6852   # pylint: enable=protected-access
   6853 

/usr/local/lib/python3.6/dist-packages/six.py in raise_from(value, from_value)

InvalidArgumentError: Trying to access resource using the wrong type. Expected localhost got N8deepmind6reverb12_GLOBAL__N_114ClientResourceE [Op:ReverbClientSample] name: reverb_sample/

As far as I have tested, Client still works fine, and the deprecated method TFClient.dataset still works fine, too.

Error with reverb installation

Thanks for making this project publicly available. I am checking it out. reverb installation is hitting the following error

ERROR: Could not find a version that satisfies the requirement dm-reverb-nightly==0.1.0.dev20200529; extra == "reverb" (from dm-acme[reverb]) (from versions: none)
ERROR: No matching distribution found for dm-reverb-nightly==0.1.0.dev20200529; extra == "reverb" (from dm-acme[reverb])

Timeout on write_item?

I'm exploring reverb and it looks like it's a pretty cool framework that solves a lot of problems.

One thing I'm unsure how to do, or if it's a feature that should be added, is how to timeout on a Writer.create_item call.

To give some motivation, suppose I have a remote trainer that is also hosting the Reverb server, and a worker that is writing to this remote Reverb server using a Writer and its create_item method. Now suppose the trainer crashes for some reason (or even just ends gracefully), taking down the Reverb server. It seems that right now the worker will just be blocked indefinitely on create_item, when what I want is for it to time out and die gracefully itself.

To give an example of this behavior, run the following "server" script in one shell

import reverb
import tensorflow as tf
import time


def main():
    server = reverb.Server(  # noqa
        tables=[
            reverb.Table(
                name='my_table',
                sampler=reverb.selectors.Uniform(),
                remover=reverb.selectors.Fifo(),
                max_size=1000,
                rate_limiter=reverb.rate_limiters.MinSize(1)),
        ],
        port=8000
    )

    time.sleep(10.)  # die after 10 seconds
    print('Server closing')


if __name__ == '__main__':
    main()

In another shell, run this "worker" script

import reverb
import tensorflow as tf
import time


def main():
    client = reverb.Client('localhost:8000')

    with client.writer(max_sequence_length=2) as writer:
        for i in range(1000):
            writer.append(tf.constant([1.]))
            if i > 0:
                print(f'creating item for i={i}')
                client.server_info(timeout=5)
                writer.create_item('my_table', 2, 1.0)
                print(f'wrote item for i={i}')
                time.sleep(0.5)

    print('closing')



if __name__ == '__main__':
    main()

What I find is that when the server goes down after its 10 second lifetime, you'll see the worker is blocked indefinitely on the create_item call.

So is there a way to time this out? The only work around I saw was to use a check of the server_info method first, which does have a timeout argument you can specify, and then catch on error. E.g., you can modify the worker like this:

def main():
    client = reverb.Client('localhost:8000')

    with client.writer(max_sequence_length=2) as writer:
        for i in range(1000):
            writer.append(tf.constant([1.]))
            if i > 0:
                print(f'creating item for i={i}')
                try:
                    client.server_info(timeout=5)
                except Exception:
                    break
                writer.create_item('my_table', 2, 1.0)
                print(f'wrote item for i={i}')
                time.sleep(0.5)

    print('closing')

This seems a bit cumbersome though: I'm not sure what overhead that method call introduces, and it can still fail if the server dies right after the server_info call but before the create_item call (even though that may be rare).

Is there a more appropriate way to handle this? Or could a timeout arg be added to the create_item method?

Thanks!
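One generic way to bound a blocking call, independent of Reverb, is to run it in a daemon thread and stop waiting after a deadline. The sketch below is only an illustration of that pattern: call_with_timeout, CallTimeout and slow_call are hypothetical names, not part of Reverb, and the blocked call is abandoned rather than cancelled.

```python
import threading
import time


class CallTimeout(Exception):
    """Raised when the wrapped call does not finish in time."""


def call_with_timeout(fn, timeout_s, *args, **kwargs):
    """Run fn(*args, **kwargs) in a daemon thread, waiting at most timeout_s."""
    outcome = {}

    def target():
        try:
            outcome['value'] = fn(*args, **kwargs)
        except Exception as e:  # re-raised in the calling thread below
            outcome['error'] = e

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join(timeout_s)
    if thread.is_alive():
        # The call is still blocked; it is abandoned here, not cancelled.
        raise CallTimeout(f'call did not return within {timeout_s}s')
    if 'error' in outcome:
        raise outcome['error']
    return outcome['value']


def slow_call():
    """Hypothetical stand-in for a call that blocks, e.g. on a dead server."""
    time.sleep(2.0)
    return 'done'
```

With a writer like the one in the worker script, the call would look like call_with_timeout(writer.create_item, 5.0, 'my_table', 2, 1.0). Whether the writer tolerates being called from another thread is an assumption this sketch does not verify, and the abandoned thread stays blocked until the process exits.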

ReverbDataset on TPU

What would be a working setup to use reverb on a TPU?
When I try iterating over the dataset I always get the error:

NotFoundError: Op type not registered 'ReverbDataset' in binary running on ***.

Is there a special TPU software version with the Reverb ops and kernels?

Failed Import since TensorFlow 2.5.0 release

Today TensorFlow released version 2.5.0, which seems to have broken importing reverb with the error below (reproduced by opening a Python 3.8.5 REPL and trying the import).

>>> import reverb
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.8/dist-packages/reverb/__init__.py", line 27, in <module>
    from reverb import item_selectors as selectors
  File "/usr/local/lib/python3.8/dist-packages/reverb/item_selectors.py", line 19, in <module>
    from reverb import pybind
  File "/usr/local/lib/python3.8/dist-packages/reverb/pybind.py", line 1, in <module>
    import tensorflow as _tf; from .libpybind import *; del _tf
ImportError: /usr/local/lib/python3.8/dist-packages/reverb/libpybind.so: undefined symbol: _ZN4absl14lts_2020_02_2518container_internal18global_next_sampleE

Reproduction Steps

  • pip install dm-reverb[tensorflow]
  • Open Python REPL and try import reverb

Current Workaround
Pin TensorFlow to version 2.4.*

Expected Behaviour
Import to work without issue.

Note: No problem when using dm-reverb-nightly[tensorflow] (This is because it installs TensorFlow version 2.6.0-dev20210514)
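Since the failure happens at import time, it can help to read the installed package versions without importing either library. The sketch below uses only the standard library; installed_version and report are hypothetical helper names, and the package names listed are the ones mentioned in this report plus tf-nightly, which is an assumption about what the nightly build pulls in.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a distribution, or None."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def report(packages=('tensorflow', 'tf-nightly', 'dm-reverb', 'dm-reverb-nightly')):
    """Map each package name to its installed version (None if absent)."""
    return {name: installed_version(name) for name in packages}


if __name__ == '__main__':
    for name, version in report().items():
        print(f'{name}: {version or "not installed"}')
```

Comparing the printed TensorFlow version against the one the Reverb release was built for is then a manual step; the sketch does not know which pairs are compatible.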

Any details for building the sharding reverb services?

Hi all! Thanks for open-sourcing this~
I want to use Reverb to build a distributed prioritized replay buffer like the one used in APEX or R2D2, but I don't know how to achieve that, in particular which functions I need and how to use them with a gRPC-compatible load balancer. I also cannot find the document mentioned in the "Sharding" section of the README. Are there any examples or Python files that show how to use Reverb in a sharded setup?
Thanks a lot~

Support for batch append while adding data

Hi,

The tutorial only has examples for adding trajectories to the Reverb server from a single environment. However, a more common setting in RL is a BatchedEnv holding multiple environments (say, n). In this case, on the actor side, the shape of a tensor would be [n, single_shape], but on the learner side, we need a batch of samples of shape [T, batch_size, single_shape]. My question is, what is the best practice for this?
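To make the shape question concrete, here is a minimal sketch of one possible approach, independent of Reverb's API: split each batched step along the environment axis and keep a separate length-T window per environment. PerEnvWindows is a hypothetical class, and mapping each window to its own Reverb writer is an assumption, not something the tutorial confirms.

```python
from collections import deque


class PerEnvWindows:
    """Keeps the last `window_length` observations separately for each env."""

    def __init__(self, num_envs, window_length):
        self._window_length = window_length
        self._windows = [deque(maxlen=window_length) for _ in range(num_envs)]

    def add_batched_step(self, batched_obs):
        """Adds one step of shape [n, ...]; returns (env_index, window) pairs.

        A pair is returned for every env whose window holds T observations.
        """
        if len(batched_obs) != len(self._windows):
            raise ValueError('expected one observation per environment')
        ready = []
        for env_index, obs in enumerate(batched_obs):
            window = self._windows[env_index]
            window.append(obs)  # the deque drops the oldest entry when full
            if len(window) == self._window_length:
                ready.append((env_index, list(window)))
        return ready
```

Each returned window has shape [T, single_shape]; batching several of them on the learner side and transposing would give [T, batch_size, single_shape].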

Various later versions of Reverb fail on import.

If I use Tensorflow 2.3.0 and the latest reverb nightly, when I import reverb in Python 3.8, I get the following error.

>>> import reverb
2020-08-11 15:36:08.653570: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/dev/.local/lib/python3.8/site-packages/reverb/__init__.py", line 27, in <module>
    from reverb import item_selectors as selectors
  File "/home/dev/.local/lib/python3.8/site-packages/reverb/item_selectors.py", line 19, in <module>
    from reverb import pybind
  File "/home/dev/.local/lib/python3.8/site-packages/reverb/pybind.py", line 1, in <module>
    import tensorflow as _tf; from .libpybind import *; del _tf
ImportError: /home/dev/.local/lib/python3.8/site-packages/reverb/libpybind.so: undefined symbol: _ZN10tensorflow6StatusC1ENS_5error4CodeEN4absl14lts_2020_02_2511string_viewEOSt6vectorINS_10StackFrameESaIS7_EE

The last Reverb version that works for me is 0.1.0.dev20200716. Anything after that version fails in the same way.

This is within a docker instance of Ubuntu 18.04 (with some other stuff built on top). Any thoughts?

Random seed

Hi,

Is there a way to set a random seed for Reverb to force deterministic sampling? In my test case, there is only one single-threaded client, so insertion involves no randomness.

Thanks!

Experience replay with frame stacking?

Hi all! Thanks for open-sourcing this. I feel like the world has been waiting for a flexible experience replay framework; I certainly have.

I was reading through the README and I was wondering if you might have an example of a replay buffer with frame stacking. How would I implement that?

Thanks!
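For reference, frame stacking itself can be sketched without any replay library: keep the last k frames in a fixed-size buffer and emit the whole buffer as one observation. FrameStacker below is a hypothetical helper, and padding the start of an episode by repeating the first frame is one common convention, assumed here rather than taken from the README.

```python
from collections import deque


class FrameStacker:
    """Returns the last `num_frames` frames as a single stacked observation."""

    def __init__(self, num_frames):
        self._num_frames = num_frames
        self._frames = deque(maxlen=num_frames)

    def reset(self, first_frame):
        """Starts an episode by filling the stack with copies of the first frame."""
        self._frames.clear()
        for _ in range(self._num_frames):
            self._frames.append(first_frame)
        return list(self._frames)

    def step(self, frame):
        """Pushes a new frame and returns the current stack, oldest first."""
        if not self._frames:
            raise RuntimeError('call reset() before step()')
        self._frames.append(frame)
        return list(self._frames)
```

Storing each stacked observation as one item is the simplest option but duplicates frames; whether Reverb can share the underlying frames between overlapping items is the part of the question this sketch does not answer.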

Replay buffer as in the IMPALA paper

Hi,

The IMPALA paper states that:

[Screenshot of the relevant passage from the IMPALA paper]

So my understanding is that, for a batch size of 100, 50 samples are drawn from new experiences, and 50 are drawn from a replay buffer.
I am wondering how to implement this behavior using reverb?

Currently I'm thinking of maintaining two Tables.
For the new experiences buffer, it will be a Queue just as in the README.
For the replay buffer, I will sample uniformly, and set max_times_sampled=1.

So perhaps something like this:

new_size = 1000
replay_size = 100000
server = reverb.Server(
    tables=[
        reverb.Table(
            name='replay_buffer',
            sampler=reverb.selectors.Uniform(),
            remover=reverb.selectors.Fifo(), 
            max_size=replay_size,
            max_times_sampled=1,
            rate_limiter=reverb.rate_limiters.MinSize(1),
        ),
        reverb.Table.queue(name='new_experiences', max_size=new_size)
    ],
    port=8000,
)

However, the doc says:

An item has been sampled more than the maximum number of times permitted by the Table’s rate limiter. Note that not all rate limiters will enforce this.

It's not clear what max_times_sampled=1 does, and which rate limiters will actually enforce it.

The remover=reverb.selectors.Fifo() is also suspicious, since in my imagined use case a sampled item should always be dropped right after it is sampled, leaving no role for a remover at all!
Though perhaps this is a feature: for example, if no sampling occurs (e.g. node failure), the buffer will reach its max_size, causing items to be dropped in FIFO order?
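The 50/50 mix described above can be sketched independently of Reverb, with plain Python containers standing in for the two tables. mixed_batch is a hypothetical function; consuming new experiences in FIFO order and sampling the replay half uniformly with replacement are simplifying assumptions, not behavior confirmed for either table type.

```python
import random
from collections import deque


def mixed_batch(new_items, replay_items, batch_size, replay_fraction=0.5, rng=random):
    """Builds a batch from fresh items (consumed) and replayed items (kept).

    new_items: a deque acting as the queue of new experiences.
    replay_items: a list acting as the replay buffer.
    """
    num_replay = int(batch_size * replay_fraction)
    num_new = batch_size - num_replay
    if len(new_items) < num_new:
        raise ValueError('not enough new experiences for this batch')
    if num_replay and not replay_items:
        raise ValueError('replay buffer is empty')
    # Fresh half: take the oldest new experiences and remove them.
    fresh = [new_items.popleft() for _ in range(num_new)]
    # Replay half: uniform sampling with replacement (a simplification).
    replayed = [rng.choice(replay_items) for _ in range(num_replay)]
    return fresh + replayed


new = deque(range(100))
replay = list(range(1000, 2000))
batch = mixed_batch(new, replay, batch_size=100)
```

With two Reverb tables as proposed, the two halves would presumably come from two separate sample calls that are concatenated on the learner side.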

Sampling from multiple tables

Hello,
I want to create a buffer consisting of multiple queues, where each queue contains an ordered stream of observations. To construct a batch, I want to sample one observation from each queue and batch them (i.e. the batch size is the same as the number of queues). My idea is to create as many tables as queues needed, sample a batch size of 1 separately from each and then concatenate the result to get the desired batch. Is there a more efficient way of achieving this behavior?

client can not access reverb server in docker environment

When running reverb in a docker environment, reverb client does not seem to be able to connect to the reverb server:

Traceback (most recent call last):
  File "test.py", line 15, in <module>
    client.server_info()
  File "/test/lib/python3.8/site-packages/reverb/client.py", line 410, in server_info
    info_proto_strings = self._client.ServerInfo(timeout or 0)
RuntimeError: Socket closed
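A first diagnostic step, sketched below under the assumption that the problem is at the network level, is to check from inside the client container whether the server's host and port accept a plain TCP connection at all. can_connect is a hypothetical helper using only the standard library; the host and port must be whatever the server was started with.

```python
import socket


def can_connect(host, port, timeout_s=2.0):
    """Returns True if a TCP connection to (host, port) can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False
```

If this returns False, the port is probably not published or the hostname does not resolve between containers. If it returns True, the "Socket closed" error has some other cause that this check cannot identify.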
