Comments (16)

sslotsky avatar sslotsky commented on May 3, 2024 16

For what it's worth, I got around to testing my code and it was almost correct. Here's a working version with eventChannel:

import { eventChannel, takeEvery } from 'redux-saga'
import { take, call, put, fork, cancel, cancelled } from 'redux-saga/effects'
import socket, { channel } from './socket'
import * as actionTypes from './constants'

function socketEmitter(subject) {
  return eventChannel(emitter => {
    socket.emit('track', subject)
    const newChannel = channel(subject)

    newChannel.on('tweets', tweets => {
      emitter(tweets)
    })

    return () => {
      newChannel.removeAllListeners('tweets')
      socket.emit('untrack', subject)
    }
  })
}

function* listen(subject) {
  const chan = yield call(socketEmitter, subject)
  try {
    while (true) {
      let tweets = yield take(chan)
      yield put({
        type: actionTypes.TWEETS_RECEIVED,
        subject,
        tweets,
        read: false
      })
    }
  } finally {
    if (yield cancelled())
      chan.close()
  }
}

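function* subscribe(action) {
  // Fork the listener once; forking inside the loop would start a new
  // listener after every STOP_TRACKING action, including after cancelling.
  const listenTask = yield fork(listen, action.subject)
  while (true) {
    const unsubscribed = yield take(actionTypes.STOP_TRACKING)
    if (action.subject === unsubscribed.subject) {
      yield cancel(listenTask)
      return
    }
  }
}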

function* track() {
  yield* takeEvery(actionTypes.TRACK_SUBJECT, subscribe)
}

export default track

from redux-saga.

yelouafi avatar yelouafi commented on May 3, 2024 11

@emirotin I think this may be related to this SO question. The main point is:

To integrate external push sources, we'll need to transpose the Event Source from the push model into the pull model; i.e. we'll have to build an event iterator from which we can pull the future events from the event source

It is somewhat similar to your solution, but it factors the concept out into a separate function so the saga can be tested easily. You can find a live JS Bin demo here: https://jsbin.com/wumuri/edit?js,console

// Event iterator for socket events
function socketEventIterator(event) {
  let deferred
  socket.subscribe(event, payload => {
    // Resolve the pending promise, if any; events with no waiter are ignored
    if (deferred) {
      deferred.resolve(payload)
      deferred = null
    }
  })

  return {
    nextEvent() {
      if(!deferred) {
        deferred = {}
        deferred.promise = 
          new Promise(resolve => deferred.resolve = resolve)
      }
      return deferred.promise
    }
  }
}

function* listenToSocket(event, actionType) {
  const { nextEvent } = yield call(socketEventIterator, event)
  while (true) {
    const payload = yield call(nextEvent)
    yield put({ type: actionType, payload })
  }
}

function* subscribeSocket(getState) {
  while (true) {
    const nextAction = yield take(SUBSCRIBE_SOCKET)
    const subscribeConfig = nextAction.payload
    const { event, actionType } = subscribeConfig
    yield fork(listenToSocket, event, actionType)
  }
}

mjrussell avatar mjrussell commented on May 3, 2024 1

I ran into the same issues, but ended up with a different solution. This essentially uses a linked-list approach to queue up the messages:

// Creates a chain of message promises, each promise (when resolved) has a payload and nextPromise 
// to wait for the next message
function socketMessageChain(socket) {
  let resolveNext;
  const headPromise = new Promise(resolve => resolveNext = resolve);

  socket.onmessage = event => {
    const resolveThis = resolveNext;
    const nextPromise = new Promise(resolve => resolveNext = resolve);
    resolveThis({ payload: JSON.parse(event.data), nextPromise });
  };

  return headPromise;
}
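
One way this chain might be consumed, as a sketch under assumptions: fakeSocket and collectMessages are hypothetical names for illustration, and an async function stands in for the saga loop (inside a saga, each await would be a yield on the promise). Because every resolved link carries the promise for the next message, messages that arrive before the consumer resumes are still reachable through the chain.

```javascript
// Same chain builder as in the comment above.
function socketMessageChain(socket) {
  let resolveNext;
  const headPromise = new Promise(resolve => (resolveNext = resolve));

  socket.onmessage = event => {
    const resolveThis = resolveNext;
    const nextPromise = new Promise(resolve => (resolveNext = resolve));
    resolveThis({ payload: JSON.parse(event.data), nextPromise });
  };

  return headPromise;
}

// Hypothetical consumer: walks `count` links of the chain and collects payloads.
async function collectMessages(socket, count) {
  const received = [];
  let next = socketMessageChain(socket);
  while (received.length < count) {
    const { payload, nextPromise } = await next;
    received.push(payload);
    next = nextPromise;
  }
  return received;
}

// Hypothetical stand-in for a WebSocket: only the `onmessage` slot is used.
const fakeSocket = { onmessage: null };
const collected = collectMessages(fakeSocket, 3);

// Three messages pushed back-to-back, before the consumer has resumed.
[1, 2, 3].forEach(id => fakeSocket.onmessage({ data: JSON.stringify({ id }) }));

collected.then(messages => console.log(messages));
```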

jedwards1211 avatar jedwards1211 commented on May 3, 2024

Here's an idea. Fork a subtask that:

  • subscribes to the websocket events
  • enters a while loop that:
    • yields a Promise that gets fulfilled with the next websocket message when it comes in
    • yields a put for that message.
  • catches cancelation exceptions and unsubscribes when that happens
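
The steps above can be sketched roughly as follows. This is not redux-saga code: createSource, drive, and the put callback are hypothetical stand-ins so the sketch runs on its own. A plain generator plays the subtask, and cancellation is modelled by calling return() on the generator, which runs its finally block.

```javascript
// Hypothetical in-memory source with subscribe/unsubscribe.
function createSource() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    emit(message) {
      listeners.forEach(listener => listener(message));
    },
    listenerCount: () => listeners.size
  };
}

// The subtask: subscribe, loop over messages, unsubscribe when stopped.
function* listen(source, put) {
  let resolveNext = null;
  const unsubscribe = source.subscribe(message => {
    if (resolveNext) resolveNext(message);
  });
  try {
    while (true) {
      // Yield a promise that is fulfilled with the next message.
      const message = yield new Promise(resolve => (resolveNext = resolve));
      put({ type: 'MESSAGE', message });
    }
  } finally {
    // Runs when the driver calls generator.return(), i.e. on cancellation.
    unsubscribe();
  }
}

// Hypothetical driver standing in for the saga middleware.
function drive(generator) {
  const step = value => {
    const { done, value: yielded } = generator.next(value);
    if (!done) yielded.then(step);
  };
  step();
  return { cancel: () => generator.return() };
}

async function demo() {
  const source = createSource();
  const actions = [];
  const task = drive(listen(source, action => actions.push(action)));
  source.emit('one');
  await new Promise(resolve => setTimeout(resolve, 0)); // let the loop run
  task.cancel();
  return { actions, listenersAfterCancel: source.listenerCount() };
}

const outcome = demo();
outcome.then(result => console.log(result));
```

Note that this sketch only keeps one pending promise, so a message that arrives while the loop is busy would be missed.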

jedwards1211 avatar jedwards1211 commented on May 3, 2024

I have to say though, I wonder if redux-saga really provides a clear advantage for subscribing to things over a simple custom middleware that just dispatches the events/messages from a websocket/whatever as they arrive. It seems like testing might be easier with redux-saga. Does anyone else have an opinion on this?
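
For comparison, a minimal sketch of the kind of custom middleware described above. It follows the standard Redux middleware shape (store => next => action); createSocketMiddleware, fakeStore, and the MESSAGE_RECEIVED action type are hypothetical names, and a Node EventEmitter stands in for the websocket.

```javascript
const { EventEmitter } = require('events');

// Hypothetical middleware factory: forwards each socket message as an action.
function createSocketMiddleware(socket, eventName, actionType) {
  return store => {
    socket.on(eventName, payload => {
      store.dispatch({ type: actionType, payload });
    });
    // Every regular action passes through unchanged.
    return next => action => next(action);
  };
}

// Minimal stand-in for a Redux store so the sketch runs without Redux.
const dispatched = [];
const fakeStore = { dispatch: action => dispatched.push(action) };

const socket = new EventEmitter();
const middleware = createSocketMiddleware(socket, 'message', 'MESSAGE_RECEIVED');
const handleAction = middleware(fakeStore)(action => action);

socket.emit('message', { text: 'hello' });
const passedThrough = handleAction({ type: 'PING' });

console.log(dispatched, passedThrough);
```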

emirotin avatar emirotin commented on May 3, 2024

Thanks for the idea.
redux-saga works well for the XHR middleware and for the general orchestration, which is why I'd like to keep the socket stuff there as well.

So now I have this:

function* listenToSocket(event, actionType) {
  let promise
  let resolveFn
  const callback = (payload) => {
    resolveFn(payload)
  }
  const unsubscribe = socket.subscribe(event, callback)

  while (true) {
    promise = new Promise(resolve => {
      resolveFn = resolve
    })
    const payload = yield promise
    yield put({ type: actionType, payload })
  }
}

function* subscribeSocket(getState) {
  while (true) {
    const nextAction = yield take(SUBSCRIBE_SOCKET)
    const subscribeConfig = nextAction.payload
    const { event, actionType } = subscribeConfig
    yield fork(listenToSocket, event, actionType)
  }
}

Which, surprisingly, works (that's a dig at my own understanding of sagas, not at your idea :)). But it does look less readable than a middleware that can call store.dispatch directly at any time. I'd like to hear @yelouafi's opinion on this.

emirotin avatar emirotin commented on May 3, 2024

Thanks for your response. It is indeed a very similar question, though I searched before asking and it didn't come up. I like the separation of concepts and will update my code.

 avatar commented on May 3, 2024

Perhaps I'm missing something here, but I'm dropping events across the websocket connection using this deferred method.

It seems like this is a result of the event loop and next-tick mechanics, because if I change the code to this:

  socket.subscribe(event, payload => {
    if(deferred) {
      deferred.resolve(payload)
      deferred = null 
    } else {
      console.log('dropped event', payload)
    }
  })

I get a whole lot of dropped event logs.

However, if I utilize a simple queue as demonstrated here I do not run into any dropped frames. Hopefully this is helpful to someone else with the same issue.

I would be curious if someone could explain to me why this is happening, however.

yelouafi avatar yelouafi commented on May 3, 2024

@aft-luke Yes, the above method doesn't queue events that arrive in between calls. The method in the SO answer does queue the events.
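
A minimal sketch of a queueing variant, assuming the event source exposes a subscribe(listener) function. queuedEventIterator, listeners, and emit are hypothetical names for illustration; they are not part of redux-saga or of the linked answer. Events that arrive while nobody is waiting are buffered instead of dropped.

```javascript
// Builds a pull-style iterator over a push-style source.
// `subscribe` is assumed to accept a listener and call it once per event.
function queuedEventIterator(subscribe) {
  const pendingEvents = [];    // events that arrived with no waiter
  const pendingResolvers = []; // waiters that arrived with no event

  subscribe(payload => {
    if (pendingResolvers.length > 0) {
      pendingResolvers.shift()(payload);
    } else {
      pendingEvents.push(payload);
    }
  });

  return {
    nextEvent() {
      if (pendingEvents.length > 0) {
        return Promise.resolve(pendingEvents.shift());
      }
      return new Promise(resolve => pendingResolvers.push(resolve));
    }
  };
}

// Hypothetical in-memory event source for demonstration.
const listeners = [];
const emit = payload => listeners.forEach(listener => listener(payload));

const { nextEvent } = queuedEventIterator(listener => listeners.push(listener));

// Three events fire before anything calls nextEvent().
emit('a');
emit('b');
emit('c');

const drained = (async () => {
  const seen = [];
  for (let i = 0; i < 3; i++) {
    seen.push(await nextEvent());
  }
  return seen;
})();

drained.then(seen => console.log(seen));
```

In a saga, the loop body would presumably stay as in listenToSocket above, pulling each payload with yield call(nextEvent).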

gajus avatar gajus commented on May 3, 2024

@yelouafi I am probably overlooking something, but the solution that you have proposed looks over-engineered to me.

Wouldn't this achieve the same result?

function socketEventIterator (eventName) {
    let resolveNextValue,
        resolved;

    resolved = true;

    bridge.on(eventName, (event) => {
        resolveNextValue(event);

        resolved = true;
    });

    return () => {
        if (!resolved) {
            throw new Error('Iterator can be used by only one agent.');
        }

        resolved = false;

        return new Promise((resolve) => {
            resolveNextValue = resolve;
        });
    };
}

or even

function socketEventIterator (eventName) {
    return new Promise((resolve) => {
        bridge.once(eventName, (event) => {
            resolve(event);
        });
    });
}

sslotsky avatar sslotsky commented on May 3, 2024

Maybe this is a dumb question, but would it also be possible to do this using an event channel? Here's some totally untested code to serve as an example. Am I way off base here?

function socketEmitter(subject) {
  return eventChannel(emitter => {
    // Named newChannel so it does not shadow the channel() helper it calls
    const newChannel = channel(subject)
    newChannel.on('message', payload => {
      emitter(payload)
    })

    return () => {
      newChannel.removeAllListeners('message')
    }
  })
}

function* listen(subject, actionType) {
  const chan = yield call(socketEmitter, subject)
  try {
    while (true) {
      let payload = yield take(chan)
      yield put({ type: actionType, payload })
    }
  } finally {
    if (yield cancelled())
      chan.close()
  }
}

function* subscribe(subject, getState) {
  while (true) {
    const nextAction = yield take(SUBSCRIBE_SOCKET)
    const { subject, actionType } = nextAction.payload
    const listenTask = yield fork(listen, subject, actionType)
    const unsubscribed = yield take(UNSUBSCRIBE_SOCKET)
    const { cancelledSubject } = unsubscribed.payload
    if (cancelledSubject == subject)
      yield cancel(listenTask) 
  }
}

emirotin avatar emirotin commented on May 3, 2024

@gajus this seems to only fire once for the given event?
But the purpose is to translate every occurrence of the event into an action.

gajus avatar gajus commented on May 3, 2024

@gajus this seems to only fire once for the given event?
But the purpose is to translate every occurrence of the event into an action.

Both implementations that I have proposed fire for every event.

We have been using this in staging env for over a week now.

emirotin avatar emirotin commented on May 3, 2024

Interesting, I was confused by .once.

gajus avatar gajus commented on May 3, 2024

Interesting, I was confused by .once.

socketEventIterator will be called every time the previous promise is resolved. Therefore, a once binding is created and discarded for each event.

For the record, we are using the first variation, because it is more performant than creating and discarding an event listener on each iteration. I haven't tested the once approach extensively beyond ensuring that it passes all the test cases.
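
To make the create/discard cycle of the once variant concrete, here is a small sketch. It assumes bridge behaves like a Node EventEmitter, which the thread does not confirm; nextBridgeEvent and the tweet event name are hypothetical. It records the listener count before, during, and after one event, and whether an event emitted with no listener attached is delivered to anyone.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for the `bridge` object in the comments above.
const bridge = new EventEmitter();

// The `once` variant: every call registers a fresh one-shot listener.
function nextBridgeEvent(eventName) {
  return new Promise(resolve => {
    bridge.once(eventName, event => resolve(event));
  });
}

async function demo() {
  const counts = [];
  counts.push(bridge.listenerCount('tweet')); // before waiting

  const first = nextBridgeEvent('tweet');
  counts.push(bridge.listenerCount('tweet')); // while waiting

  bridge.emit('tweet', 'first');
  counts.push(bridge.listenerCount('tweet')); // listener removed after firing

  // Emitted while no listener is registered, so nothing receives it.
  const delivered = bridge.emit('tweet', 'second');

  return { value: await first, counts, delivered };
}

const result = demo();
result.then(summary => console.log(summary));
```

Whether the gap between two bindings matters in practice depends on how closely together the source emits events, so it may be worth checking against your own traffic.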

m-2k avatar m-2k commented on May 3, 2024
  yield* takeEvery(actionTypes.TRACK_SUBJECT, subscribe)
       ^
