
async-mutex's Introduction


What is it?

This package implements primitives for synchronizing asynchronous operations in JavaScript.

Mutex

The term "mutex" usually refers to a data structure used to synchronize concurrent processes running on different threads. For example, before accessing a non-threadsafe resource, a thread will lock the mutex. This is guaranteed to block the thread until no other thread holds a lock on the mutex and thus enforces exclusive access to the resource. Once the operation is complete, the thread releases the lock, allowing other threads to acquire a lock and access the resource.

While JavaScript is strictly single-threaded, the asynchronous nature of its execution model allows for race conditions that require similar synchronization primitives. Consider, for example, a library communicating with a web worker that needs to exchange several subsequent messages with the worker in order to achieve a task. As these messages are exchanged asynchronously, it is perfectly possible that the library is called again during this process. Depending on the way state is handled during the async process, this can lead to race conditions that are hard to fix and even harder to track down.
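
The kind of race described above can be reproduced without any threads. The following is a minimal sketch, not taken from the library: `tick`, `unsafeIncrement` and `demoRace` are hypothetical names, and `tick` merely stands in for any asynchronous step such as a worker round trip.

```javascript
// Hypothetical helper standing in for any asynchronous step
// (a worker message round trip, a database call, ...).
const tick = () => new Promise((resolve) => setTimeout(resolve, 0));

let counter = 0;

// Read-modify-write spread across an await: not atomic.
async function unsafeIncrement() {
    const current = counter; // read
    await tick();            // other callers may run while we wait
    counter = current + 1;   // write back a possibly stale value
}

async function demoRace() {
    counter = 0;
    await Promise.all([unsafeIncrement(), unsafeIncrement(), unsafeIncrement()]);
    return counter;
}

const raceResult = demoRace();
raceResult.then((value) => console.log('counter after 3 increments:', value));
```

All three calls read the counter before any of them writes it back, so two of the three increments are lost.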

This library solves the problem by applying the concept of mutexes to JavaScript. Locking the mutex will return a promise that resolves once the mutex becomes available. Once the async process is complete (usually taking multiple spins of the event loop), a callback supplied to the caller should be called in order to release the mutex, allowing the next scheduled worker to execute.
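
To make the idea concrete, here is a rough sketch of how such a promise-based mutex could be built from a promise chain. It is an illustration only and not this library's implementation: `TinyMutex`, `tick` and `safeIncrement` are hypothetical names introduced for the example.

```javascript
// Illustrative stand-in only; async-mutex's real implementation differs.
class TinyMutex {
    constructor() {
        // Resolves when the most recently queued holder has released.
        this._tail = Promise.resolve();
    }

    // Returns a promise that resolves with a release function
    // once all earlier holders have released.
    acquire() {
        let release;
        const released = new Promise((resolve) => { release = resolve; });
        const ready = this._tail.then(() => release);
        this._tail = released;
        return ready;
    }
}

const tick = () => new Promise((resolve) => setTimeout(resolve, 0));
const mutex = new TinyMutex();
let counter = 0;

// The read-modify-write now happens entirely while the lock is held.
async function safeIncrement() {
    const release = await mutex.acquire();
    try {
        const current = counter;
        await tick();
        counter = current + 1;
    } finally {
        release(); // always release, even if the body throws
    }
}

const safeResult = Promise.all([safeIncrement(), safeIncrement(), safeIncrement()])
    .then(() => counter);
safeResult.then((value) => console.log('counter after 3 increments:', value));
```

Because each caller waits for the previous holder to release, the three increments run one after another and none is lost.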

Semaphore

Imagine a situation where you need to control access to several instances of a shared resource. For example, you might want to distribute images between several worker processes that perform transformations, or you might want to create a web crawler that performs a defined number of requests in parallel.

A semaphore is a data structure that is initialized with an arbitrary integer value and that can be locked multiple times. As long as the semaphore value is positive, locking it will return the current value and the locking process will continue execution immediately; the semaphore will be decremented upon locking. Releasing the lock will increment the semaphore again.

Once the semaphore has reached zero, the next process that attempts to acquire a lock will be suspended until another process releases its lock, which increments the semaphore again.
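
The counting behaviour described above can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the library's Semaphore: `TinySemaphore`, `sleep` and `task` are hypothetical names, and the sketch ignores weights, priorities and cancellation.

```javascript
// Illustrative stand-in only; async-mutex's real Semaphore differs.
class TinySemaphore {
    constructor(value) {
        this._value = value;
        this._waiters = []; // callbacks of suspended acquire() calls
    }

    // Resolves with a release function once a slot is available.
    acquire() {
        if (this._value > 0) {
            this._value--;
            return Promise.resolve(this._makeRelease());
        }
        return new Promise((resolve) => {
            this._waiters.push(() => resolve(this._makeRelease()));
        });
    }

    _makeRelease() {
        let done = false;
        return () => {
            if (done) return; // releasing twice has no effect
            done = true;
            const next = this._waiters.shift();
            if (next) next();   // hand the slot directly to the next waiter
            else this._value++; // nobody waiting: give the slot back
        };
    }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const semaphore = new TinySemaphore(2);
let running = 0;
let maxRunning = 0;

async function task() {
    const release = await semaphore.acquire();
    try {
        running++;
        maxRunning = Math.max(maxRunning, running);
        await sleep(5);
    } finally {
        running--;
        release();
    }
}

const semaphoreResult = Promise.all([1, 2, 3, 4, 5].map(() => task()))
    .then(() => maxRunning);
semaphoreResult.then((value) => console.log('max tasks running at once:', value));
```

Five tasks are started, but the semaphore never lets more than two of them run at the same time.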

This library provides a semaphore implementation for JavaScript that is similar to the mutex implementation described above.

How to use it?

Installation

You can install the library into your project via npm:

npm install async-mutex

The library is written in TypeScript and will work in any environment that supports ES5, ES6 promises and Array.isArray. On ancient browsers, a shim can be used (e.g. core-js). No external typings are required for using this library with TypeScript (version >= 2).

Starting with Node 12.16 and 13.7, native ES6 style imports are supported.

WARNING: Node 13 versions < 13.2.0 fail to import this package correctly. Node 12 and earlier are fine, as are newer versions of Node 13.

Importing

CommonJS:

var Mutex = require('async-mutex').Mutex;
var Semaphore = require('async-mutex').Semaphore;
var withTimeout = require('async-mutex').withTimeout;

ES6:

import {Mutex, Semaphore, withTimeout} from 'async-mutex';

TypeScript:

import {Mutex, MutexInterface, Semaphore, SemaphoreInterface, withTimeout} from 'async-mutex';

With the latest version of Node, native ES6 style imports are supported.

Mutex API

Creating

const mutex = new Mutex();

Create a new mutex.

Synchronized code execution

Promise style:

mutex
    .runExclusive(() => {
        // ...
    })
    .then((result) => {
        // ...
    });

async/await:

await mutex.runExclusive(async () => {
    // ...
});

runExclusive schedules the supplied callback to be run once the mutex is unlocked. The function may return a promise. Once the promise is resolved or rejected (or immediately after execution if an immediate value was returned), the mutex is released. runExclusive returns a promise that adopts the state of the function result.

The mutex is released and the result rejected if an exception occurs during execution of the callback.

Manual locking / releasing

Promise style:

mutex
    .acquire()
    .then(function(release) {
        // ...

        release();
    });

async/await:

const release = await mutex.acquire();
try {
    // ...
} finally {
    release();
}

acquire returns an (ES6) promise that will resolve as soon as the mutex is available. The promise resolves with a function release that must be called once the mutex should be released again. The release callback is idempotent.

IMPORTANT: Failure to call release will hold the mutex locked and will likely deadlock the application. Make sure to call release under all circumstances and handle exceptions accordingly.

Unscoped release

As an alternative to calling the release callback returned by acquire, the mutex can be released by calling release directly on it:

mutex.release();

Checking whether the mutex is locked

mutex.isLocked();

Cancelling pending locks

Pending locks can be cancelled by calling cancel() on the mutex. This will reject all pending locks with E_CANCELED:

Promise style:

import {E_CANCELED} from 'async-mutex';

mutex
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_CANCELED) {
            // ...
        }
    });

async/await:

import {E_CANCELED} from 'async-mutex';

try {
    await mutex.runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_CANCELED) {
        // ...
    }
}

This works with acquire, too: if acquire is used for locking, the resulting promise will reject with E_CANCELED.

The error that is thrown can be customized by passing a different error to the Mutex constructor:

const mutex = new Mutex(new Error('fancy custom error'));

Note that while all pending locks are cancelled, a currently held lock will not be revoked. In consequence, the mutex may not be available even after cancel() has been called.

Waiting until the mutex is available

You can wait until the mutex is available without locking it by calling waitForUnlock(). This will return a promise that resolves once the mutex can be acquired again. This operation will not lock the mutex, and there is no guarantee that the mutex will still be available once an async barrier has been encountered.

Promise style:

mutex
    .waitForUnlock()
    .then(() => {
        // ...
    });

Async/await:

await mutex.waitForUnlock();
// ...

Semaphore API

Creating

const semaphore = new Semaphore(initialValue);

Creates a new semaphore. initialValue is an arbitrary integer that defines the initial value of the semaphore.

Synchronized code execution

Promise style:

semaphore
    .runExclusive(function(value) {
        // ...
    })
    .then(function(result) {
        // ...
    });

async/await:

await semaphore.runExclusive(async (value) => {
    // ...
});

runExclusive schedules the supplied callback to be run once the semaphore is available. The callback will receive the current value of the semaphore as its argument. The function may return a promise. Once the promise is resolved or rejected (or immediately after execution if an immediate value was returned), the semaphore is released. runExclusive returns a promise that adopts the state of the function result.

The semaphore is released and the result rejected if an exception occurs during execution of the callback.

runExclusive accepts a first optional argument weight. Specifying a weight will decrement the semaphore by the specified value, and the callback will only be invoked once the semaphore's value is greater than or equal to weight.

runExclusive accepts a second optional argument priority. Specifying a greater value for priority tells the scheduler to run this task before other tasks. priority can be any real number. The default is zero.

Manual locking / releasing

Promise style:

semaphore
    .acquire()
    .then(function([value, release]) {
        // ...

        release();
    });

async/await:

const [value, release] = await semaphore.acquire();
try {
    // ...
} finally {
    release();
}

acquire returns an (ES6) promise that will resolve as soon as the semaphore is available. The promise resolves to an array with the first entry being the current value of the semaphore, and the second value a function that must be called to release the semaphore once the critical operation has completed. The release callback is idempotent.

IMPORTANT: Failure to call release will hold the semaphore locked and will likely deadlock the application. Make sure to call release under all circumstances and handle exceptions accordingly.

acquire accepts a first optional argument weight. Specifying a weight will decrement the semaphore by the specified value, and the semaphore will only be acquired once its value is greater than or equal to weight.

acquire accepts a second optional argument priority. Specifying a greater value for priority tells the scheduler to release the semaphore to the caller before other callers. priority can be any real number. The default is zero.

Unscoped release

As an alternative to calling the release callback returned by acquire, the semaphore can be released by calling release directly on it:

semaphore.release();

release accepts an optional argument weight and increments the semaphore accordingly.

IMPORTANT: Releasing a previously acquired semaphore with the releaser that was returned by acquire will automatically increment the semaphore by the correct weight. If you release by calling the unscoped release you have to supply the correct weight yourself!

Getting the semaphore value

semaphore.getValue()

Checking whether the semaphore is locked

semaphore.isLocked();

The semaphore is considered to be locked if its value is either zero or negative.

Setting the semaphore value

The value of a semaphore can be set directly to a desired value. A positive value will cause the semaphore to schedule any pending waiters accordingly.

semaphore.setValue(value);

Cancelling pending locks

Pending locks can be cancelled by calling cancel() on the semaphore. This will reject all pending locks with E_CANCELED:

Promise style:

import {E_CANCELED} from 'async-mutex';

semaphore
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_CANCELED) {
            // ...
        }
    });

async/await:

import {E_CANCELED} from 'async-mutex';

try {
    await semaphore.runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_CANCELED) {
        // ...
    }
}

This works with acquire, too: if acquire is used for locking, the resulting promise will reject with E_CANCELED.

The error that is thrown can be customized by passing a different error to the Semaphore constructor:

const semaphore = new Semaphore(2, new Error('fancy custom error'));

Note that while all pending locks are cancelled, any currently held locks will not be revoked. In consequence, the semaphore may not be available even after cancel() has been called.

Waiting until the semaphore is available

You can wait until the semaphore is available without locking it by calling waitForUnlock(). This will return a promise that resolves once the semaphore can be acquired again. This operation will not lock the semaphore, and there is no guarantee that the semaphore will still be available once an async barrier has been encountered.

Promise style:

semaphore
    .waitForUnlock()
    .then(() => {
        // ...
    });

Async/await:

await semaphore.waitForUnlock();
// ...

waitForUnlock accepts optional arguments weight and priority. The promise will resolve as soon as it is possible to acquire the semaphore with the given weight and priority. Scheduled tasks with the greatest priority values execute first.

Limiting the time waiting for a mutex or semaphore to become available

Sometimes it is desirable to limit the time a program waits for a mutex or semaphore to become available. The withTimeout decorator can be applied to both semaphores and mutexes and changes the behavior of acquire and runExclusive accordingly.

import {withTimeout, E_TIMEOUT} from 'async-mutex';

const mutexWithTimeout = withTimeout(new Mutex(), 100);
const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100);

The API of the decorated mutex or semaphore is unchanged.

The second argument of withTimeout is the timeout in milliseconds. After the timeout is exceeded, the promise returned by acquire and runExclusive will reject with E_TIMEOUT. The latter will not run the provided callback in case of a timeout.

The third argument of withTimeout is optional and can be used to customize the error with which the promise is rejected.

const mutexWithTimeout = withTimeout(new Mutex(), 100, new Error('new fancy error'));
const semaphoreWithTimeout = withTimeout(new Semaphore(5), 100, new Error('new fancy error'));
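
For readers curious about what a timeout on a lock involves, the sketch below shows one possible way to build the behaviour on top of a plain acquire. It is not how withTimeout is implemented: `TinyMutex` and `acquireWithTimeout` are hypothetical stand-ins written for this example.

```javascript
// Illustrative stand-in only; not the library's Mutex.
class TinyMutex {
    constructor() {
        this._tail = Promise.resolve();
    }

    acquire() {
        let release;
        const released = new Promise((resolve) => { release = resolve; });
        const ready = this._tail.then(() => release);
        this._tail = released;
        return ready;
    }
}

// Hypothetical helper: reject if the lock is not obtained within `ms`.
function acquireWithTimeout(mutex, ms, error = new Error('timeout')) {
    return new Promise((resolve, reject) => {
        let timedOut = false;
        const handle = setTimeout(() => {
            timedOut = true;
            reject(error);
        }, ms);
        mutex.acquire().then((release) => {
            if (timedOut) {
                release(); // the lock arrived too late: hand it back at once
            } else {
                clearTimeout(handle); // do not leave the timer pending
                resolve(release);
            }
        });
    });
}

const mutex = new TinyMutex();
const timeoutResult = (async () => {
    const outcomes = [];
    const release = await mutex.acquire();
    setTimeout(release, 50); // hold the lock for 50 ms

    try {
        await acquireWithTimeout(mutex, 10); // gives up before the lock is free
        outcomes.push('acquired');
    } catch (e) {
        outcomes.push(e.message);
    }

    const lateRelease = await acquireWithTimeout(mutex, 200); // long enough
    outcomes.push('acquired');
    lateRelease();
    return outcomes;
})();
timeoutResult.then((outcomes) => console.log(outcomes));
```

Two details matter in any such helper: the timer is cleared once the lock has been obtained, and a lock that is granted after the timeout has fired is released immediately so that it does not stay held forever.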

Failing early if the mutex or semaphore is not available

A shortcut exists for the case where you do not want to wait for a lock to be available at all. The tryAcquire decorator can be applied to both mutexes and semaphores and changes the behavior of acquire and runExclusive to immediately throw E_ALREADY_LOCKED if the mutex or semaphore is not available.

Promise style:

import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex';

tryAcquire(semaphoreOrMutex)
    .runExclusive(() => {
        // ...
    })
    .then(() => {
        // ...
    })
    .catch(e => {
        if (e === E_ALREADY_LOCKED) {
            // ...
        }
    });

async/await:

import {tryAcquire, E_ALREADY_LOCKED} from 'async-mutex';

try {
    await tryAcquire(semaphoreOrMutex).runExclusive(() => {
        // ...
    });
} catch (e) {
    if (e === E_ALREADY_LOCKED) {
        // ...
    }
}

Again, the error can be customized by providing a custom error as second argument to tryAcquire.

tryAcquire(semaphoreOrMutex, new Error('new fancy error'))
    .runExclusive(() => {
        // ...
    });

License

Feel free to use this library under the conditions of the MIT license.

async-mutex's People

Contributors

0xflotus, akatquas, aryzing, cantoine, davideviolante, dirtyhairy, dmurvihill, hmil, jasongore, jmounier, josemiguelmelo, kronthto, marco-a, meirionhughes, nnoodle, ranma42


async-mutex's Issues

Feature request: mutex.tryLock()

I know this seems rather trivial to implement (I think a mutex.isLocked and a consequent mutex.acquire would do the job).

However, I'm not familiar with the internals of this library.

trylock is a standard function in the posix mutex api (pthread_mutex_trylock), would be nice to have it here, too.

Request: Priority Queue

I'm synchronizing a CSRF token that changes every time I make a request and I want to fall back to resetting the session if I get a CSRF verification error.

I want the reset code to have a higher priority than other waiting tasks, so that the other tasks can get a fresh CSRF token before they are scheduled.

Is this a feature you are interested in? If so, I will take some time to implement it on Monday.

Types not compatible

simple reproduction code:

// NodeJS: 14.15.3
import * as am from "async-mutex"; // [email protected]

const mutex = new am.Mutex();

function test(param1: string) {
  return;
}

(async () => {
  // Error: Argument of type 'void' is not assignable to parameter of type 'Worker<unknown>'. ts(2345)
  await mutex.runExclusive(test("hi"));
})();

or am i supposed to use it in a different way? this is how i thought it works after looking at https://github.com/DirtyHairy/async-mutex#synchronized-code-execution

Question about Usage

The NPM page's example for promise-style locking does not call the release() function. It seems, from the text and other examples on the page, that release() needs to be called. Would it be possible to update the example so that this is clearer?

withTimeout missing a clearTimeout

First off, thanks for the great library.

While writing some unit tests on some code that uses Semaphores I noticed I was having Jest yell at me about workers not gracefully exiting. In trying to track down the issue with my tests I noticed it only happened if I was testing code that had a withTimeout and Semaphore or a Mutex. I did some digging and it appears that there is a missing clearTimeout(handle) when a lock is successfully acquired here: https://github.com/DirtyHairy/async-mutex/blob/master/src/withTimeout.ts#L26-L28 so timers are leaking on successful acquisition of locks.

I am opening a PR to add the clearTimeout and some tests. Please let me know of any changes you'd like me to make or if I've missed a reason for the lack of the clearTimeout.

Thanks!

Please don't deprecate release() function

With great power comes great responsibility...says Batman I think ;)

I get it, runExclusive ensures you don't screw things up and not unlock the thread, but sometimes you can't set things up with a nice try / catch loop and you really need to use withTimeout to ensure it doesn't lock the application.

Here is an example from React Native that I made as a workaround for this Expo issue:

import React, {useCallback, useRef, useState} from "react";
import {MutexInterface, withTimeout, Mutex} from "async-mutex";
import {Video} from "expo-av";
import {Asset} from "expo-media-library";

type IVideoInfo = Pick<Asset, "width" | "height" | "duration">;

const useVideoInfo = () => {
  const [uri, setUri] = useState<string>();
  // const mutexRef      = useRef<MutexInterface>(withTimeout(new Mutex(), 4000));
  // withTimeout didn't really seem to be working...so we just set a timeout
  const mutexRef      = useRef<MutexInterface>(new Mutex());
  const videoInfoRef  = useRef<IVideoInfo>();

  const getVideoInfo = useCallback(
    async (uri: string) => {
      mutexRef.current.release();
      videoInfoRef.current = undefined;

      await mutexRef.current.acquire();
      setTimeout(() => mutexRef.current.release(), 5000);
      setUri(uri);

      await mutexRef.current.waitForUnlock();
      if (!videoInfoRef.current) {
        throw(`Could not extract the video info from ${uri}`)
      }
      return videoInfoRef.current;
    },
    [],
  );

  const VideoInfoExtractor = useCallback(
    () => <Video
      style={{width: 1, height: 1, opacity: 1}}
      source={{ uri }}
      onReadyForDisplay={event => {
        if (event.status?.isLoaded) {
          const { width, height } = event.naturalSize;
          videoInfoRef.current = {width, height, duration: event.status.durationMillis / 1000};
          mutexRef.current.release();
        }
      }}
    />,
    [uri],
  );

  return {
    VideoInfoExtractor,
    getVideoInfo,
  };
};

export {
  useVideoInfo,
  IVideoInfo,
};

Rewriting this somehow with runExclusive would basically fly in the face of what I'm trying to do here which is wait for one thing to wait for another.

Can we keep the acquire/release paradigm into the future? 🙏

PS. For some reason the withTimeout wrapper didn't work for me...so I had to use setTimeout...maybe I'm doing something wrong?

Bug: Bad execution time for large number of calls on Mutex.runExclusive

Hi,
I have some trouble with some unit tests taking a lot of time. The problem seems to be related to some methods that are wrapped in runExclusive from a Mutex. I created this test case showing some problems with the performance of runExclusive in combination with a delayed method.

    const mutex = new Mutex()
    const lockMethod = () => new Promise(resolve => setTimeout(() => resolve(undefined), 0))
    const promises = _.range(0, 1000).map(() => mutex.runExclusive(lockMethod))
    // const promises = _.range(0, 1000).map(() => lockMethod())
    // without lodash use:  [...Array(1000).keys()].map(...)
    await Promise.all(promises)

The test took 13 seconds on my machine to complete, while the same test completes in milliseconds if runExclusive is removed from the test. If you increase the number of calls it gets even worse.

I am expecting the test to also complete in milliseconds while the Mutex is used.

I'm running async-mutex 0.3.2, NodeJS 17.3, Windows 10, jest 27.4.7
If you need more information let me know.
Thanks

A small improvement

runExclusive() takes as a parameter a function which returns a Promise.
Why not also allow calling this method with just a Promise?

0.4 Changelog

Can you please provide a changelog for the 0.4 version? Is it safe to upgrade without any breaking changes?

Locking a specific value

Hi! I just found this project and I'm looking for something just like this. In my case, it's related to the creation of database connections. I only want to create the database connection once, even if the server is called multiple times during a very short period.

In normal cases, I could just create the promise for creating the database connection at startup, and use that promise, as it would always resolve to the same database connection. The problem is that my application is multitenant, and there are a lot of different databases (one per installation/company/customer). I can't know beforehand which databases exist. They can come and go.

I can use Mutex to lock the entire database connection creation, which will work for me and solve my problems. However, it would be very nice if database connections for different databases could be created in parallel. So I was thinking of something like this:

const mutex = new Mutex();
const connections: Record<number, DatabaseConnection> = {};
const createConnection = async (databaseId: number) => {
    const connection = connections[databaseId];
    if (connection) {
        return connection;
    }

    // Proposed API (not part of async-mutex): lock only per databaseId.
    return mutex.runExclusive(databaseId, async () => {
        // Re-check inside the lock: another caller may have connected meanwhile.
        const existing = connections[databaseId];
        if (existing) {
            return existing;
        }

        const databaseInfo = getDatabaseInfo(databaseId);
        const connection = Database.connect(databaseInfo);
        connections[databaseId] = connection;
        return connection;
    });
}

The idea here is that the mutex would only lock per databaseId. Has anyone given any thought to this? Is there an obvious solution that I've missed?

Thanks in advance!
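
One way the per-key idea above could be approached is to keep a separate queue for every key. The following is only a sketch under that assumption and does not use or describe any async-mutex API: `KeyedLock`, `connect` and `getConnection` are hypothetical names, and `connect` stands in for the real connection setup.

```javascript
// Hypothetical per-key lock: callbacks for the same key run one at a time,
// callbacks for different keys may overlap.
class KeyedLock {
    constructor() {
        this._tails = new Map(); // key -> promise of the last queued callback
    }

    runExclusive(key, callback) {
        const previous = this._tails.get(key) || Promise.resolve();
        // Run after the previous callback has settled, whatever its outcome.
        const result = previous.catch(() => {}).then(() => callback());
        this._tails.set(key, result);
        // Drop the entry once this was the last queued callback for the key.
        const cleanup = () => {
            if (this._tails.get(key) === result) this._tails.delete(key);
        };
        result.then(cleanup, cleanup);
        return result;
    }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const lock = new KeyedLock();
const connections = new Map();
let connectCalls = 0;

// Stand-in for an expensive connection setup.
async function connect(databaseId) {
    connectCalls++;
    await sleep(5);
    return { databaseId };
}

function getConnection(databaseId) {
    return lock.runExclusive(databaseId, async () => {
        if (!connections.has(databaseId)) {
            connections.set(databaseId, await connect(databaseId));
        }
        return connections.get(databaseId);
    });
}

const keyedResult = Promise.all([getConnection(1), getConnection(1), getConnection(2)])
    .then(([a, b]) => ({ sameForKey1: a === b, connectCalls }));
keyedResult.then((summary) => console.log(summary));
```

The two requests for database 1 share a single connection, while the request for database 2 does not have to wait for them.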

Why can't I release a semaphore?

Would you consider un-deprecating the release method? 0ccb014

Preventing release means you always have to acquire/unlock in the same scope, which works well when you want to control how many jobs can be done concurrently.

Semaphores also have another use, where you can use one to manage a shared resource that has separate writers and readers.
Here you would not initialize the semaphore to a positive value; starting at 0 is fine.
A writer pushes an item into an array and signals (or releases as this library calls it) the semaphore.
A reader waits/acquires on the semaphore, and then is guaranteed to have an item to work on.
This is the more classical use of a semaphore, where semaphores do not need to be signalled/waited on in the same thread.
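
The writer/reader pattern described here can be sketched as follows. The `SignalSemaphore` class and the `write` and `read` helpers are hypothetical names made up for the illustration and are not part of this library.

```javascript
// Hypothetical counting semaphore with classic signal/wait semantics.
class SignalSemaphore {
    constructor(value = 0) {
        this._value = value;
        this._waiters = [];
    }

    // Reader side: resolves once an item has been signalled.
    wait() {
        if (this._value > 0) {
            this._value--;
            return Promise.resolve();
        }
        return new Promise((resolve) => this._waiters.push(resolve));
    }

    // Writer side: wake one waiting reader, or bank the signal for later.
    signal() {
        const next = this._waiters.shift();
        if (next) next();
        else this._value++;
    }
}

const items = [];
const available = new SignalSemaphore(0); // starts at zero: nothing to read yet

function write(item) {
    items.push(item);
    available.signal();
}

async function read() {
    await available.wait(); // suspended until a writer has signalled
    return items.shift();
}

// Readers start first and wait; the writer wakes them up one by one.
const readerResult = Promise.all([read(), read()]);
write('a');
write('b');
readerResult.then((values) => console.log(values));
```

Signalling and waiting happen in different places, which is exactly what a release tied to a previous acquire cannot express.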

So, it would be nice for this library to support that. This would also solve #23.

Every so often I go to find a semaphore library and always have to skip this one (which seems to be the most popular), and reach for semaphore-async-await.

Mutex guarantees FIFO?

Is it guaranteed that the mutex will be granted in the same order in which it was requested?

In other words, the following should print 0 to 10 in that order.

var Mutex = require('async-mutex').Mutex;
const mutex = new Mutex();

// await is not allowed at the top level of a CommonJS module, so wrap the loop.
(async () => {
    for (let i = 0; i <= 10; i++) {
        let release = await mutex.acquire();
        console.log(`mutex acquired by i=${i}`);
        i === 0 ? setTimeout(release, 2000) : release();
    }
})();

Timing bug inside semaphore for `isLocked()`

I've discovered a timing bug involving withTimeout.

import { Mutex, withTimeout } from 'async-mutex';

async function main () {

  const lock = new Mutex();

  const release = await lock.acquire();

  const lock2 = withTimeout(lock, 100);

  try {
    await lock2.acquire();
  } catch (e) {
    console.log(e.message);
  }

  const lock3 = withTimeout(lock, 100);

  try {
    await lock3.acquire();
  } catch (e) {
    console.log(e.message);
  }

  release();

  console.log(lock.isLocked());

  setTimeout(() => {
    console.log(lock.isLocked());
  }, 0);

}

main();

Running with ts-node shows this:

timeout while waiting for mutex to become available
timeout while waiting for mutex to become available
true
false

We should expect console.log(lock.isLocked()) to print false as soon as I've called release().

Note that this error goes away as soon as I remove the withTimeout attempts.

I've traced this to:

    Semaphore.prototype.isLocked = function () {
        return this._value <= 0;
    };

The _value property is still 0, and it appears the value isn't properly incremented until some ticks pass by. Which is why the setTimeout shows that the lock becomes unlocked afterwards.

So far, in order to fix this, I had to add a 0 ms sleep after my release calls.

thank you!

@DirtyHairy thank you for this excellent package! I dropped it into a situation that needed it and it worked right away with no fuss!

Politely, I think it was too hard to find this package. I believe this package is the one that most people will want to use for handling async mutexes and semaphores. I almost missed this package when searching for it.

Thank you for solving this problem for us!

Wrong documentation

The description:
mutex.acquire(function(release) { // ... });

is wrong because:
interface MutexInterface {

acquire(): Promise<MutexInterface.Releaser>;

0.4.0 not downloadable?

Hello,

it seems that the package.json still points to 0.3.2 version, however, it seems that 0.4.0 was already released. Can we get the update up? Thank you!

Possible to have blocking version of isLocked?

I have my own simple mutex implementation I've been using, but was looking at adopting this library because of the isLocked function. (I also like the amount of test coverage you have.) I'm curious if it's realistic or possible to have an async version of isLocked that blocks until the lock is available, like a waitForUnlock function.

I know there are some inconsistencies with checking for a locked state without actually locking it, but I think they would already apply to isLocked. I have some actions that don't need to lock an exclusive resource, but should not run if the exclusive resource is already in use. In effect, these functions depend on exclusive resource output and should wait if it's in use.

Not work on express app

I'm working on creating a chat app in which node.js server counts up the number of characters of all messages. For instance, 100 people sending messages each of them having the length of 10 means that total character count is 1000(100 * 10). And if a person sends a message with the length of 10 after that, total character count should be 1010(1000 + 10).

All messages are sent to the node.js server, which counts their length and saves it to the DB.

A big problem occurred when a lot of people send messages at the same time.

When I send 100 messages each of them having the length of 10 (like the example above) AT THE SAME TIME by jMeter, total character count was 120. It is supposed to be exact 1000!

Here's my code(node version 8.13.0):

'use strict'
const express = require('express');
const router = express.Router();
const co = require('co');
const models = require('models');
const Mutex = require('async-mutex').Mutex;
const CharacterCounter = models.CharacterCounter;

const mutex = new Mutex();

router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;

    const release = yield mutex.acquire();
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      yield CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      release();
    }

    // ... my code is keep going

It seems that the cause of this problem is in this mutex library. But I don't know the exact solution. Does anyone have an idea to achieve my goal? I've only started working on node.js recently, so a plain explanation would be appreciated.

Different release lock interface

Currently, semaphores and mutexes return a Promise of a new release function. Is there any way to just call release directly on the semaphore or mutex?

I have functions where I need to lock/release multiple times.
I end up having to do this:

const release1 = await MUTEX.acquire();
// do something
release1();
// do something
const release2 = await MUTEX.acquire();
release2();
// do something
const release3 = await MUTEX.acquire();
release3();

Is it possible to make it like this?

await MUTEX.acquire();
MUTEX.release();

// do something

await MUTEX.acquire();
MUTEX.release();

// do something

await MUTEX.acquire();
MUTEX.release();

mutex.waitForUnlock() bug?

Not sure if this is a bug or if I'm using it incorrectly.

version: 0.3.2
node: v16.6.1
typescript: Version 4.5.5

const myMutex = new Mutex();

await myMutex.runExclusive( async () => {
await myAsyncFunc();
});

// now wait until unlocked ...
await myMutex.waitForUnlock();
if(myMutex.isLocked()) throw new Error('myMutex is locked'); // <== this throws the error all the time

// instead, this works fine
const releaser = await myMutex.acquire();
releaser();
if(myMutex.isLocked()) throw new Error('myMutex is locked'); // <== never throws, it works

BTW, awesome library!

TypeError: objMutex.waitForUnlock is not a function

why waitForUnlock not found ?

let {Mutex} = require('async-mutex');

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

async function f(){
	const objMutex = new Mutex();

	let promise = objMutex.runExclusive( async () => {
		await sleep(5000);
	});

	await objMutex.waitForUnlock();
	console.log('mutex unlocked');
}

f().then(console.log).catch(console.error);
TypeError: objMutex.waitForUnlock is not a function
    at f (E:\src\arbitr-jsm\test\mutex-test.js:14:17)
    at Object.<anonymous> (E:\src\arbitr-jsm\test\mutex-test.js:18:1)
    at Module._compile (node:internal/modules/cjs/loader:1101:14)
    at Object.Module._extensions..js (node:internal/modules/cjs/loader:1153:10)
    at Module.load (node:internal/modules/cjs/loader:981:32)
    at Function.Module._load (node:internal/modules/cjs/loader:822:12)
    at Function.executeUserEntryPoint [as runMain] (node:internal/modules/run_main:79:12)
    at node:internal/main/run_main_module:17:47

Cancel specific pending lock

Hi @DirtyHairy thank you for the great lib!

I wanted to ask you if there's a way to cancel() some specific pending locks of a mutex? In the docs it states

Pending locks can be cancelled by calling cancel() on the mutex. This will reject all pending locks with E_CANCELED:

However, in some situations I want to cancel only a specific lock while leaving others waiting for the lock to become available. I took a quick look at the source code but didn't find a way of achieving this. Can you please provide some pointers about how to make this possible?
I guess that the runExclusive() will have to return an ID that later can be used to cancel (e.g., cancel(<ID>)) the pending lock.
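
To make the idea concrete, here is a minimal sketch of a mutex whose `acquire()` hands back an ID next to the promise, so that a single pending waiter can be rejected while the others keep waiting. `CancellableMutex`, `E_WAITER_CANCELED` and the `{ id, ready }` return shape are all hypothetical and only illustrate the proposal; none of this is the library's API.

```javascript
// Hypothetical sketch, not part of async-mutex.
const E_WAITER_CANCELED = new Error('pending lock canceled'); // hypothetical error value

class CancellableMutex {
  constructor() {
    this.locked = false;
    this.nextId = 1;
    this.pending = new Map(); // id -> { resolve, reject }, kept in arrival order
  }

  // Returns { id, ready }; `ready` resolves to a release function.
  acquire() {
    const id = this.nextId++;
    const ready = new Promise((resolve, reject) => {
      if (!this.locked) {
        this.locked = true;
        resolve(() => this._release());
      } else {
        this.pending.set(id, { resolve, reject });
      }
    });
    return { id, ready };
  }

  // Rejects one pending waiter. Returns false if `id` is not waiting
  // (unknown, already canceled, or already holding the lock).
  cancel(id) {
    const waiter = this.pending.get(id);
    if (!waiter) return false;
    this.pending.delete(id);
    waiter.reject(E_WAITER_CANCELED);
    return true;
  }

  // Passes the lock to the oldest remaining waiter, or frees the mutex.
  _release() {
    const first = this.pending.entries().next();
    if (first.done) {
      this.locked = false;
      return;
    }
    const [id, waiter] = first.value;
    this.pending.delete(id);
    waiter.resolve(() => this._release());
  }
}
```

The sketch does not guard against calling a release function twice, which a real implementation would need to handle.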

NPM documentation improvement

I spent between 30 minutes and an hour debugging why my mutex.release() was breaking. It turns out I needed to use the releaser returned from mutex.acquire().

I did this because I was skimming the NPM readme, which shows an example of mutex.release().

I have two possible suggestions for anyone that has the same issue:

  • I assume the "WARNING: Using this API comes with the inherent danger of releasing a mutex locked in an entirely unrelated place. Use with care." is meant for this case. Moving it to the top of the 'Alternate release API' section (before the code examples) would make it easier to notice for people skimming the docs.
  • Renaming mutex.release() to mutex.semaphoreRelease() would make it obvious that it is probably not what you are looking for, as it acts directly on the semaphore inside the mutex.

Btw, awesome work on this library, it's super useful!

Would it always be `locked` if we initialize the semaphore with a negative integer?

As mentioned in the document,

...
A semaphore is a data structure that is initialized to **an arbitrary integer** value and that
can be locked multiple times.
...

With the implementation of isLocked:

isLocked(): boolean {
    return this._value <= 0;
}

So, it seems the semaphore is always locked if we create it with a negative value.

const semaphore = new Semaphore(-1);

Does it really make sense to allow a negative value for initialization?
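
As a small illustration of the question, the sketch below applies the quoted `isLocked` rule to a plain number and shows one way a constructor could reject negative initial values. `checkedInitialValue` is a hypothetical helper; whether the library should validate its input like this is a design decision for the maintainer.

```javascript
// Mirrors the isLocked rule quoted above, applied to a plain number.
function isLocked(value) {
  return value <= 0;
}

// Hypothetical guard: accept only non-negative integers as an initial value.
function checkedInitialValue(value) {
  if (!Number.isInteger(value) || value < 0) {
    throw new RangeError(`semaphore value must be a non-negative integer, got ${value}`);
  }
  return value;
}
```

With this rule, `isLocked(-1)` is `true`, so a semaphore created with -1 reports itself as locked from the start.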

Add an ES module entrypoint

Trying import syntax with node 13 results in:

import {Mutex} from 'async-mutex';
        ^^^^^
SyntaxError: The requested module 'async-mutex' does not provide an export named 'Mutex'

The following, however, works:

import AsyncMutex from 'async-mutex';
const mutex = new AsyncMutex.Mutex();

Weird warning

I had to update node to 14.x, and after running npm start -- --reset-cache I'm getting the following warning:

warn Package async-mutex has been ignored because it contains invalid configuration. Reason: Package subpath './package.json' is not defined by "exports" in app/node_modules/async-mutex/package.json

Ideas?

Using async mutex across NodeJS processes

This is more of a question related to the way this package works, and any limitations it may have in relation to some specific use-cases.

I have written a command line utility which generates a vast amount of data, and then dumps the generated data into one large file.

I've noticed some race-condition issues where one process reads the file and adds to it at the same time as another process is reading/writing the same contents. This results in some data being dropped or overwritten in the final file, as what is added in one process is not read by the other process when it merges the contents that it has generated.

I'm attempting to use this library to add some read/write protection to the processes. I can also attempt to refactor the child processes into a single-process asynchronous task, but that will significantly slow down the operations, and requires a lot of extra work on my end.

However, this library seems to only work within one process, and the lock is not shared across all the children (which makes sense), causing each child process to obtain its own lock.

I'm attempting to do the following.

// mutex.ts
import { Mutex } from 'async-mutex';

export const mutex = new Mutex();

// child_process.ts
import { mutex } from './mutex';
import { v4 } from 'uuid';
import * as fs from 'fs';
import * as _ from 'lodash';
import * as yaml from 'js-yaml';

const uuid = v4(); // identifies this child process in the log output
const definition = await generateData();
await mutex.runExclusive(() => {
   console.log('LOCK ACQUIRED FOR', uuid);
   if (fs.existsSync(baseFile)) {
      baseDefinition = yaml.safeLoad(fs.readFileSync(baseFile, 'utf8'));
   }
   baseDefinition = _.merge(baseDefinition, definition);
   fs.writeFileSync(baseFile, yaml.safeDump(baseDefinition));
   console.log('RELEASING LOCK FOR', uuid);
});

const moreData = await generateMoreData();
await mutex.runExclusive(async () => {
   console.log('SECOND LOCK ACQUIRED FOR', uuid);
   await moveData(moreData, baseFile);
   console.log('SECOND LOCK RELEASED FOR', uuid);
});

The generateData and generateMoreData methods take a little bit of time, so I do not want to keep the lock during that generation process (which is why the single threaded async/await method takes significantly more time to complete, and why I'm spawning multiple child processes). The moveData method does some additional read/write stuff on baseFile, so I'm simply attempting to acquire the lock before attempting to run the method.

When running this, I get some output that looks like this

LOCK ACQUIRED FOR 526ac716-b135-4a2d-986d-75bee856411b
LOCK ACQUIRED FOR 104073b7-6d7f-4ca1-8513-eec103877898
RELEASING LOCK FOR 526ac716-b135-4a2d-986d-75bee856411b
SECOND LOCK ACQUIRED FOR 526ac716-b135-4a2d-986d-75bee856411b
RELEASING LOCK FOR 104073b7-6d7f-4ca1-8513-eec103877898
SECOND LOCK ACQUIRED FOR 104073b7-6d7f-4ca1-8513-eec103877898
SECOND LOCK RELEASED FOR 526ac716-b135-4a2d-986d-75bee856411b
LOCK ACQUIRED FOR 155eb578-bc91-4bee-830f-bab2e1363bb8
SECOND LOCK RELEASED FOR 104073b7-6d7f-4ca1-8513-eec103877898
LOCK ACQUIRED FOR 8e6e08cf-8b3b-4040-af88-86670ad5cbb1
RELEASING LOCK FOR 155eb578-bc91-4bee-830f-bab2e1363bb8
SECOND LOCK ACQUIRED FOR 155eb578-bc91-4bee-830f-bab2e1363bb8
LOCK ACQUIRED FOR cb79f948-e4a4-4f75-ba31-fa2b743ce886
RELEASING LOCK FOR 8e6e08cf-8b3b-4040-af88-86670ad5cbb1
SECOND LOCK ACQUIRED FOR 8e6e08cf-8b3b-4040-af88-86670ad5cbb1
SECOND LOCK RELEASED FOR 155eb578-bc91-4bee-830f-bab2e1363bb8
RELEASING LOCK FOR cb79f948-e4a4-4f75-ba31-fa2b743ce886
SECOND LOCK ACQUIRED FOR cb79f948-e4a4-4f75-ba31-fa2b743ce886
SECOND LOCK RELEASED FOR 8e6e08cf-8b3b-4040-af88-86670ad5cbb1
SECOND LOCK RELEASED FOR cb79f948-e4a4-4f75-ba31-fa2b743ce886

As you can see, the locks clearly are not shared across processes, as the locks are acquired and released independently of one another.

Is this a limitation of the library, or of NodeJs child processes?
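
An in-memory mutex lives in the memory of a single process, and every child process loads its own copy of the module, so each child ends up with its own lock. One common way to coordinate separate processes is a lock file created with the exclusive `wx` flag. The following is a minimal sketch of that technique, not a feature of async-mutex; `acquireFileLock`, `withFileLock`, the retry count and the delay are hypothetical choices made for illustration.

```javascript
const fs = require('fs').promises;

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: takes a cross-process lock by creating `lockPath` exclusively.
async function acquireFileLock(lockPath, { retries = 50, delayMs = 100 } = {}) {
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      // 'wx' fails with EEXIST if the file already exists, so only one process wins.
      const handle = await fs.open(lockPath, 'wx');
      await handle.close();
      return () => fs.unlink(lockPath); // release function: removes the lock file
    } catch (err) {
      if (err.code !== 'EEXIST') throw err;
      await sleep(delayMs); // another process holds the lock; try again later
    }
  }
  throw new Error(`could not acquire ${lockPath}`);
}

// Runs `task` while holding the lock file and always releases it afterwards.
async function withFileLock(lockPath, task, options) {
  const release = await acquireFileLock(lockPath, options);
  try {
    return await task();
  } finally {
    await release();
  }
}
```

Each child process would wrap its read-merge-write step in `withFileLock` using the same lock path. A process that crashes while holding the lock leaves the file behind, so a real setup needs some form of stale-lock handling.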

Semaphore with negative count (suggestion)

As _weightedQueues is private in the semaphore, it would be better to let the count go negative when items are added to the queue, so that the number of pending items can be detected (number of pending = absolute value of count).

What is the limit of the thread-safe queue in the async-mutex module?

Hi everyone,
We currently receive approximately 110 API requests per minute, and several of them arrive in parallel.
I am using this module's Mutex API in the async/await style to serialize the handling of API requests in our code. It works fine for 110 API requests per minute.
https://www.npmjs.com/package/async-mutex

Does anyone know whether the mutex queue has a size limit? How many API requests can be handled with the Mutex API approach above?
Can this approach handle approximately 1000 API requests arriving at the same time (within a minute)?

Please take a look at this scenario and let us know, so that I can plan to use this approach for 1000 API requests per minute.

Limit concurrency with Mutex?

Hi! Thank you for this excellent library. I'm kind of amazed that it isn't more popular given how much we rely on it.

I was wondering if there's a possibility the library could be expanded to limit the number of concurrent accesses - to something greater than 1?

My use case is resizing and uploading a lot of images - with the absence of back-pressure the code races through the resize step, allocating tons of memory even though they're only uploaded slowly. So I'd like to throttle this, but putting the 'resize and upload' step into a Mutex would be wasteful, as I can probably afford 4x concurrency.

Maybe there's a better word for this than mutex as well - I think perhaps 'semaphore' might be the right name.

What do you think? Worth adding to this library or should I just spin something else up myself?
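
For reference, the behaviour asked for here can be sketched in a few lines: a counter of running tasks plus a queue of tasks waiting for a free slot. `createLimiter` is a hypothetical name, and the code is an illustration of the semaphore idea rather than anything taken from the library.

```javascript
// Hypothetical limiter: at most `limit` tasks run at the same time.
function createLimiter(limit) {
  let active = 0;
  const queue = []; // resolve callbacks of tasks waiting for a free slot

  return async function run(task) {
    if (active >= limit) {
      // No free slot: wait until a finishing task hands its slot over.
      await new Promise((resolve) => queue.push(resolve));
    } else {
      active++;
    }
    try {
      return await task();
    } finally {
      const next = queue.shift();
      if (next) {
        next(); // the slot goes straight to the next task; `active` is unchanged
      } else {
        active--;
      }
    }
  };
}
```

For the image case, something like `const run = createLimiter(4)` followed by `run(() => resizeAndUpload(image))` for every image would keep at most four resize-and-upload steps in flight, where `resizeAndUpload` stands for the user's own function.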

Alternate release API seems broken

Hi, I really like this small library so far. My impression is that the following code should work properly:

let Mutex = require('async-mutex').Mutex;
let mutex = new Mutex();

async function doWork(id) {
    await mutex.acquire();
    try {
        console.log('Doing work', id);
    }
    finally {
        mutex.release();
    }
}

async function doEverything() {
    await Promise.all([
        doWork(1),
        doWork(2),
        doWork(3),
        doWork(4),
    ]);

    console.log('All done');
}

doEverything();

Expected output ("Doing work" lines not necessarily in that order):

Doing work 1
Doing work 2
Doing work 3
Doing work 4
All done

Actual output:

Doing work 1
Doing work 2

This only happens when using the alternate release API as described in the README. If I use the returned release function it works fine. I'm running this on node v12.19.1 on linux. But initially I experienced this issue in a web application (babel + webpack).

Cancel doesn't work as expected

It looks like when I cancel a mutex it doesn't throw the E_CANCELED error, and the runExclusive callback still finishes.
Here is the code to reproduce it:

const mutex = new Mutex();

const longOperation = async () => {
  // wait 5 seconds
  await new Promise(res => setTimeout(res, 5000));
};

const execute = async (id: number) => {
  console.log("Executing", id);
  if(mutex.isLocked()) {
    console.log("Mutex is locked", id);
    mutex.cancel();
    mutex.release();
  }
  await mutex.runExclusive(async () => {
    await longOperation();
  });
  console.log("Execute finished running", id);
};


void execute(1);
void execute(2);

Output:

Executing 1
Executing 2
Mutex is locked 2
// 5 seconds pass...
Execute finished running 1
Execute finished running 2

Expected output:

Executing 1
Executing 2
Mutex is locked 2
E_CANCELED error thrown by the first execution
// 5 seconds pass...
Execute finished running 2

Note 1: I am calling the release() because of this behaviour:

Note that while all pending locks are cancelled, a currently held lock will not be revoked. In consequence, the mutex may not be available even after cancel() has been called.

Communicate value to waiting code

Hello folks,

I don't know if this is directly related to async-mutex, but here is what I am currently doing:

// listening for events
provider.pairContract.on('Sync', async (...dexReserves) => {
    ...
    // No exclusive code here; this part can run in parallel if multiple events are triggered in quick succession
    ...
    // Here we should be able to be notified from the thread running the exclusive code
    const release = await mutex.acquire();
    // Exclusive code - may take several seconds, so many threads will wait for the mutex to be unlocked
    release();
});

What I'm trying to do here is communicate a value from the thread running the exclusive code to the threads waiting for the mutex to be unlocked. Why? Because the conditions under which the waiting threads execute may vary depending on what happens in the thread holding the lock.

I was thinking of using an EventEmitter, but I was wondering whether there is a way to communicate a variable from a synchronized portion of code with async-mutex.

Thanks a lot and GG for this lib !
Cheers,
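
One simple option, sketched here under assumptions, is to keep the value in a variable that is only read and written inside the exclusive section: whatever the current holder writes is visible to the next handler as soon as it gets the lock. `createSerializer`, `shared` and `onSync` are hypothetical names, and the promise chain is a stand-in for a real mutex, not the library's implementation.

```javascript
// Hypothetical stand-in for a mutex: runs tasks strictly one after another.
function createSerializer() {
  let tail = Promise.resolve(); // end of the chain of critical sections
  return function runExclusive(task) {
    const result = tail.then(() => task());
    tail = result.catch(() => {}); // keep the chain alive after a failure
    return result;
  };
}

const runExclusive = createSerializer();
const shared = { lastReserves: null, handled: 0 }; // hypothetical shared state

function onSync(reserves) {
  return runExclusive(async () => {
    // A waiting handler can inspect what the previous holder left behind.
    if (shared.lastReserves === reserves) return 'skipped';
    await new Promise((resolve) => setTimeout(resolve, 5)); // stand-in for slow work
    shared.lastReserves = reserves;
    shared.handled++;
    return 'handled';
  });
}
```

Because the state is touched only while the lock is held, a waiting handler never sees a half-updated value, and it can decide to skip its own work based on what it finds.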

Option to empty the queue?

Hi and thanks for this awesome library.

I was wondering if I can empty this._queue somehow. In my app I manage a queue of actions, but sometimes I just want to empty it all out when a drastic event happens (and the old queue is not relevant anymore).

Thanks!

Optional acquire timeout?

It would be good to also provide a timeout argument to acquire. If the mutex is not acquired in that time, reject the promise.
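
The request can be illustrated with a wrapper around any mutex whose `acquire()` resolves to a release function. `acquireWithTimeout` is a hypothetical helper written for this example, not an API of the library. The detail worth noting is what happens when the lock arrives after the timeout has already fired: it has to be released again, otherwise the mutex stays locked forever.

```javascript
// Hypothetical helper; `mutex.acquire()` must resolve to a release function.
function acquireWithTimeout(mutex, ms) {
  return new Promise((resolve, reject) => {
    let timedOut = false;
    const timer = setTimeout(() => {
      timedOut = true;
      reject(new Error(`lock not acquired within ${ms} ms`));
    }, ms);

    mutex.acquire().then(
      (release) => {
        if (timedOut) {
          release(); // acquired too late: give the lock back immediately
          return;
        }
        clearTimeout(timer);
        resolve(release);
      },
      (err) => {
        clearTimeout(timer);
        reject(err);
      }
    );
  });
}
```

The caller gets either a release function or a rejection, so the usual try/finally pattern around the critical section stays the same.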
