txtq - a small message queue for parallel processes

The txtq package helps parallel R processes send messages to each other. Let's say Process A and Process B are working on a parallel task together. First, both processes grab the queue.

path <- tempfile() # Define a path to your queue.
path # In real life, temp files go away when the session exits, so be careful.
#> [1] "/tmp/Rtmpt14rFr/file517839a8bfa2"
q <- txtq(path) # Create the queue.

The queue uses text files to keep track of your data. In the data frame of messages, the time column is the POSIXct Sys.time() stamp of when each message was pushed.

list.files(q$path()) # The queue's underlying text files live in this folder.
#> [1] "db"    "head"  "lock"  "total"
q$list() # You have not pushed any messages yet.
#> [1] title   message time   
#> <0 rows> (or 0-length row.names)

Then, Process A sends instructions to Process B.

q$push(title = "Hello", message = "process B.")
q$push(
  title = c("Calculate", "Calculate"),
  message = c("sqrt(4)", "sqrt(16)")
)
q$push(title = "Send back", message = "the sum.")

You can inspect the contents of the queue from either process.

q$list()
#>       title    message                time
#> 1     Hello process B. 2018-10-09 22:04:08
#> 2 Calculate    sqrt(4) 2018-10-09 22:04:08
#> 3 Calculate   sqrt(16) 2018-10-09 22:04:08
#> 4 Send back   the sum. 2018-10-09 22:04:08
q$list(1) # You can specify the number of messages to list.
#>   title    message                time
#> 1 Hello process B. 2018-10-09 22:04:08
q$count()
#> [1] 4

As Process A is pushing the messages, Process B can consume them.

q$pop(2) # If you pass 2, you are assuming the queue has >=2 messages.
#>       title    message                time
#> 1     Hello process B. 2018-10-09 22:04:08
#> 2 Calculate    sqrt(4) 2018-10-09 22:04:08

Those popped messages are not technically in the queue any longer.

q$list()
#>       title  message                time
#> 1 Calculate sqrt(16) 2018-10-09 22:04:08
#> 2 Send back the sum. 2018-10-09 22:04:08
q$count() # Number of messages technically in the queue.
#> [1] 2

But we still have a full log of all the messages that were ever sent.

q$log()
#>       title    message                time
#> 1     Hello process B. 2018-10-09 22:04:08
#> 2 Calculate    sqrt(4) 2018-10-09 22:04:08
#> 3 Calculate   sqrt(16) 2018-10-09 22:04:08
#> 4 Send back   the sum. 2018-10-09 22:04:08
q$total() # Number of messages that were ever queued.
#> [1] 4

Let's let Process B get the rest of the instructions.

q$pop() # q$pop() with no arguments just pops one message.
#>       title  message                time
#> 1 Calculate sqrt(16) 2018-10-09 22:04:08
q$pop() # Call q$pop(-1) to pop all the messages at once.
#>       title  message                time
#> 1 Send back the sum. 2018-10-09 22:04:08

Now let's say Process B follows the instructions in the messages. The last step is to send the results back to Process A.

q$push(title = "Results", message = as.character(sqrt(4) + sqrt(16)))
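The push above hard-codes the arithmetic. As a minimal sketch of how Process B might follow the instructions programmatically, assuming the "Calculate" messages hold trusted R expressions as text, base R's parse() and eval() can evaluate them. The data frame below is only a stand-in for the output of q$pop(), and the names msgs, to_run, values, and result are illustrative, not part of txtq.

```r
# Stand-in for the messages Process B popped from the queue.
msgs <- data.frame(
  title = c("Calculate", "Calculate", "Send back"),
  message = c("sqrt(4)", "sqrt(16)", "the sum."),
  stringsAsFactors = FALSE
)

# Keep only the instructions that contain R expressions.
to_run <- msgs$message[msgs$title == "Calculate"]

# Evaluate each expression. Only do this with messages you trust.
values <- vapply(
  to_run,
  function(code) eval(parse(text = code)),
  FUN.VALUE = numeric(1)
)

# Messages are text, so convert the sum before pushing it back.
result <- as.character(sum(values))
result
#> [1] "6"
```

Process B could then pass result as the message argument of q$push().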

Process A can now see the results.

q$pop()
#>     title message                time
#> 1 Results       6 2018-10-09 22:04:08

The queue can grow large if you are not careful. Popped messages are kept in the database file.

q$push(title = "not", message = "popped")
q$count()
#> [1] 1
q$total()
#> [1] 6
q$list()
#>   title message                time
#> 1   not  popped 2018-10-09 22:04:08
q$log()
#>       title    message                time
#> 1     Hello process B. 2018-10-09 22:04:08
#> 2 Calculate    sqrt(4) 2018-10-09 22:04:08
#> 3 Calculate   sqrt(16) 2018-10-09 22:04:08
#> 4 Send back   the sum. 2018-10-09 22:04:08
#> 5   Results          6 2018-10-09 22:04:08
#> 6       not     popped 2018-10-09 22:04:08

To keep the database file from getting too big, you can clean out the popped messages.

q$clean()
q$count()
#> [1] 1
q$total()
#> [1] 1
q$list()
#>   title message                time
#> 1   not  popped 2018-10-09 22:04:08
q$log()
#>   title message                time
#> 1   not  popped 2018-10-09 22:04:08
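If you prefer to clean only when enough popped messages have piled up, one option is to compare q$total() with q$count(). This is a sketch under stated assumptions: clean_if_large() and its max_popped threshold are hypothetical and not part of txtq, and the difference q$total() - q$count() is taken as the number of popped messages still stored, which matches the output shown above.

```r
library(txtq)

# Hypothetical helper: clean the queue once the number of popped
# messages still stored in the database exceeds a threshold.
clean_if_large <- function(q, max_popped = 1000) {
  popped <- q$total() - q$count()
  if (popped > max_popped) {
    q$clean()
  }
  invisible(popped)
}

q <- txtq(tempfile())
q$push(title = c("a", "b", "c"), message = c("1", "2", "3"))
popped_msgs <- q$pop(2)            # Two popped messages remain in the database.
clean_if_large(q, max_popped = 1)  # 2 > 1, so the queue is cleaned.
total_after <- q$total()
q$destroy()
```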

You can also reset the queue to remove all messages, popped or not.

q$reset()
q$count()
#> [1] 0
q$total()
#> [1] 0
q$list()
#> [1] title   message time   
#> <0 rows> (or 0-length row.names)
q$log()
#> [1] title   message time   
#> <0 rows> (or 0-length row.names)

When you are done, you can destroy the files in the queue.

q$destroy()
file.exists(q$path())
#> [1] FALSE

This entire time, the queue was locked when either process was trying to create, access, or modify it. That way, the results stay correct even when multiple processes try to read or change the data at the same time.
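
Because each operation locks the queue, one process can poll for new messages while another is still pushing. The following is a minimal sketch, not a feature of txtq: wait_for_message() is a hypothetical helper, and its timeout and interval arguments are assumptions chosen for illustration. It assumes a single consumer. With several consumers, another process could pop the message between the q$count() check and the q$pop() call.

```r
library(txtq)

# Hypothetical helper: block until the queue has a message, then pop one.
wait_for_message <- function(q, timeout = 5, interval = 0.1) {
  deadline <- Sys.time() + timeout
  while (q$count() < 1) {
    if (Sys.time() > deadline) {
      stop("No message arrived within ", timeout, " seconds.")
    }
    Sys.sleep(interval)  # Avoid hammering the file system.
  }
  q$pop(1)
}

q <- txtq(tempfile())
q$push(title = "Hello", message = "process B.")
msg <- wait_for_message(q)  # Returns at once: a message is already waiting.
msg$title
q$destroy()
```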

Similar work

liteq

Gábor Csárdi's liteq package offers essentially the same functionality implemented with SQLite. It has some additional features, such as the ability to detect crashed workers and re-queue failed messages, but it was in an early stage of development at the time txtq was released.

Other message queues

There is a plethora of message queues beyond R, most notably ZeroMQ and RabbitMQ. In fact, Jeroen Ooms and Whit Armstrong maintain rzmq, a package to work with ZeroMQ from R. Even in this landscape, txtq has advantages.

  1. The txtq user interface is friendly, and its internals are simple. No prior knowledge of sockets or message-passing is required.
  2. txtq is lightweight, R-focused, and easy to install. It only depends on R and a few packages on CRAN.
  3. Because txtq is file-based,
    • The queue persists even if your work crashes, so you can diagnose failures with q$log() and q$list().
    • Job monitoring is easy. Just open another R session and call q$list() while your work is running.
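
The monitoring workflow in the last bullet can be sketched in a single session by opening a second handle to the same path, which is what a separate R session would do. The folder name my_queue and the variable names worker, monitor, and seen are hypothetical.

```r
library(txtq)

# Hypothetical queue location. Use a non-temporary path in real work
# so the queue survives after the session exits.
path <- file.path(tempdir(), "my_queue")

# The worker process records its progress.
worker <- txtq(path)
worker$push(title = "status", message = "step 1 done")

# A monitoring session attaches to the same folder and reads the queue.
monitor <- txtq(path)
seen <- monitor$list()  # Messages not yet popped.
monitor$log()           # Every message still stored, popped or not.

worker$destroy()
```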
