Comments (4)

DirtyHairy commented on June 2, 2024

I am referring to the original code sample you posted.

router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;

    const mutex = new Mutex();

    const release = yield mutex.acquire();
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      release();
    }

  // ... my code is keep going

You're using the mutex incorrectly. You are creating a new mutex each time you access the DB, so concurrent requests will access the DB concurrently, as the individual mutexes know nothing about each other. In addition, you do not yield when updating.
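
To illustrate the difference between a per-request mutex and a shared one, here is a minimal sketch. It does not use async-mutex or the real database: `TinyMutex`, `makeStore`, `addLength`, and `run` are hypothetical stand-ins written only for this illustration, and `async`/`await` takes the place of `co` with `yield`.

```javascript
'use strict';

// Hypothetical stand-in for a mutex (not the async-mutex implementation):
// acquire() resolves to a release function once the previous holder is done.
class TinyMutex {
  constructor() {
    this._tail = Promise.resolve(); // settles when the current holder releases
  }
  acquire() {
    let release;
    const next = new Promise((resolve) => { release = resolve; });
    const ready = this._tail.then(() => release);
    this._tail = next;
    return ready;
  }
}

// In-memory stand-in for the database; every call takes one timer tick.
const tick = () => new Promise((resolve) => setTimeout(resolve, 1));
function makeStore() {
  let total = 0;
  return {
    async read() { await tick(); return total; },
    async write(value) { await tick(); total = value; },
  };
}

// Read-modify-write inside the critical section, as in the route handler.
async function addLength(store, mutex, text) {
  const release = await mutex.acquire();
  try {
    const current = await store.read();
    await store.write(current + text.length);
  } finally {
    release();
  }
}

// Fire 20 concurrent "requests", each adding a 10-character message.
async function run(getMutex) {
  const store = makeStore();
  const requests = [];
  for (let i = 0; i < 20; i++) {
    requests.push(addLength(store, getMutex(), '0123456789'));
  }
  await Promise.all(requests);
  return store.read();
}

const shared = new TinyMutex();
const results = Promise.all([
  run(() => new TinyMutex()), // fresh mutex per request: no mutual exclusion
  run(() => shared),          // one mutex shared by all requests
]);
results.then(([perRequest, sharedTotal]) => {
  // perRequest ends up below 200 (lost updates); sharedTotal is 200.
  console.log({ perRequest, sharedTotal });
});
```

With a fresh mutex per request, every `acquire()` succeeds immediately, so all requests read the same stale total and overwrite each other. Only the shared instance serializes them.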

Your edited code above should work fine. If you still have a problem and suspect async-mutex to be the issue, please add console.log messages at the beginning and end of the try block to see if this is really a concurrency issue. You can also use a canary boolean variable to check if the critical section is already executing.
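
A canary check could look roughly like the sketch below. The names `inCriticalSection`, `overlaps`, `guarded`, and `pause` are hypothetical and exist only for this illustration. The demo deliberately runs two calls without any lock so that the canary has something to report.

```javascript
'use strict';

let inCriticalSection = false; // canary: true while a caller is inside
let overlaps = 0;              // number of times a second caller got in

async function guarded(work) {
  if (inCriticalSection) {
    overlaps++;
    console.log('critical section is already executing!');
  }
  inCriticalSection = true;
  try {
    return await work();
  } finally {
    inCriticalSection = false;
  }
}

// Stand-in for the DB work done inside the critical section.
const pause = () => new Promise((resolve) => setTimeout(resolve, 5));

async function demo() {
  // Sequential calls never overlap, so the canary stays quiet.
  await guarded(pause);
  await guarded(pause);
  const sequentialOverlaps = overlaps;

  // Two unprotected concurrent calls overlap, and the canary reports it.
  await Promise.all([guarded(pause), guarded(pause)]);
  return {
    sequentialOverlaps,
    concurrentOverlaps: overlaps - sequentialOverlaps,
  };
}

const canaryResult = demo();
canaryResult.then((r) => console.log(r));
// prints { sequentialOverlaps: 0, concurrentOverlaps: 1 }
```

In the route handler, the check and `inCriticalSection = true` would go right after `yield mutex.acquire()`, and the reset right before `release()`. If the message never appears there, the critical section is not being entered concurrently within that process.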

from async-mutex.

tinytortoise-dev commented on June 2, 2024

@DirtyHairy Thank you for commenting! I eventually ran the code below and sent 20 concurrent requests.
Each one carries a message of length 10, so the expected result is 200 (20 * 10). But I got 160...

Looking at the console, I noticed that the initial mutex object appeared twice, which is strange because the mutex object should be a singleton.

Code:

'use strict'
const express = require('express');
const router = express.Router();
const co = require('co');
const models = require('models');
const Mutex = require('async-mutex').Mutex;
const CharacterCounter = models.CharacterCounter;

const mutex = new Mutex();

router.get('/message', function (req, res, next) {
  co(function* () {
    const originalText = req.body.message;

    console.log('BEFORE STARTING LOCK\n', mutex);
    const release = yield mutex.acquire();
    console.log('START LOCK\n', mutex);
    let addedCount = 0;
    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      yield CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      console.log('BEFORE FINISHING LOCK\n', mutex);
      release();
      console.log('FINISH LOCK\n', mutex);
    }

    // ... my code is keep going

Console:

// 1st request arrived
BEFORE STARTING LOCK
Mutex {
  _semaphore: Semaphore { _maxConcurrency: 1, _queue: [], _value: 1 } }  // this object is an initial object and should appear only once, right?
START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 10 } }, { upsert: true, new: false, remove: false, 
fields: {} })

// arrived 2nd request but have to wait because 1st request is still processing
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
     
// 1st request is processed and mutex is unlocked
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 1,
     _currentReleaser: [Function] } }
     
// 3rd request arrived
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 1,
     _currentReleaser: [Function] } }
     
// started lock for 2nd request
START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })

// 4th request arrived
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
     
// 5th request arrived
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
     
// 2nd request is being processed
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 20 } }, { upsert: true, new: false, remove: false, 
fields: {} })

// 6th request arrived and 2nd request is finished
BEFORE STARTING LOCK
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mutex {
  _semaphore: Semaphore { _maxConcurrency: 1, _queue: [], _value: 1 } }  // it seems a new mutex instance is created? I don't think this object is supposed to be here...
  
  
// (...) 


Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 90 } }, { upsert: true, new: false, remove: false, 
fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 1,
     _currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 90 } }, { upsert: true, new: false, remove: false, 
fields: {} })
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }

START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 100 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 110 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
BEFORE STARTING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }

START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 120 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }

START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 130 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }

START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 140 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function], [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 150 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [ [Function] ],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
START LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
Mongoose: CharacterCounter.findOne({}, { fields: {} })
Mongoose: CharacterCounter.findAndModify({}, [], { '$set': { totalCharacters: 160 } }, { upsert: true, new: false, remove: false, fields: {} })
BEFORE FINISHING LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 0,
     _currentReleaser: [Function] } }
FINISH LOCK
Mutex {
  _semaphore:
   Semaphore {
     _maxConcurrency: 1,
     _queue: [],
     _value: 1,
     _currentReleaser: [Function] } }

When I do the same thing with 10 concurrent requests, the result is always correct (I get exactly 100).
The larger the number of concurrent requests, the more initial mutex objects appear on the console.
Sending 100 concurrent requests gave me 4 or 5 initial mutex objects on the console, and the total character count was 460.

p.s.
"In addition, you do not yield when updating."

Could you explain more about this comment, please? I think that if I don't yield when updating, another process can read the value before the update has been applied.

I only started programming recently, so I apologize if I'm saying stupid things and taking up too much of your time.

DirtyHairy commented on June 2, 2024

Mmh, I don't know why your program does not work as intended, but the mutex works fine; I am pretty sure that the error is elsewhere. Observe that there is never a "START LOCK" before the previous cycle "START - BEFORE FINISH - FINISH" has completed.

I am not sure what you are referring to with "initial mutex object". The logs show that the mutex starts with an empty queue and then starts filling up its queue as more and more requests are waiting for the mutex to become available.

"Could you explain more about this comment, please?"

I was referring to your initial code sample:

    try {
      const { currentCount } = yield CharacterCounter.findOne(); // get total character count so far
      addedCount = currentCount + originalText.length;
      CharacterCounter.updateTotalCharacterCount(addedCount); // save on DB
    } catch (e) {
      console.log(e);
    } finally {
      release();
    }

Here, you immediately release the mutex after calling updateTotalCharacterCount, even though the update has not finished yet. In your more recent code samples, you yield, which causes the request to return to the event loop until the Promise for the update has resolved, so that is no longer a problem.
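
The ordering problem can be sketched as follows. This is an illustration under assumptions, not the original handler: `update` is a hypothetical stand-in for updateTotalCharacterCount, `await` plays the role that `yield` plays under `co`, and pushing a "released" event stands in for calling `release()`.

```javascript
'use strict';

const events = [];

// Hypothetical stand-in for the DB update: finishes after 5 ms.
function update(label) {
  return new Promise((resolve) => setTimeout(() => {
    events.push(`${label}: update finished`);
    resolve();
  }, 5));
}

// The update's promise is dropped, so "release" runs before the write lands.
async function withoutWait() {
  try {
    update('no-wait');
  } finally {
    events.push('no-wait: released');
  }
}

// Waiting for the update keeps the lock held until the write has finished.
async function withWait() {
  try {
    await update('wait');
  } finally {
    events.push('wait: released');
  }
}

const order = (async () => {
  await withoutWait();
  // Give the stray update time to finish before the second run.
  await new Promise((resolve) => setTimeout(resolve, 20));
  await withWait();
  return events;
})();

order.then((e) => console.log(e.join('\n')));
// no-wait: released
// no-wait: update finished
// wait: update finished
// wait: released
```

In the first variant, the next request could acquire the mutex and read the old count in the gap between "released" and "update finished".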

tinytortoise-dev commented on June 2, 2024

Now I'm trying to find another solution...
But your reply was much appreciated. Thank you!
