shikokuchuo / mirai

mirai - Minimalist Async Evaluation Framework for R

Home Page: https://shikokuchuo.net/mirai/

License: GNU General Public License v3.0

R 100.00%
r rstats cran r-package concurrency parallel-programming distributed-computing high-performance-computing asynchronous-tasks promises

mirai's People

Contributors

jcheng5, shikokuchuo

mirai's Issues

Impact on "select" function after running an eval_mirai

Hi @shikokuchuo
As part of my shiny application, I am currently experimenting with eval_mirai and things are working more or less as expected. A very strange behavior, however, is that when the eval_mirai has started, it suddenly breaks all my select() statements (from dplyr) inside my shiny app. The error I get is the following when a select is executed on a dataframe:

Warning: Error in select: Can't convert NA to < integer >.

I can run these statements without issue before starting the eval_mirai, but once it's triggered, those break. I was thinking it could be some kind of variable conflict, but I double-checked and there is no variable name conflicting between the mirai process and the other tasks. I am also puzzled by the fact that it finds unresolvedValues for NAs in my dataframes where there should be none.

Any clue on this?

Mirai error message outside of mirai function (Can't convert `NA` <unresolvedValue> to <integer>.)

Hi!

I've got this strange error where the mirai object seems to "pollute" the R environment and give errors in other places. For example, when running this piece of code:

library(dplyr)
library(mirai)

{
  x = mirai({
    iris %>%
      select(Species)
  }, iris = iris, count = count)
  x$data

  iris %>%
    select(Species)

}

iris %>%
  select(Species)

I got the error

Error in `select()`:
! Can't convert `NA` <unresolvedValue> to <integer>.
Run `rlang::last_error()` to see where the error occurred.

and then, whenever I run just this piece:

iris %>%
  select(Species)

I got the same error. It seems that if I run an expression in mirai and it results in an error, I can no longer run that expression outside of mirai either.

My session info:

> sessionInfo()
R version 4.2.0 (2022-04-22)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.4 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0

locale:
 [1] LC_CTYPE=pt_BR.UTF-8       LC_NUMERIC=C               LC_TIME=pt_BR.UTF-8        LC_COLLATE=pt_BR.UTF-8     LC_MONETARY=pt_BR.UTF-8    LC_MESSAGES=pt_BR.UTF-8   
 [7] LC_PAPER=pt_BR.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C             LC_MEASUREMENT=pt_BR.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] mirai_0.4.1 dplyr_1.0.9

loaded via a namespace (and not attached):
 [1] fansi_1.0.3       assertthat_0.2.1  utf8_1.2.2        crayon_1.5.1      R6_2.5.1          DBI_1.1.2         lifecycle_1.0.1   magrittr_2.0.3    pillar_1.7.0     
[10] rlang_1.0.2       cli_3.3.0         rstudioapi_0.13   vctrs_0.4.1       generics_0.1.2    ellipsis_0.3.2    tools_4.2.0       glue_1.6.2        purrr_0.3.4      
[19] nanonext_0.5.0    compiler_4.2.0    pkgconfig_2.0.3   sessioninfo_1.2.2 tidyselect_1.1.2  tibble_3.1.7     

server() and SIGINT

In testing, I often run server() interactively and then try to stop it with CTRL-c, but it does not seem to respond to SIGINT. How easy would it be to allow interrupts? This is not essential to my use case, simply nice to have if it's easy. If it's hard, please don't worry about it.

mirai vs callr performance: a simple test

Here is a simple test to confirm that mirai is much faster than callr at producing the exact same results.

library(mirai)
library(tictoc)
library(tidyverse)
library(callr)

# example with mirai

tic("Process with mirai- Duration: ")

mirai <- list()
for (i in 1:2) {
mirai[[i]] <- eval_mirai(
  
  {
    library(tidyverse)
    big_calculation <- function() {
      
      rnorm(1e8) %>%  tibble::as_tibble() %>% return()
    }
  big_calculation()}
  
  )
}

while (unresolved(mirai[[1]]) || unresolved(mirai[[2]])) {
}

toc()

# example with callr

# callr does not allow for library calls in its functions, except when using :: so we need to rewrite the function here.

big_calculation_callr <- function() {
  
  magrittr::`%>%`(magrittr::`%>%`(rnorm(1e8),tibble::as_tibble()),return())
}


tic("Process with callr- Duration: ")
callr_list <- list()
for (i in 1:2) {
callr_list[[i]] <- callr::r_bg(big_calculation_callr)
}

while (is.null(callr_list[[1]]$get_exit_status()) || is.null(callr_list[[2]]$get_exit_status())) {
}
toc()

The results speak for themselves:

Process with mirai- Duration: : 17.798 sec elapsed
Process with callr- Duration: : 66.705 sec elapsed

"'errorValue' int 7 | Object closed" with transient sockets and transient workers

As I mentioned in #48 (comment), when I work on wlandau/crew#61 and test crew transient workers, I notice a lot of "'errorValue' int 7 | Object closed" (about 50% of tasks in that test). The following example reproduces the same issue (at least on my Ubuntu machine) with just mirai and nanonext. I think it has something to do with the fact that I am using transient single-task workers and recycling the websocket with saisei() after each task.

library(mirai)
library(nanonext)
packageVersion("mirai")
#> [1] ‘0.8.2.9007’
packageVersion("nanonext")
#> [1] ‘0.8.1.9008’
daemons(n = 1L, url = "ws://127.0.0.1:5000")
count <- 0L
while (TRUE) {
  count <- count + 1L
  m <- mirai("done")
  launch(
    sprintf(
      "mirai::server(url = '%s', asyncdial = FALSE)",
      saisei(i = 1L)
    )
  )
  while (unresolved(m)) {
    msleep(10)
  }
  stopifnot(identical(m$data, "done"))
}
count
#> 64
m$data
#> 'errorValue' int 7 | Object closed
daemons(n = 0L)

WISH: Pass an expression to '.expr' as-is to mirai()

Consider the following example:

a <- 3
b <- 4
m <- mirai(2 * a + b, .args = list(a, b))

Issue

Now, assume that I have that R expression and a list of variables as standalone objects, e.g.

.expr <- quote(2 * a + b)
a <- 3
b <- 4
globals <- list(a = a, b = b)

To use this setup with mirai(), I need to do something like:

.expr2 <- bquote({
  local({
    envir <- parent.env(environment())
    for (name in names(globals)) {
      assign(name, value = globals[[name]], envir = envir)
    }
  })
  .(.expr)
})

call_expr <- bquote(mirai(.(.expr2), .args = list(globals)))
m <- eval(call_expr)

Wish

If there were an option to not substitute() the .expr argument, then I could instead do:

m <- do.call(mirai, args = c(list(.expr), globals, .substitute = FALSE))
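The semantics requested here can be illustrated locally without mirai. The following is a minimal base R sketch, assuming a hypothetical helper named eval_with_globals(); it only shows what "evaluate a pre-quoted expression against a named list of globals" means, and says nothing about how mirai() would implement such an option.

```r
# Hypothetical helper, not part of mirai: evaluate an already-quoted
# expression with a named list of globals, without calling substitute().
eval_with_globals <- function(expr, globals) {
  # The globals live in a fresh environment whose parent is the global
  # environment, so attached packages stay reachable.
  envir <- list2env(globals, parent = globalenv())
  eval(expr, envir = envir)
}

.expr <- quote(2 * a + b)
globals <- list(a = 3, b = 4)
result <- eval_with_globals(.expr, globals)
print(result)
```

Because the expression arrives already quoted, no bquote() wrapper or manual assign() loop is needed on the calling side.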

Question about mirai vs callr

Hi, I'd like to know the following:

  • is mirai any different from callr, or do they roughly do something similar under the hood?
  • are there advantages in using mirai vs callr?

thanks for your help

Load-testing with TLS

In wlandau/crew@686f86b, I implemented an interface in crew to leverage the new TLS capabilities in nanonext and mirai. It's really simple on my end, which is great. But I do notice some new problems in load-testing. When I tried the test below (also at https://github.com/wlandau/crew/blob/main/tests/mirai/test-tls-max_tasks.R) using mirai 0.9.0.9013 and nanonext 0.9.0.29, the task loop hangs, and eventually daemons() times out. In addition, daemons(0L) did not terminate the dispatcher or servers.

library(crew)
library(mirai)

# Implements throttling to avoid overburdening the {mirai} dispatcher.
throttler <- crew::crew_schedule()

# Efficient and convenient data structure to keep track of {mirai} tasks.
schedule <- crew::crew_schedule()
schedule$start()

# Start the {mirai} client.
n <- 20L
mirai::daemons(
  n = n,
  url = "wss://127.0.0.1:5000",
  dispatcher = TRUE,
  token = TRUE
)

# Mutable structure with {crew} worker info. This is the primary
# data structure of each {crew} launcher.
workers <- new.env(parent = emptyenv()) # For mutability.
workers$workers <- tibble::tibble(
  handle = replicate(n, new.env(), simplify = FALSE), # callr::r_bg() handles
  socket = environment(mirai::daemons)$..$default$urls, # starting URLs
  launches = rep(0L, n), # number of times a worker was launched at this index
  launched = rep(FALSE, n), # FALSE if the worker is definitely done.
  assigned = rep(0L, n), # Cumulative "assigned" stat to check backlog (#79).
  complete = rep(0L, n) # Cumulative "complete" stat to check backlog (#79).
)

# For {mirai} servers with online == 0L and instance == 1L,
# rotate the websocket URL. Also set workers$launched to FALSE,
# which signals that tally() can safely update the cumulative
# "assigned" and "complete" statistics (#79).
rotate <- function(workers) {
  info <- mirai::daemons()$daemons
  done <- which(info[, "online"] < 1L & info[, "instance"] > 0L)
  for (index in done) {
    socket <- mirai::saisei(i = index, force = FALSE)
    if (!is.null(socket)) {
      workers$workers$socket[index] <- socket # Next launch is at this URL.
      workers$workers$launched[index] <- FALSE # Lets tally() update stats.
    }
  }
}

# For workers that are definitely done and not going to dial in until the
# next launch, update the cumulative "assigned" and "complete" which {crew}
# uses to detect backlogged workers (#79). A backlogged worker is a {mirai}
# server with more assigned than complete tasks. Detecting the backlog
# is important because if a worker is disconnected and backlogged,
# then {crew} will need to relaunch it so the backlogged tasks can run.
tally <- function(workers) {
  info <- mirai::daemons()$daemons
  index <- !(workers$workers$launched) # Workers safe to update.
  workers$workers$assigned[index] <- as.integer(info[index, "assigned"])
  workers$workers$complete[index] <- as.integer(info[index, "complete"])
  invisible()
}

# In {crew}, the scale() method of the launcher class
# re-launches all backlogged non-launched workers,
# and then it may launch additional non-launched workers
# in order to meet the demand of the task load.
# The scale() function below is a simplified version which launches
# all non-launched workers.
scale <- function(workers) {
  for (index in which(!workers$workers$launched)) { # non-launched workers
    # I would have used mirai::launch_server() here, but callr::r_bg()
    # allows me to manually terminate the server without calling
    # mirai::daemons(n = 0L). This is important for updating the final
    # assigned and complete tallies later on.
    workers$workers$handle[[index]] <- callr::r_bg(
      func = function(url, tls) {
        mirai::server(
          url = url,
          tls = tls,
          maxtasks = 100L
        )
      },
      args = list(
        url = workers$workers$socket[index],
        tls = environment(mirai::daemons)$..$default$tls$client
      )
    )
    # Increment the launch count.
    workers$workers$launches[index] <- workers$workers$launches[index] + 1L
    # Signal to tally() to wait for this worker to complete
    # instead of updating the cumulative assigned and complete stats.
    workers$workers$launched[index] <- TRUE
  }
}

index <- 0L # current task
n_tasks <- 6000L # all tasks
results <- list()
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
  if (!throttler$throttle()) { # avoid overburdening the {mirai} dispatcher
    rotate(workers) # Rotate the URLs of done workers.
    tally(workers) # Update the cumulative stats for done workers.
    scale(workers) # Re-launch all the done workers.
  }
  # If there are still tasks to launch, launch one.
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    # The "schedule" is nothing fancy for the purposes of #88 and #89,
    # it is just a fast data structure for bookkeeping {mirai} objects
    # without the other frills in {crew}.
    schedule$push(task)
  }
  # Try to process the results of finished tasks.
  if (schedule$nonempty()) { # If there are still tasks to process...
    # Call nanonext::.unresolved() and move resolved tasks
    # from the hash table in schedule$pushed to the first-in/first-out
    # linked list in schedule$collected.
    schedule$collect()
    task <- schedule$pop() # Return a task that was resolved and collected.
    # pop() returns NULL if there is no resolved/collected task.
    if (!is.null(task)) {
      data <- task$data
      results[[data]] <- data
      cat("pop", data, "\n")
    }
  }
}

# Terminate the dispatcher.
daemons(n = 0L)

# Manually terminate any remaining workers.
for (handle in workers$workers$handle) {
  if (inherits(handle, "r_process") && handle$is_alive()) {
    handle$kill()
  }
}

# Check the results.
all(sort(as.integer(unlist(results))) == seq_len(n_tasks))

The simple persistent-worker load test below (also at https://github.com/wlandau/crew/blob/main/tests/mirai/test-tls-persistent.R) behaves a bit better. The tasks seem to complete, though they do take much longer to collect (probably because of encryption, right?). But neither daemons(0L) nor quitting the R session terminates the dispatcher or servers.

library(crew)
library(mirai)

# Efficient and convenient data structure to keep track of {mirai} tasks.
# It has a hash table for new tasks and a first-in/first-out linked list
# for resolved tasks. It calls nanonext::.unresolved() to collect resolved
# tasks, but otherwise it does not rely on {mirai}/{nanonext}. I highly doubt
# it is the source of the {crew} bugs in #88 or #89.
schedule <- crew::crew_schedule()
schedule$start()

# Start the {mirai} client and servers with TLS.
daemons(n = 20L, url = "wss://127.0.0.1:0", dispatcher = TRUE, token = TRUE)
codes <- lapply(environment(daemons)$..$default$urls, launch_server)

# Run the tasks.
index <- 0L # current task
n_tasks <- 6000L # all tasks
results <- list()
while (index < n_tasks || schedule$nonempty()) { # while there is work to do
  # If there are still tasks to launch, launch one.
  if (index < n_tasks) {
    index <- index + 1L
    cat("push", index, "\n")
    task <- mirai(index, index = index)
    # The "schedule" is nothing fancy for the purposes of #88 and #89,
    # it is just a fast data structure for bookkeeping {mirai} objects
    # without the other frills in {crew}.
    schedule$push(task)
  }
  # Try to process the results of finished tasks.
  if (schedule$nonempty()) { # If there are still tasks to process...
    # Call nanonext::.unresolved() and move resolved tasks
    # from the hash table in schedule$pushed to the first-in/first-out
    # linked list in schedule$collected.
    schedule$collect()
    task <- schedule$pop() # Return a task that was resolved and collected.
    # pop() returns NULL if there is no resolved/collected task.
    if (!is.null(task)) {
      data <- task$data
      results[[data]] <- data
      cat("pop", data, "\n")
    }
  }
}

# Should be TRUE
all(sort(unlist(results)) == seq_len(n_tasks))

# Clean up the dispatcher.
daemons(n = 0L)
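For readers unfamiliar with the bookkeeping structure the comments describe (a hash table of pending tasks plus a first-in/first-out list of resolved ones), here is a toy base R sketch. The name toy_schedule() and its internals are assumptions made for illustration; this is not how crew::crew_schedule() is implemented.

```r
# Toy stand-in for the schedule described in the comments above.
# Hypothetical: NOT the implementation of crew::crew_schedule().
toy_schedule <- function() {
  pushed <- new.env(parent = emptyenv())  # hash table of pending tasks
  collected <- list()                     # FIFO of resolved tasks
  counter <- 0L
  list(
    push = function(task) {
      counter <<- counter + 1L
      assign(as.character(counter), task, envir = pushed)
      invisible(NULL)
    },
    # Move every task for which is_resolved(task) is TRUE into the FIFO.
    collect = function(is_resolved) {
      for (key in ls(pushed)) {
        task <- get(key, envir = pushed)
        if (is_resolved(task)) {
          collected[[length(collected) + 1L]] <<- task
          rm(list = key, envir = pushed)
        }
      }
      invisible(NULL)
    },
    # Return the oldest collected task, or NULL if none is ready.
    pop = function() {
      if (length(collected) == 0L) return(NULL)
      task <- collected[[1L]]
      collected[[1L]] <<- NULL
      task
    },
    nonempty = function() {
      length(ls(pushed)) > 0L || length(collected) > 0L
    }
  )
}

# Usage with plain lists standing in for mirai objects.
s <- toy_schedule()
s$push(list(id = 1L, done = TRUE))
s$push(list(id = 2L, done = FALSE))
s$collect(function(task) task$done)
first <- s$pop()
second <- s$pop()
```

With real tasks, the predicate passed to collect() could plausibly be function(task) !mirai::unresolved(task), though that pairing is an assumption rather than something taken from crew.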

Hanging tasks in a {targets} pipeline on an SGE cluster

This issue presents the same as #53, but it emulates a real-life example on a machine with plenty of resources.

I am running crew and crew.cluster in a targets pipeline. I am using development versions of all the packages:

The pipeline has 2001 lightweight tasks. There are 20 workers which run on a powerful SGE cluster and each have 2GB memory at minimum. The login node where the dispatcher and client run has 263 GB total memory. In the _targets.R file, I launch all 20 workers up front, but they can still scale down and up because I set seconds_idle = 5 (and auto_scale = "demand" by default). I include the _targets.R which defines the pipeline below, and it is representative of the types of simulation studies my team and I do with targets. (https://books.ropensci.org/targets/walkthrough.html explains _targets.R file basics, and https://books.ropensci.org/targets/crew.html explains integration with crew.)

library(targets)
controller <- crew.cluster::crew_controller_sge(
  seconds_launch = 120,
  workers = 20L,
  sge_memory_gigabytes_required = 2L,
  seconds_idle = 5,
  seconds_exit = 5,
  script_lines = paste0("module load R/", getRversion())
)
controller$start()
controller$launch(n = 20L)
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(2000)),
  tar_target(y, tibble::tibble(x = x, a = 1, b = 2, c = "3"), pattern = map(x))
)

When I run the pipeline with targets::tar_make(callr_function = NULL, terminate = FALSE), at first I see tasks resolve quickly at a rate of several per second, but then the pipeline hangs (roughly 25% to 30% of the time). targets continues its event loop, which I know because crew workers continue to relaunch after they idle out, but the tasks themselves do not complete. During the whole process, I see MEM% in htop stay at or below 0.1 for both the client R session and the dispatcher.

When I examine the controller in the R session that ran tar_make(), here is what I see. Multiple workers have tasks assigned but not complete in the hanging state. All the workers show as offline because I am printing this after they idled out, but at the time the hanging started, 2 of the workers were still running (I saw the jobs on the SGE cluster with qstat).

> controller <- tar_option_get("controller")
> controller$router$poll()
> tibble::tibble(controller$router$daemons)

# A tibble: 20 × 4
   online instance assigned complete
    <int>    <int>    <int>    <int>
 1      0        1        0        0
 2      0        1        0        0
 3      0        1        0        0
 4      0        1        0        0
 5      0        1        0        0
 6      0        1        0        0
 7      0        1        1        0
 8      0        1        0        0
 9      0        1        2        1
10      0        1        1        0
11      0        1        0        0
12      0        1        0        0
13      0        1        0        0
14      0        1        0        0
15      0        1        2        1
16      0        1        0        0
17      0        1        0        0
18      0        1        0        0
19      0        1        2        1
20      0        1        0        0
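To make the "assigned but not complete" reading of this table concrete, the sketch below recomputes it in base R. The numbers are copied from the printed table; the variable names are illustrative only, and the rule (offline with more assigned than complete tasks) follows the definition of a backlogged worker used elsewhere on this page.

```r
# Columns copied from the daemons table printed above (20 workers).
daemons_info <- cbind(
  online   = rep(0L, 20L),
  instance = rep(1L, 20L),
  assigned = c(0L, 0L, 0L, 0L, 0L, 0L, 1L, 0L, 2L, 1L,
               0L, 0L, 0L, 0L, 2L, 0L, 0L, 0L, 2L, 0L),
  complete = c(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 1L, 0L,
               0L, 0L, 0L, 0L, 1L, 0L, 0L, 0L, 1L, 0L)
)

# A worker counts as backlogged here when it is offline yet has more
# assigned than completed tasks.
pending <- daemons_info[, "assigned"] - daemons_info[, "complete"]
backlogged <- which(daemons_info[, "online"] < 1L & pending > 0L)
print(backlogged)    # indices of backlogged workers
print(sum(pending))  # total tasks assigned but never completed
```

Workers 7, 9, 10, 15 and 19 each hold one task that was assigned but never completed, for a total of five.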

Many workers launched more than once because the idle timeout was aggressive and the tasks were quick. This in itself is not concerning, but it is part of the picture.

> controller$launcher$workers[, c("socket", "launches")]
# A tibble: 20 × 2
   socket                                                             launches
   <chr>                                                                 <int>
 1 ws://40.1.29.176:37387/6ba3c307a158262a0efe51a75674f6e91996d0bb           5
 2 ws://40.1.29.176:37387/2/5808b07413249336ab45df960bf15263e0347cb1         9
 3 ws://40.1.29.176:37387/3/cd16738d8dc9c4f89855626800d4afecdbeba851         9
 4 ws://40.1.29.176:37387/4/5f7c04c16f7be62ad3f47e8b2366dd2284f46bac         9
 5 ws://40.1.29.176:37387/5/ad8198fc0794d43faf0821ada2093b8d00c85588         9
 6 ws://40.1.29.176:37387/6/7f5f0522c0e753c00351a155b2c88d56fa7710bb         4
 7 ws://40.1.29.176:37387/7/10931a6c1db602731a2de242c4d96bb4a6c19ed8         3
 8 ws://40.1.29.176:37387/8/03870112255ec377a1ea052c63adb20e9b39b364         1
 9 ws://40.1.29.176:37387/9/58e2c309221f0b75455b04a764dae7f534e80f89         1
10 ws://40.1.29.176:37387/10/60c3c3272b102ac80ea6390c2e37fc65bbbe86ff        1
11 ws://40.1.29.176:37387/11/f42555c1c471dea1a884717ac35a9d9beea5ffe2        1
12 ws://40.1.29.176:37387/12/8d7d4f9460f70b9bcb6f6af8d6da1176aacd0f8b        1
13 ws://40.1.29.176:37387/13/340a2ab8590f67b1d5680c80ce0c348dbd2db42b        1
14 ws://40.1.29.176:37387/14/6fe3c24631a20689306cc2f0d1b00f9b264c7525        1
15 ws://40.1.29.176:37387/15/ffc82e8933eecf5e520e2d7c18df14a091c63743        1
16 ws://40.1.29.176:37387/16/e881afe42fe95bde1568cdd0434dae3fd4cb210b        1
17 ws://40.1.29.176:37387/17/2543943f5a4e835e0aa3813474f5283ce2ec3252        1
18 ws://40.1.29.176:37387/18/b6c9284a9d8019329e3a83452f34585452a2fff3        1
19 ws://40.1.29.176:37387/19/c9886a9c227344b9a3db4be3f94b3f6681a7f9b1        1
20 ws://40.1.29.176:37387/20/2dfc44716baf37cd11842b99914bfd2857d5da42        1

And most importantly, there are 5 tasks stuck in an unresolved state.

> length(controller$queue)
[1] 5
> for (i in seq_len(5)) print(controller$queue[[i]]$handle[[1]]$data)
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA
'unresolved' logi NA

In my experiments with this and similar pipelines, it seems like I can set auto_scale = "none" and still sometimes produce the hanging (while some workers are still running and should be churning through tasks). But the hanging does seem to stop if seconds_idle is very high, so maybe it has something to do with auto-scaling down. crew assumes workers and tasks can launch independently and asynchronously relative to one another.

I am sorry to keep bothering you about hanging tasks in my pipelines, and I am sorry I can't seem to reproduce the problem with mirai alone. Hopefully this example provides more clues and more ways we can troubleshoot.

Trouble passing arguments to daemons() which are forwarded to dispatcher()

I think it is because I am calling daemons() inside an R6 class and some of these forwarded arguments are fields of the object. I am fine to hard-code asyncdial = TRUE in crew, but I suspect a more general problem about environment scoping.

Using the asyncdial field, daemons() errors out:

class <- R6::R6Class(
  classname = "test",
  public = list(
    asyncdial = TRUE,
    listen_for_servers = function() {
      socket <- "ws://10.0.0.9:59604"
      mirai::daemons(
        url = socket,
        n = 1L,
        dispatcher = TRUE,
        asyncdial = self$asyncdial
      )
    },
    run_test = function() {
      self$listen_for_servers()
      Sys.sleep(2)
      px <- callr::r_bg(
        function(socket) mirai::server("ws://10.0.0.9:59604")
      )
      Sys.sleep(2)
      print(mirai::daemons())
      m <- mirai::mirai(ps::ps_pid())
      Sys.sleep(2)
      print(m$data)
      mirai::daemons(0L)
      invisible()
    }
  )
)
object <- class$new()
object$run_test()
#> Error in request_ack(sock) : 
#>   dispatcher process launch - timed out after 1s

Hard-coding asyncdial, everything works:

class <- R6::R6Class(
  classname = "test",
  public = list(
    asyncdial = TRUE,
    listen_for_servers = function() {
      socket <- "ws://10.0.0.9:59604"
      mirai::daemons(
        url = socket,
        n = 1L,
        dispatcher = TRUE
      )
    },
    run_test = function() {
      self$listen_for_servers()
      Sys.sleep(2)
      px <- callr::r_bg(
        function(socket) mirai::server("ws://10.0.0.9:59604")
      )
      Sys.sleep(2)
      print(mirai::daemons())
      m <- mirai::mirai(ps::ps_pid())
      Sys.sleep(2)
      print(m$data)
      mirai::daemons(0L)
      invisible()
    }
  )
)
object <- class$new()
object$run_test()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                     status_online status_busy tasks_assigned tasks_complete instance #
#> ws://10.0.0.9:59604             1           0              0              0          1
#> 
#> [1] 55721

globals: How to use a function with a free variable that lives in the global environment?

Consider:

f <- function(x) a + x
a <- 1

This can be called as:

> f(2)
[1] 3

How can this be called via mirai()? The following attempt doesn't work:

> library(mirai)
> f <- function(x) a + x
> a <- 42
> m <- mirai(f(3), f = f, a = a)
> m$data
'miraiError' chr Error in f(3): object 'a' not found
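The error is consistent with R's lexical scoping: a function resolves free variables through its own enclosing environment, not through the environment it is called from. The base R sketch below reproduces that locally, assuming (for illustration only) that the daemon can be mimicked by giving f an enclosure that lacks `a`. The two workarounds are ordinary R idioms and are not a statement about what mirai recommends.

```r
f <- function(x) a + x

# Assumption for illustration: mimic a daemon, where the function's
# enclosing environment does not contain `a`.
environment(f) <- new.env(parent = baseenv())

# `a` is supplied next to `f`, but `f` resolves free variables through
# its own enclosure, not through the environment it is called from.
outcome <- tryCatch(
  eval(quote(f(3)), list(f = f, a = 42)),
  error = function(e) conditionMessage(e)
)
print(outcome)

# Workaround 1: make the free variable an explicit argument.
g <- function(x, a) a + x
res1 <- eval(quote(g(3, a)), list(g = g, a = 42))

# Workaround 2: bind the value into the function's own environment.
h <- f
environment(h) <- list2env(list(a = 42), parent = baseenv())
res2 <- h(3)
```

Workaround 2 relies on the closure carrying its environment along with it, which is how base R serialization treats non-global environments; whether that holds for a particular mirai version is worth verifying.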

Session info

> sessionInfo()
R version 4.2.3 (2023-03-15)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 22.04.2 LTS

Matrix products: default
BLAS:   /home/henrik/shared/software/CBI/R-4.2.3-gcc11/lib/R/lib/libRblas.so
LAPACK: /home/henrik/shared/software/CBI/R-4.2.3-gcc11/lib/R/lib/libRlapack.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] mirai_0.8.1.9005

loaded via a namespace (and not attached):
[1] compiler_4.2.3 nanonext_0.8.1

Daemons (background processes) persist after ending R session

It is currently documented for daemons:

It is highly recommended to shut down daemons by setting daemons(0) or explicitly unloading the package before exiting your R session. This will ensure that all processes exit cleanly and resources are freed.

This issue is now fixed in ea68890 and any daemons at the end of an R session will be shut down properly. This will be in future CRAN releases >= 0.4.1.

In the meantime, dev versions incorporating this fix may be installed using:
install.packages("mirai", repos = "https://shikokuchuo.r-universe.dev")

Crash detection

If a server crashes while running a task, is there a way to promptly know if the task is never going to complete? I tried the following steps on my SGE cluster. On a server node:

library(mirai)
server("tcp://CLIENT_IP:5555")

On the client with a different node and different IP than the server:

library(mirai)
daemons("tcp://CLIENT_IP:5555")
m <- mirai({Sys.sleep(10); "finished"})

Then before 10 seconds completed, I terminated the server process. On the client, the mirai object looks the same as when the job was running.

m
#> < mirai >
#>  - $data for evaluated result

m$data
#>  'unresolved' logi NA

m$aio
#> <pointer: 0x29fca20>
#> attr(,"ctx")
#> < nanoContext >
#>  - id: 1
#>  - socket: 2
#>  - state: opened
#>  - protocol: req

task does not complete if saisei() is called before launching the server

As I work on wlandau/crew#61, I notice that my tasks do not complete if the socket is switched with saisei() before the server connects.

library(mirai)
packageVersion("nanonext")
#> [1] ‘0.8.1.9006’
packageVersion("mirai")
#> [1] ‘0.8.2.9004’
packageDescription("mirai")$GithubSHA1
#> [1] "f74b480b1c7c33c700030ca1972aa9bee362cd8d"
daemons(n = 1L, url = "ws://127.0.0.1:5000", dispatcher = TRUE, token = TRUE)
#> [1] 1
task <- mirai(ps::ps_pid())
socket <- saisei(i = 1L)
px <- callr::r_bg(
  func = function(socket) mirai::server(url = socket, asyncdial = FALSE),
  args = list(socket = socket)
)
Sys.sleep(10)
task$data
#> 'unresolved' logi NA
daemons(n = 0L)

Necessity of linger on exit for servers that time out

As servers have the option to time out or task-out after a set number of tasks, it would be ideal to exit the process immediately thereafter - however, at present, this is only possible after an 'exitlinger' period, which by default is set to 1s. This should be sufficient for sending objects of ~ 1GB in size.

What is currently not possible is for exit to be conditional upon the send being completed.

This is, I believe, due to:

  1. If no linger period is implemented in R, the interpreter thinks execution has ended and reaps all child threads even though the send is in progress asynchronously at the C level.
  2. C functions that are part of the NNG library do not help as sends are recorded as complete once the socket accepts the message for transport. That means that NNG's definition of a send being complete only means the responsibility is transferred to the system sockets. However this does not guarantee that the send actually completes if the process is reaped in the meantime.

It would be great if a solution can be found.

Automatic down-scaling in server()

For my distributed computing use case, automatic down-scaling would be extremely helpful in order to avoid excessive idle time and to proactively avoid hitting walltime limits on clusters. It would be very useful if server() could automatically terminate when:

  1. it has been idling for too long without receiving a task (idle timeout), or
  2. its total runtime reaches a limit and it has no currently assigned jobs (soft walltime), or
  3. it has already completed a given number of tasks (task limit).

It would be great to have new arguments to server() to set the max idle time, soft walltime limit, and max number of tasks.
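The three conditions listed above can be written as a single decision rule. The base R sketch below uses hypothetical argument names throughout; none of them should be read as server() arguments, and the treatment of a busy server (it is neither idle nor eligible for the soft walltime) is an interpretation of the request.

```r
# Hypothetical decision rule combining the three conditions listed above.
# All argument names are illustrative, not mirai's server() arguments.
should_exit <- function(idle_seconds, runtime_seconds, tasks_done, busy,
                        max_idle = Inf, soft_walltime = Inf, max_tasks = Inf) {
  idle_timeout <- !busy && idle_seconds >= max_idle          # condition 1
  walltime_hit <- !busy && runtime_seconds >= soft_walltime  # condition 2
  task_limit   <- tasks_done >= max_tasks                    # condition 3
  idle_timeout || walltime_hit || task_limit
}

# An idle server past its idle limit should exit.
exit_idle <- should_exit(10, 100, 3, busy = FALSE, max_idle = 5)
# A busy server is spared by the soft walltime.
exit_busy <- should_exit(0, 100, 3, busy = TRUE, soft_walltime = 60)
# The task limit applies regardless of the other limits.
exit_tasks <- should_exit(0, 10, 100, busy = FALSE, max_tasks = 100)
```

Defaulting every limit to Inf means a server with no limits configured never exits on its own, which matches the behaviour before any such arguments exist.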

saisei() returns sockets with literal port 0

As we previously discussed, a port of 0 in the url argument of daemons() should tell NNG to choose the port. But saisei() returns sockets with a literal port of 0 in the output websocket path. I think this may explain why I still see instances of #50 in crew.

library(mirai)
packageVersion("mirai")
#> [1] ‘0.8.2.9009’
daemons(
  n = 1L,
  url = "ws://127.0.0.1:0",
  dispatcher = TRUE,
  token = TRUE
)
#> [1] 1
daemons()$daemons
#>                                                               online instance assigned complete
#> ws://127.0.0.1:51358/7abbf05f04cb077a5e2f89fc0c1de673d1c0bae5      0        0        0        0
saisei(i = 1L)
#> [1] "ws://127.0.0.1:0/ca4f8b8c59f78ed827fd0771c099881439a5f373"
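A quick way to check for this condition programmatically is to extract the port from each URL. The helper below is a base R sketch; url_port() is a hypothetical name and the regular expression only covers URLs of the scheme://host:port/path shape shown here.

```r
# Hypothetical helper: pull the port number out of a URL such as
# "ws://host:port/path". Returns NA if the URL has no explicit port.
url_port <- function(url) {
  m <- regmatches(url, regexec("^[a-z+]+://[^/:]+:([0-9]+)", url))[[1L]]
  if (length(m) < 2L) NA_integer_ else as.integer(m[2L])
}

listed <- url_port("ws://127.0.0.1:51358/7abbf05f04cb077a5e2f89fc0c1de673d1c0bae5")
rotated <- url_port("ws://127.0.0.1:0/ca4f8b8c59f78ed827fd0771c099881439a5f373")
print(c(listed, rotated))
```

A returned port of 0 flags a URL that still carries the "let NNG choose" placeholder instead of the port actually in use.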

Some tasks go unnoticed in a simple case

I was revisiting https://github.com/wlandau/crew/blob/main/tests/throughput/test-persistent.R just now, and I noticed a task that did not seem to register with the dispatcher. It was easy to create a quick reproducible example of this behavior with just mirai:

library(mirai)
daemons(n = 4, url = "ws://127.0.0.1:5000", dispatcher = TRUE, token = TRUE)
urls <- environment(daemons)$..$default$urls
tasks1 <- replicate(5, mirai(TRUE))
launch_server(url = urls[1])
tasks2 <- replicate(5, mirai(TRUE))
Sys.sleep(1)
daemons()$daemons
daemons(0)

When I run the code above, daemons()$daemons tells me that only 7 of the 10 tasks were assigned and completed:

                                                               online instance assigned complete
ws://127.0.0.1:5000/1/8b8c503e1c8cbcd077b1b79c7ac8bac55e5e9ced      1        1        7        7
ws://127.0.0.1:5000/2/5bd2d543258c4082e95f470da9aee91b95f4fa06      0        0        0        0
ws://127.0.0.1:5000/3/df141b21c0d541b49d663c6f7a8661a4ebbf1280      0        0        0        0
ws://127.0.0.1:5000/4/8bcff54b5c8e8ca6d917b72b2abc0c792b31fe22      0        0        0        0

Whenever I run this starting with daemons(n = 4), the number of tasks assigned/completed is always 7. Interestingly, if n = 2, then assigned and completed are both 9. Trying a grid of n values from 1 to 10, I always see n + assigned = 11. For n > 10, I see assigned = 1 and completed = 1.

I tried mirai versions 0.8.7.9006, 0.8.7, and 0.8.4, all with the same result. nanonext is at 0.8.3.9001. Findings are the same on both my local Ubuntu machine and my Macbook. nanonext::nng_version() returns c("1.6.0pre", "mbed TLS 3.4.0") on both machines.
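The pattern reported above can be restated compactly. The one-line base R function below only summarises the observations (it does not explain them), and observed_assigned() is a name invented for this sketch.

```r
# Empirical pattern reported above, written as a function of n.
# This only restates the observations; it does not explain them.
observed_assigned <- function(n) ifelse(n <= 10L, 11L - n, 1L)

pattern <- observed_assigned(c(2L, 4L, 10L, 12L))
print(pattern)
```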

Trouble after update with custom TCP sockets

Version 0.7.2.9010 with custom TCP sockets looks promising. However, I am having a bit of trouble with the active queue after the update up to e9a6edd. On the client, I tried:

library(mirai)
daemons("tcp://127.0.0.1:5000", nodes = 1)

And with a server process on the same machine as the client:

library(mirai)
server("tcp://127.0.0.1:5000")

Then back on the client, I see:

daemons()
#> $connections
#> [1] 0
#> 
#> $daemons
#> [1] "remote"
#> 
#> $nodes
#> [1] 1

And a quick mirai() looks unresolved after about 10 seconds.

m <- mirai::mirai(ps::ps_pid())
Sys.sleep(10)
m$data
#> 'unresolved' logi NA

Warning message: In listen(sock, url = listen, autostart = autostart) : 9 | Not supported

Hi!

I am trying to run the following example:

library(mirai)
# Only run examples in interactive R sessions

m <- mirai(x + y + 1, x = 2, y = 3)
m
m$data
Sys.sleep(0.2)
m$data

m <- mirai(as.matrix(df), df = data.frame())
call_mirai(m)$data

but my mirai$data never changes. When running the first line, I receive the warning: Warning message: In listen(sock, url = listen, autostart = autostart) : 9 | Not supported.

If I try call_mirai(m)$data the R session keeps busy and eventually crashes.

Here is my session info:

R version 4.1.2 (2021-11-01)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 20.04.4 LTS

Matrix products: default
BLAS:   /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.9.0
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.9.0

locale:
 [1] LC_CTYPE=pt_BR.UTF-8       LC_NUMERIC=C               LC_TIME=pt_BR.UTF-8        LC_COLLATE=pt_BR.UTF-8     LC_MONETARY=pt_BR.UTF-8   
 [6] LC_MESSAGES=pt_BR.UTF-8    LC_PAPER=pt_BR.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=pt_BR.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] mirai_0.4.1

loaded via a namespace (and not attached):
[1] compiler_4.1.2 tools_4.1.2    nanonext_0.5.0

I had to install libnng-dev with sudo apt-get install libnng-dev before installing mirai.

Typo in ?daemons

Should "2 remote servers" in:

mirai/R/mirai.R

Lines 663 to 664 in 3999904

#' # 2 remote servers via dispatcher (using zero wildcard)
#' daemons(2, url = "ws://:0")

be "2 remote daemons"?

Environment subscripting error with single-task daemons

I have been trying to troubleshoot wlandau/crew#51, and I ran across an issue where workers with maxtasks = 1 sometimes return tasks showing "Error in envir[[\".expr\"]]: subscript out of bounds". Here is a reproducible example. I ran mirai 0.8.1.9003 with nanonext 0.8.0.9001 on R 4.2.1 on an Ubuntu machine.

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000")
tasks <- lapply(seq_len(100L), function(x) {
  mirai(x, x = x)
})
results <- list()
px <- NULL
launches <- 0L
while(length(results) < 100L) {
  if (is.null(px) || !px$is_alive()) {
    px <- callr::r_bg(function() {
      mirai::server("ws://127.0.0.1:5000", maxtasks = 1L)
    })
    launches <- launches + 1L
  }
  done <- integer(0L)
  for (i in seq_along(tasks)) {
    if (!.unresolved(tasks[[i]])) {
      done <- c(done, i)
      results[[length(results) + 1L]] <- tasks[[i]]
    }
  }
  tasks[done] <- NULL
}
print(launches)
#> [1] 100
data <- as.character(lapply(results, function(x) x$data))
print(data)
#> [1] "1"                                                   
#> [2] "2"                                                   
#> [3] "3"                                                   
#> [4] "4"                                                   
#> [5] "5"                                                   
#> [6] "6"                                                   
#> [7] "7"                                                   
#> [8] "Error in envir[[\".expr\"]]: subscript out of bounds"
#> ...
sum(grepl("^Error", data))
#> [1] 24
daemons(0L)

Informative error messages

If I call daemons(0) while servers on the connection are running, server() correctly exits, but the error message is strange:

attempt to bind a variable to R_UnboundValue

Other times I get:

Error in server("tcp://xx.xx.xx.138:5555") : object 'envir' not found

It's a minor thing, but more descriptive error messages would help me troubleshoot complicated user errors on my end.

WISH: Support passing name-value pairs via '.args' of mirai()

Picking up on the side-discussion on argument .args that started in #49 (comment).

As it stands now (mirai 0.8.2.9036), the .args argument of mirai() takes a vector of symbols as input. The following are all working examples:

m <- mirai::mirai(2 * a + b, .args = list(a, b))
m$data
## [1] 10
m <- mirai::mirai(2 * a + b, .args = c(a, b))
m$data
## [1] 10
m <- mirai::mirai(2 * a + b, .args = data.frame(a, b))
m$data
## [1] 10

Issues

Too broad acceptance

?mirai::mirai says that .args takes an "(optional) list of objects". The above .args = c(a, b) example does not use a list. One can also debate whether .args = data.frame(a, b) should work (although technically it's also a list).

No validation of '.args'

There is no up-front validation of what is passed to .args. Instead, mis-specification of .args results in run-time errors from the worker failing to evaluate the expression. For example,

m <- mirai::mirai(2 * a + b, .args = c("a", "b"))
m$data
## 'miraiError' chr Error in 2 * a: non-numeric argument to binary operator
m <- mirai::mirai(2 * a + b, .args = alist(a, b))
m$data
## 'miraiError' chr Error in 2 * a: non-numeric argument to binary operator
m <- mirai::mirai(2 * a + b, .args = 42)
m$data
## 'miraiError' chr Error in eval(expr = envir[[".expr"]], envir = envir, enclos = NULL): object 'a' not found

No support for name-value pairs

I think there will be a lot of users that will expect being able to do:

m <- mirai::mirai(2 * a + b, .args = list(a = 3, b = 4))
m$data
## 'miraiError' chr Error in eval(expr = envir[[".expr"]], envir = envir, enclos = NULL): object 'a' not found

This will allow users to write clearer code. Currently, anyone who wishes to pass a named list of values has to resort to do.call(), which runs counter to the "neat API" that mirai tries to support. For example, as it stands now, we have to write code like:

expr <- quote(2 * a + b)
globals <- list(a = 3, b = 4)
m <- do.call(mirai, c(list(expr), globals))

instead of a much cleaner and less obscure:

expr <- quote(2 * a + b)
globals <- list(a = 3, b = 4)
m <- mirai(expr, .args = globals)

Suggestion

  1. Redesign current behavior to support only the .args = c(a, b) form.

  2. Add support for specifying name-value elements, i.e. .args = list(a = 3, b = 4).

I think those will be the most common use cases. Using c() for one and list() for the other could help distinguish the two. Of course, both forms could be handled using list() too.

FWIW, the fact that c(a, b) uses symbols has the advantage that static-code inspection (e.g. codetools validation by R CMD check) can pick up typos. In contrast, .args = c("a", "btypo") won't be detected until run-time.
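
As a rough illustration of the name-value form and of up-front validation, here is a minimal base-R sketch. It does not use mirai at all: validate_args() and eval_with_args() are hypothetical helpers that only mimic, locally, what accepting .args = list(a = 3, b = 4) could look like, and the specific checks are assumptions rather than a proposal for the actual implementation.

```r
# Hypothetical validator, not part of mirai: accept only a plain, fully named list.
validate_args <- function(.args) {
  if (!is.list(.args) || is.data.frame(.args)) {
    stop("`.args` must be a plain list of name-value pairs", call. = FALSE)
  }
  nms <- names(.args)
  if (length(.args) > 0L && (is.null(nms) || anyNA(nms) || !all(nzchar(nms)))) {
    stop("every element of `.args` must be named", call. = FALSE)
  }
  if (anyDuplicated(nms) > 0L) {
    stop("names in `.args` must be unique", call. = FALSE)
  }
  invisible(.args)
}

# Hypothetical local evaluator: evaluates `expr` with only the supplied values in scope.
eval_with_args <- function(expr, .args) {
  env <- list2env(validate_args(.args), parent = baseenv())
  eval(expr, envir = env)
}

expr <- quote(2 * a + b)
globals <- list(a = 3, b = 4)
result <- eval_with_args(expr, globals)

# Mis-specified input fails up front instead of on the worker.
bad <- tryCatch(validate_args(c("a", "b")), error = function(e) conditionMessage(e))
```

With a check like this, a call such as .args = c("a", "b") would be rejected before any task is sent, rather than surfacing later as a 'miraiError'.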

Trouble with timerstart and the local active queue daemon exiting early

After 284104c, I noticed odd things happening during and after experimenting with timerstart. If I start a client on my local Macbook with:

daemons("ws://127.0.0.1:5000", nodes = 1)

And then call a server:

Rscript -e 'mirai::server("ws://127.0.0.1:5000", walltime = 1000, timerstart = 1L)'

then mirais have trouble evaluating even as the server stays online.

# client
m <- mirai(1)
Sys.sleep(10)
m$data
#> 'unresolved' logi NA

The error persists if I set timerstart to 0L, 1L, or 2L. Then, when I terminate all my R processes and start a new mirai::daemons("ws://127.0.0.1", nodes = 1) client, the local active queue daemon exits within about a second and daemons()$nodes returns "'errorValue' int 5 | Timed out". After restarting my computer, the local active queue daemon does not start at all and I see the same "'errorValue' int 5 | Timed out".

segfault in Rscript

Hi there! I've been trying to test out mirai non-interactively but have been running into a segfault about memory mapping.

I can reproduce this error pretty handily:

Rscript -e "library(mirai); x <- mirai ({1}); print(x)"

As an aside, I just wanted to say thanks for expanding the support for async capabilities in R.

Question about serializable objects and blocking

Two questions: Does Mirai have the same limitations as callr and future in that many things like DB connections are unserializable and therefore a new connection has to be established inside every async code block? And is mirai self-blocking if used in a Shiny app, or can it be used for true same-session non-blocking async code?

"Warning: stack imbalance" when calling server() inside a function

After mirai 0.8.7.9025 (which I am almost certain fixes wlandau/crew#88) I started noticing instances of a stack imbalance warning when I call server() from inside another function (which happens in crew). Oddly, it only seems to happen on Linux machines (RHEL and Ubuntu), and it does not seem to interfere with task completion. Reprex:

library(mirai)
packageVersion("mirai")
#> [1] ‘0.8.7.9025’
mirai::daemons(
  n = 1L,
  url = "ws://127.0.0.1:0",
  dispatcher = TRUE,
  token = TRUE
)
#> [1] 1
task <- mirai::mirai(TRUE)
url <- rownames(daemons()$daemons)[1]
run_server <- function(url) {
  mirai::server(url = url, maxtasks = 1L, cleanup = 0L)
}
run_server(url)
#> Warning: stack imbalance in '{', 5 then 4
nanonext::msleep(100)
task$data
#> [1] TRUE

daemons() delay and timeout

A very minor note, but in mirai 0.7.2.9012, I notice the following behavior of daemons() with the active queue:

library(mirai)
daemons("tcp://127.0.0.1:5000", nodes = 1)
system.time(poll <- daemons()) # 1s delay
#>    user  system elapsed 
#>   0.001   0.000   1.000 
poll
#> $connections
#> [1] 1
#> 
#> $daemons
#> [1] "remote"
#> 
#> $nodes
#> 'errorValue' int 5 | Timed out

The 1s delay is just a little odd, and I would expect daemons()$nodes to be empty, e.g. a (named) numeric of length 0. The delay vanishes as soon as I launch a server that dials into the client.

I am super excited that $nodes now returns all the sockets and their statuses! That really allows for the autoscaling I plan to do!

Working with Shiny: How to Make Mirai work with ObserveEvent ?

hi @shikokuchuo
I am trying to make mirai work inside a Shiny application. So far so good when it comes to running a mirai subprocess; however, I am having trouble detecting, with an observeEvent, when the mirai process has actually finished.

To make things simple I am doing the following:

reactive_values$mirai_process_val <- eval_mirai(<script>)

and in the server part of Shiny:

observeEvent(reactive_values$mirai_process_val, {if (!unresolved(reactive_values$mirai_process_val)) {<series_of_actions>}})

What works:

  • when the mirai process starts, the reactive value is changed as expected, and I can see the ObserveEvent working to check for the unresolved value.
  • However, it seems that when the mirai process is resolved, there does not seem to be a signal coming back that is picked up by the ObserveEvent.

My guess is that there is only the $data part of the mirai object that is updated, and therefore it is not being picked up as a new value by ObserveEvent.

Any suggestion on how to resolve this?
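
A minimal base-R sketch of the guess above. It assumes the mirai object behaves like an environment that is updated in place when the task resolves; fake_task and resolve() are hypothetical stand-ins, not mirai functions. Under that assumption, the reactive value holds the same reference before and after resolution, so there is no newly assigned value for the observer to react to.

```r
# Hypothetical stand-in for a task object that is updated in place.
fake_task <- new.env()
fake_task$data <- NA   # placeholder while "unresolved"

stored <- fake_task    # what the reactive value would hold: a reference, not a copy

# Hypothetical resolver: mutates the existing object instead of creating a new one.
resolve <- function(task, value) {
  task$data <- value
  invisible(task)
}
resolve(fake_task, 42)

same_object <- identical(stored, fake_task)  # still the very same environment
stored$data                                  # the stored reference sees the new value
```

If that is what happens, one option may be to re-check the object on a timer (for example with Shiny's invalidateLater() inside an observe()) rather than waiting for observeEvent() to fire on its own.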

CRAN 0.8.0 release changes

Just giving notice that I am looking at the interface and parameter names prior to launch of the next CRAN release. It is just the UI that I want to optimise - all existing functionality will be preserved.

So far, this may just be limited to daemons() - perhaps going back to the

daemons(n, url, q = TRUE)

type of interface. There is no real need to pass through arguments for either local or remote daemons and I don't think this is intuitive at the moment.

@wlandau I guess a CRAN release will also help you make crew available to a wider audience for testing.

Hanging tasks on Github Actions Ubuntu Runners (R CMD Check)

As you know, I have been struggling with the final stages of ropensci/targets#1044, which integrates crew into targets. targets encodes instructions in special classed environments which govern the behavior of tasks and data. In R CMD check on GitHub Actions Ubuntu runners, when many of these objects are sent to and from mirai() tasks, the overall work stalls and times out. It only happens on GitHub Actions Ubuntu runners (probably Windows too, but it didn't seem worth checking), and it only happens inside R CMD check.

After about a week of difficult troubleshooting, I managed to reproduce the same kind of stalling using just mirai and nanonext. I have one example with 1000 tasks at https://github.com/wlandau/mirai/blob/reprex/tests/test.R, and I have another example at https://github.com/wlandau/mirai/blob/reprex2/tests/test.R which has 300 tasks and uses callr to launch the server process. In the first example, you can see time stamps starting at https://github.com/wlandau/mirai/actions/runs/4670004460/jobs/8269199542#step:9:105. The tasks get submitted within about a 20-second window, then something appears to freeze, and then the 5-minute timeout is reached. In the second example, the timestamps at https://github.com/wlandau/mirai/actions/runs/4670012640/jobs/8269219432#step:9:99 show activity within the first 8 seconds, and only 5 of the 300 tasks run within the full 5 minutes. (I know you have a preference against callr, but it was hard to find ways to get this problem to reproduce, and I think mirai servers can be expected to work if launched from callr::r_bg().)

Sorry I have not been able to do more to isolate the problem. I still do not understand why it happens, and I was barely able to create examples that do not use targets or crew. I hope this much is helpful.

Future Development Crew / Mirai

Not to propose a new direction in development, but this does get me thinking big-picture about the mirai/crew integration. mirai has a dispatcher as a separate process which frees the user session. But in crew, because of auto-scaling and transient workers, the main R process needs to be an event loop that continuously calls push(), pop(), or wait() (similar to Gabor's task queue idea for callr). The crew event loop is even more important now that there is throttling. All this makes me wonder if crew itself could manually run an iteration of dispatch each time e.g. pop() is called, rather than overburdening the mirai dispatcher. It might not be feasible due to the scope and intent of mirai, but I keep wondering about this.

Originally posted by @wlandau in wlandau/crew#76 (comment)

Active queue daemons continue to run unless servers are connected first

On the client on my local Macbook, I watched htop as I ran the following.

library(mirai)
daemons("tcp://127.0.0.1:45000", nodes = 1, .compute = "p1")
#> [1] "remote"
daemons("tcp://127.0.0.1:45500", nodes = 2, .compute = "p2")
#> [1] "remote"
daemons(0, .compute = "p1")
#> [1] 0
daemons(0, .compute = "p2")
#> [1] 0

Each time I called daemons() with a socket, I noticed a new R process (for the active queue daemon, right?). If I don't connect any servers and instead call daemons(0) on the compute profile, those active queue processes keep running.

It was also interesting and helpful to see that each compute profile created its own daemon to run its own active queue. I don't anticipate having very many compute profiles, so I can work with that.

Edge case when mirai is submitted with no server connected

When I submit a mirai without connecting a server first, I run into errors polling the nodes. Then if I restart my R session, I see a segfault and the active queue daemon keeps running in the background. Things start working again as soon as I manually terminate the local active daemon process.

On my end, I will implement safeguards in crew to only launch mirais when there is at least one server connected (the server could be busy though). Is there a way for mirai to make sure active queue daemons are cleaned up in cases like this one?

> library(mirai)
> daemons("tcp://127.0.0.1:5000", nodes = 1)
[1] 1
> daemons()
$connections
[1] 1

$daemons
[1] 1

$nodes
tcp://127.0.0.1:5000 
                   0 

> m <- mirai(1)

> daemons()
$connections
[1] 1

$daemons
[1] 1

$nodes
'errorValue' int 5 | Timed out

RFC - User Interface

Now that a 'mirai' is self-resolving, the original pairing of eval_mirai() with call_mirai() is no longer required.

It seems that naturally a minimal mirai() can be the main function of the package. eval_mirai() can be retained as an alias.

To balance things, tirer() can be an alias for call_mirai(). This is French for 'pull' and has the same word length as 'mirai'. It has further connotations which fit well with the actual meaning here, with no real equivalent in English.

Likely implementation in v0.2.0. Comments welcome.

Multiple client sockets

I am developing a branch of crew based on https://github.com/shikokuchuo/mirai#example-1-connecting-to-remote-servers--remote-server-queues, and I am really optimistic about how it will turn out. Currently, however, building on mirai instead of rrq makes it difficult to implement heterogeneous workers to accommodate tasks with varying resource requirements (wlandau/crew#23).

Would you be willing to consider support for multiple concurrent client sockets instead of just the current one stored in environment(daemons)$sock? I would be fine with managing multiple socket objects and supplying the right ones to mirai() and daemons() as needed.

Can Shiny Use Mirai to Take Advantage of Multiple Cores

Suppose there is a busy site that executes a 5s calculation when a user clicks a button. We know that R is single threaded, so if it's busy working on user_1's calculation, user_2 must wait.

I'm wondering whether mirai will automatically distribute each individual user's calculation over the number of cores that a machine has so as to minimize wait time.

The calculations of each user are independent of one another.

library(shiny)
library(mirai)

daemons(2)

ui <- fluidPage(
  
)

server <- function(input, output, session) {
  observeEvent(input$button, {
   
    call_mirai(
      takes_a_few_sec(a, b, c)
    ) 
  })
}

shinyApp(ui, server)

Load-testing with TLS on MacOS

Sporadic hangs seen in #64 (fixed) persist on MacOS under extreme load-testing, only for TLS (wss). It has been noted this may not present in more realistic usage.

To investigate why this platform behaves differently.

Servers do not connect if the initial websocket port is 0

Previously, if I set port 0 in a websocket, NNG automatically chose a free port. With mirai 0.8.1.9002 and nanonext 0.8.0.9001, I am noticing that servers are having trouble connecting.

With an initial port of 0:

library(mirai)
daemons(
  url = "ws://127.0.0.1:0",
  dispatcher = TRUE
)
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                      status_online status_busy tasks_assigned tasks_complete instance #
#> ws://127.0.0.1:59603             0           0              0              0          0
socket <- "ws://127.0.0.1:59603"
px <- callr::r_bg(
  function(socket) mirai::server(socket),
  args = list(socket = socket)
)
Sys.sleep(2)
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                      status_online status_busy tasks_assigned tasks_complete instance #
#> ws://127.0.0.1:59603             0           0              0              0          0
m <- mirai(ps::ps_pid())
Sys.sleep(2)
m$data
#> 'unresolved' logi NA
daemons(0)

If I explicitly set a port instead of deferring to NNG:

library(mirai)
daemons(
  url = "ws://127.0.0.1:59603",
  dispatcher = TRUE
)
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                      status_online status_busy tasks_assigned tasks_complete instance #
#> ws://127.0.0.1:59603             0           0              0              0          0
socket <- "ws://127.0.0.1:59603"
px <- callr::r_bg(
  function(socket) mirai::server(socket),
  args = list(socket = socket)
)
Sys.sleep(2)
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                      status_online status_busy tasks_assigned tasks_complete instance #
#> ws://127.0.0.1:59603             1           0              0              0          1
m <- mirai(ps::ps_pid())
Sys.sleep(2)
m$data
#> [1] 40354
daemons(0)

Return the PID from launch()

As we discussed before, it would be nice if launch() returned the PID of the server process it launches. That way, I can replace callr::r_bg() with mirai::launch() in crew, which would allow me to drop a package dependency.

By the way, do you know if NNG has a replacement for getip::getip(type = "local")? If I could drop the getip package too, then crew would be just about as light as possible.

"'errorValue' int 5 | Timed out" on successive calls to daemons()

Re wlandau/crew#61 and #47, I am able to reproduce some of my difficulties using just mirai. The following result happens both in RStudio and the terminal on my Macbook.

library(mirai)
library(nanonext)
daemons(n = 1L, url = "ws://127.0.0.1:5000", dispatcher = TRUE, token = TRUE)
count <- 0L
while (TRUE) {
  count <- count + 1L
  out <- daemons()$daemons
  if (!is.matrix(out)) {
    break;
  }
  msleep(10) # or msleep(1) or msleep(100)
}
count
#> [1] 516
out
#> 'errorValue' int 5 | Timed out

At this point, my results vary. Sometimes the dispatcher exits when I call daemons(n = 0L), sometimes it keeps running. Sometimes when I restart my R session in RStudio, the R console freezes or I get a crash. Other times, the restart succeeds and RStudio returns control to me at the R console.

unresolved() and data

This topic has been low on my priority list until now, and it is by no means urgent, but I would like to discuss.

crew keeps the mirai objects of all the current tasks, and it needs to know which tasks are complete in order to do auto-scaling. To make decisions about when to launch new workers, crew compares the number of active workers to the number of incomplete tasks. To count the number of incomplete tasks, I cannot use counters tasks_assigned or tasks_complete because they only count tasks that have been assigned to a worker (ignoring the unassigned backlog) and because they reset every time a new worker connects to the socket. So instead, crew calls unresolved() on each mirai() object in the task list before making a decision about how many workers to launch. These launching decisions happen frequently (every push() and pop(), as well as every fraction of a second in wait()), so I would strongly prefer unresolved() to be fast.
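
The counting step described above could be sketched as follows. This is a base-R illustration with stand-in task objects, not crew's actual code: count_incomplete() and workers_to_launch() are hypothetical names, the done flag stands in for a call to unresolved(), and the scaling rule (cap demand at a maximum worker count, then subtract active workers) is an assumption.

```r
# Stand-in tasks: a real task would be a mirai checked with unresolved().
tasks <- list(
  list(done = TRUE),
  list(done = FALSE),
  list(done = FALSE)
)

# Hypothetical helper: count tasks that have not finished yet.
count_incomplete <- function(tasks) {
  sum(!vapply(tasks, function(task) task$done, logical(1)))
}

# Hypothetical scaling rule: launch enough workers to cover the incomplete
# tasks, never exceeding max_workers and never returning a negative number.
workers_to_launch <- function(incomplete, active, max_workers) {
  max(0, min(incomplete, max_workers) - active)
}

incomplete <- count_incomplete(tasks)
to_launch <- workers_to_launch(incomplete, active = 1, max_workers = 4)
```

Because a check like this runs over every task on each push(), pop(), and wait(), its cost grows with the length of the task list, which is why a cheap unresolved() matters here.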

As you explained in an earlier thread, unresolved() downloads the data in order to check if the task is complete. For most cases this may not be much of an issue, but for large enough data, this could slow down crew because of what needs to happen for auto-scaling. Also, it risks running into a situation where we have large objects from multiple tasks simultaneously in memory, and memory may not be adequate to store everything in a single session.

Would it be possible to allow unresolved() to avoid downloading the data so it can be fast and lightweight for medium to large data? Here is what I see with mirai 0.8.1:

library(mirai)
library(lobstr)
daemons(n = 1L, url = "ws://127.0.0.1:5001", dispatcher = TRUE)
#> [1] 1
server <- callr::r_bg(function() mirai::server("ws://127.0.0.1:5001"))
daemons()
#> $connections
#> [1] 1
#> 
#> $daemons
#>                     status_online status_busy tasks_assigned tasks_complete instance #
#> ws://127.0.0.1:5001             1           0              0              0          1
m <- mirai(rnorm(1e8))
Sys.sleep(10)
obj_size(m)
#> 6.01 kB
system.time(available <- !unresolved(m))
#> user  system elapsed 
#> 1.572   0.500   2.078
available
#> [1] TRUE
obj_size(m)
#> 800.01 MB
system.time(unresolved(m))
#> user  system elapsed 
#> 0       0       0
system.time(size <- obj_size(m$data))
#> user  system elapsed 
#> 0       0       0 
size
#> 800.00 MB
daemons(n = 0L)
#> [1] 0
server$kill()
