reactivex / rxpy Goto Github PK
View Code? Open in Web Editor NEWReactiveX for Python
Home Page: https://rxpy.rtfd.io
License: MIT License
ReactiveX for Python
Home Page: https://rxpy.rtfd.io
License: MIT License
Is combine_latest
supposed to work with subjects? Whenever I try to subscribe to the combined stream, the interpreter freezees because rx requests a lock.
s1 = rx.subjects.BehaviorSubject(1)
s2 = rx.subjects.BehaviorSubject(2)
s = rx.Observable.combine_latest(s1, s2, lambda a, b: (a, b))
s.subscribe(print) # Freezes here
The output I expect is simply (1, 2)
. I have tried using plain Subject
as well, but it freezes in the same spot. Here is the stack trace after I press Ctrl-C to interrupt:
^CTraceback (most recent call last):
File "combine.py", line 29, in <module>
main()
File "combine.py", line 14, in main
s.subscribe(print)
File "/usr/lib/python3.4/site-packages/rx/observable.py", line 51, in subscribe
return self._subscribe(observer)
File "/usr/lib/python3.4/site-packages/rx/anonymousobservable.py", line 50, in _subscribe
current_thread_scheduler.schedule(set_disposable)
File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 54, in schedule
return self.schedule_relative(timedelta(0), action, state)
File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 68, in schedule_relative
self.queue.run()
File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 37, in run
item.invoke()
File "/usr/lib/python3.4/site-packages/rx/concurrency/scheduleditem.py", line 18, in invoke
self.disposable.disposable = self.invoke_core()
File "/usr/lib/python3.4/site-packages/rx/concurrency/scheduleditem.py", line 33, in invoke_core
return self.action(self.scheduler, self.state)
File "/usr/lib/python3.4/site-packages/rx/anonymousobservable.py", line 34, in set_disposable
auto_detach_observer.disposable = fix_subscriber(subscribe(auto_detach_observer))
File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 101, in subscribe
func(idx)
File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 98, in func
subscriptions[i].disposable = args[i].subscribe(on_next, observer.on_error, on_completed)
File "/usr/lib/python3.4/site-packages/rx/observable.py", line 51, in subscribe
return self._subscribe(observer)
File "/usr/lib/python3.4/site-packages/rx/subjects/behaviorsubject.py", line 47, in __subscribe
observer.on_next(self.value)
File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 90, in on_next
with parent.lock:
KeyboardInterrupt
The following program based on observables works:
o1 = rx.Observable.interval(2000)
o2 = rx.Observable.interval(3000)
s = rx.Observable.combine_latest(o1, o2, lambda a, b: (a, b))
s.subscribe(print)
I figured that the subject version should be able to work similarly, but I could be wrong as I have only a little experience with Rx.
Thanks in advance for any help, and sorry if I am simply using this wrong!
I'm really puzzled by something in RxPY. It seems to be different from RxJS in a crucial way.
Please take a look at my contrived RxJS example on JSFiddle, then look at the equivalent RxPY code below.
In the examples, I create a single stream, then create two new streams by transforming it. I subscribe two observers to the streams. In RxJS, I get the result I expect. In RxPY, I get a confusing result.
import rx
def on_next_1(s):
print "subscriber 1: ", s
def on_next_2(s):
print "subscriber 2: ", s
ns = rx.Observable.from_(['a', 'b', 'c'])
xs = ns.map(lambda s: s)
ys = ns.map(lambda s: s.upper())
xs.subscribe(on_next_1)
xs.merge(ys).subscribe(on_next_2)
This RxPY code produces this output:
subscriber 1: a
subscriber 1: b
subscriber 1: c
This is the output I would expect (just like the RxJS output):
subscriber 1: a
subscriber 1: b
subscriber 1: c
subscriber 2: a
subscriber 2: A
subscriber 2: b
subscriber 2: B
subscriber 2: c
subscriber 2: C
In the RxPY code, it seems the first observer works, but the second observer gets nothing. It's like the ns
is a mutable object that is changing in place. But shouldn't each call to Observable(...).map
return a new Observable
? Is it really the desired/designed behavior, for Observable.from_(<a list>)
to fire just once and be exhausted, even when you're deriving more than one new Observable
s from it?
I cannot tell if this is a bug, or if I'm missing a fundamental point! It seems like the current behavior violates expectations about the Observer pattern. Isn't the contract, that each subscriber gets each message? What am I missing?
Hi, I notice there is a tkinter mainloop scheduler, as PyQt is also very popular and extremely powerful, is there any plan of adding a qt mainloop scheduler?
I'm new to Rx but I'll try to do one first.
The following program illustrates the problem:
import rx
import time
sub = rx.Observable.interval(1000) \
.map(lambda x: 0/(5-x)) \
.subscribe(print, print, print)
try:
while True:
time.sleep(0.1)
except:
sub.dispose()
The map
call will cause a division by 0 exception to occur if x is 5. Since Observable.interval
emits 0, 1, 2, 3, ... this will happen after 6 seconds.
If you exit via a keyboard interrupt (Ctrl+C) before the exception occurs, the program exits normally (since dispose
is called, cancelling the Observable
subscription).
However, if you exit after the exception occurs, the process never ends. Some thread (probably the one controlling Observable.interval
) seems to be blocking the process.
Pressing Ctrl+C again causes the following error to show:
Exception ignored in: <module 'threading' from '/opt/sd/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/opt/sd/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/opt/sd/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/opt/sd/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
I would have thought that an error would cancel the controlling thread (of Observale.interval
), but this does not seem to be the case. What gives?
I'm using Python 3.5.1 and RxPY 1.2.2.
method:
python setup.py sdist bdist_wheel --universal
twine upload dist/*
reason:
pip -r requirements.txt
any tar.gz package in a requirement file is first transform as a wheel, and so if it needs a package for this, you have to create a requirements0.txt file and pip it first, adding complexity to a 'solved by wheel' problem otherwise,I created an observable sequence using interval, subscribed to it, and expected to see an endless parade of timestamps slowly making their way down the screen. Instead I received an attribute error indicating that the TimeoutScheduler has no attribute 'schedule_periodic'. VirtualTimeScheduler appears to be the only scheduler with the method 'schedule_periodic'. Is the wrong Scheduler being generated? Please see the output below...
obs = rx.Observable.interval(1000).subscribe(lambda x: print(x))
Traceback (most recent call last):
File "", line 1, in
File "c:\Python34\lib\site-packages\rx\observable.py", line 50, in subscribe
return self._subscribe(observer)
File "c:\Python34\lib\site-packages\rx\anonymousobservable.py", line 32, in _subscribe
if not auto_detach_observer.fail(ex):
File "c:\Python34\lib\site-packages\rx\abstractobserver.py", line 52, in fail
self.error(exn)
File "c:\Python34\lib\site-packages\rx\autodetachobserver.py", line 24, in error
self.observer.on_error(exn)
File "c:\Python34\lib\site-packages\rx\abstractobserver.py", line 34, in on_error
self.error(error)
File "c:\Python34\lib\site-packages\rx\internal\basic.py", line 24, in default_error
raise err
File "c:\Python34\lib\site-packages\rx\anonymousobservable.py", line 30, in _subscribe
auto_detach_observer.disposable = subscribe(auto_detach_observer)
File "c:\Python34\lib\site-packages\rx\linq\observable_time.py", line 136, in subscribe
return scheduler.schedule_periodic(period, action, state=0)
AttributeError: 'TimeoutScheduler' object has no attribute 'schedule_periodic'
It seems that combine_latest is buggy. I believe the result_selector = args.pop()
line needs to be moved before the preceding if statement:
if args and isinstance(args[0], list):
args = args[0]
else:
args = list(args)
result_selector = args.pop()
This can be achieved using IObservable.Remotable using the .NET flavour of Rx, but what about in Python with RxPy?
Minimal working example: see SO post
The issue documented in the post above seems to be due to the following in rx.concurrency.currentthreadscheduler
if not self.queue:
self.queue = Trampoline(self)
This is not thread-safe yet Scheduler.current_thread = current_thread_scheduler = CurrentThreadScheduler()
is shared across threads and is used in an asynchronous manner by LINQ operators. So should we use a lock as below?
from rx import Lock
class CurrentThreadScheduler(Scheduler):
def __init__(self):
...
self.lock = Lock()
def schedule_relative(self, duetime, action, state=None):
...
with self.lock:
has_queue = self.queue
if not has_queue:
self.queue = Trampoline(self)
if has_queue:
self.queue.enqueue(si)
else:
self.queue = Trampoline(self)
try:
self.queue.enqueue(si)
self.queue.run()
finally:
self.queue.dispose()
self.queue = None
...
On a further note, the self.queue.dispose()
seems also to not be very safe. What if one thread has just called self.queue.enqueue(si)
when another is about to call self.queue.dispose()
? The queued item will end up being dropped silently.
How do you use resources with Observables?
For example: given an Observable sequence of file paths, construct a sequence of the contents of the files.
Using rx either via pip or building the development /master branch results in:
# python
Python 2.7.9 (default, Sep 11 2015, 21:52:46)
[GCC 4.8.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from rx.subjects import Subject
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "rx.py", line 18, in <module>
from rx.subjects import Subject
ImportError: No module named subjects
from rx.subjects import Subject
works inside the RxPy/ sources but not outside. How to fix this?
Any help is highly appreciated.
I'm reading the official RxPy documentation here:
http://rxpy.codeplex.com/documentation
At some point, there is a mention of the "Using Schedulers" topic:
This will be covered in more details in the Using Schedulers topic.
What type of schedulers are supported in RxPy? I am not interested in using the .NET platform, and am wondering if there is (or there will be) support for more Pythonic libraries like gevent or the Tornado ioloop.
Thanks
Would be very helpful to add the list with of all supported Observable methods to some .md file, like here - rxjs_observable. I guess they are using something to autogenerate that.
It seems possible to make the AsyncIOScheduler
available for Python 2.7 through the use of trollius. Perhaps it can be configured through rx.config
, like the class for Future
can be?
The following code tries to use .observe_on(Scheduler.timeout)
to run tasks in parallel:
import time
import threading
import rx
from rx.concurrency.scheduler import Scheduler
tm1=time.time()
lock=threading.Lock()
def log(s):
tm2=time.time()
with lock:
print '%ds: %s on %s' % (round(tm2-tm1), s, threading.currentThread().name)
def work(x):
log('processing '+str(x))
time.sleep(1)
def finish(x):
log('finished '+str(x))
log('started')
rx.Observable.range(1, 3) \
.observe_on(Scheduler.timeout) \
.do_action(work) \
.subscribe(finish)
log('finished ALL')
time.sleep(5) # wait to complete
Expected output:
0s: started on MainThread
0s: finished ALL on MainThread
0s: processing 1 on Thread-1
0s: processing 2 on Thread-2
0s: processing 3 on Thread-3
1s: finished 1 on Thread-1
1s: finished 2 on Thread-2
1s: finished 3 on Thread-3
Actual output:
0s: started on MainThread
0s: processing 1 on Thread-1
0s: finished ALL on MainThread
1s: finished 1 on Thread-1
1s: processing 2 on Thread-2
2s: finished 2 on Thread-2
2s: processing 3 on Thread-3
3s: finished 3 on Thread-3
The problem seems to be in the design of ScheduledObserver
class. The run
method does not schedule next queued item until current item is processed.
If the main thread dies, we see that the scheduled threads are still alive causing system to be in a weired state, is there a way to make scheduler threads demonic?
In scala I can do something like this:
scala> val testScheduler = TestScheduler()
scala> testScheduler.createWorker.schedulePeriodically(1 milliseconds, 1 milliseconds)(println(1))
scala> testScheduler.advanceTimeBy(10 milliseconds)
warning: there was one feature warning; re-run with -feature for details
1
1
1
1
1
1
1
1
1
1
I tried to do the same thing with python; but got this error, obviously has to do something with "state", which is None by default. The problem is, I can't see what is for...
In [48]: s = TestScheduler()
In [49]: s.schedule_periodic(1, write("1"))
1
Out[49]: <rx.disposables.singleassignmentdisposable.SingleAssignmentDisposable at 0x10e864410>
In [50]: s.advance_by(10)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-50-f16a9b67196f> in <module>()
----> 1 s.advance_by(10)
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in advance_by(self, time)
135 if self.comparer(self.clock, dt) > 0:
136 raise ArgumentOutOfRangeException()
--> 137 return self.advance_to(dt)
138
139 def sleep(self, time):
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in advance_to(self, time)
117 self.clock = next.duetime
118
--> 119 next.invoke()
120 else:
121 self.is_enabled = False
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleditem.pyc in invoke(self)
15
16 def invoke(self):
---> 17 self.disposable.disposable = self.invoke_core()
18
19 def compare_to(self, other):
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleditem.pyc in invoke_core(self)
24
25 def invoke_core(self):
---> 26 return self.action(self.scheduler, self.state)
27
28 def __lt__(self, other):
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in run(scheduler, state1)
63 self.queue.remove(si)
64
---> 65 return action(scheduler, state1)
66
67 si = ScheduledItem(self, state, run, duetime, self.comparer)
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in action1(s, p)
162
163 def action1(s, p):
--> 164 return self.invoke_rec_date(s, p, 'schedule_relative')
165
166 return self.schedule_relative(duetime, action1, state={ "first": state, "second": action })
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in invoke_rec_date(self, scheduler, pair, method)
109
110 action(state1, action1)
--> 111 recursive_action(state)
112 return group
113
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in recursive_action(state1)
108 is_added = True
109
--> 110 action(state1, action1)
111 recursive_action(state)
112 return group
/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleperiodicrecursive.pyc in tick(self, command, recurse)
22 recurse(0, self._period)
23 try:
---> 24 self._state = self._action(self._state)
25 except Exception as exn:
26 self._cancel.dispose()
TypeError: 'NoneType' object is not callable
What does it mean? I have this error log.
File "/home/karl/.virtualenvs/banner/local/lib/python2.7/site-packages/rx/concurrency/scheduler.py", line 34
nonlocal is_done
^
SyntaxError: invalid syntax
@dbrattli
I think it is good that docstring of RxPY and sample code are published in reactivex.github.io.
I will make sample code of RxPY.
I will send docstring of RxPY and sample code to reactivex.github.io by pull request.
Would you tell me whether there is a problem?
Hello I had to revisit a project that is using RxPY, it was on version 0.10, so I upgraded to version 1.0.0.
Now it seems like Observable.from_array no longer exists, is there any option to create an observable from an array?
When creating a stream using the "sample" method with an observable as sampler, I get the following error when on_completed is called "TypeError: sample_subscribe() takes exactly 1 argument (0 given)"
Here is a the sample code how to reproduce it:
import rx
def show(x):
print x
obs = rx.subjects.Subject()
sampler = rx.subjects.Subject()
obs.sample(sampler=sampler).subscribe(show)
obs.on_next(1)
sampler.on_next(0)
sampler.on_completed()
And the output:
1
Traceback (most recent call last):
File "sample_test.py", line 14, in <module>
sampler.on_completed()
File "/usr/local/lib/python2.7/dist-packages/rx/subjects/subject.py", line 57, in on_completed
observer.on_completed()
File "/usr/local/lib/python2.7/dist-packages/rx/abstractobserver.py", line 35, in on_completed
self._completed()
TypeError: sample_subscribe() takes exactly 1 argument (0 given)
Thanks for your help
The following program illustrates the problem:
import rx
import asyncio
# Bug is with the AsyncIO scheduler
scheduler = rx.concurrency.AsyncIOScheduler()
# Works with the default scheduler (timeout)
#scheduler = None
subscription = rx.Observable.interval(1000.0, scheduler=scheduler) \
.subscribe(print, print, print)
try:
loop = asyncio.get_event_loop()
loop.run_forever()
except:
pass
subscription.dispose()
No output is generated with the AsyncIO scheduler. Replace 1000.0
with 1000
and it works. Both cases work with the default scheduler (timeout).
Very subtle bug. I couldn't figure out why no events were firing until I isolated my code to the above.
We probably want to avoid having to write with to separate operators that have the same name but have different signatures. There are several ways to fix this using singledispatch or multimethods. Singledispatch is included in Python-3, and available from pypi for Python-2.
Generators cannot be resumed from multiple threads simultaneously. This causes Observable.from_
to throw if Scheduler.new_thread
is used.
The solution seems to be to wrap the passed-in generator as a thread-safe iterator that serializes calls to __next__()
. However, given that Observable
knows which scheduler it's using, perhaps it could be smarter about this case?
ACTUAL
o = Observable.from_(my_generator(), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))
This fails due to a race condition.
EXPECTED
o = Observable.from_(my_generator(), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))
This should not fail and Observable
could wrap my_generator()
in a thread-safe iterator.
WORKAROUND
class threadsafe_iter(object):
def __init__(self, it)
self.it = it
self.lock = Lock()
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next(self.it)
o = Observable.from_(threadsafe_iter(my_generator()), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))
This works fine.
I could provide a complete example if needed.
Hi,
Not sure where else to ask this, but I don't see how the library is able to support actual concurrent operation. The recursive scheduler in particular seems to actually serialize computation for the stream. From scheduledobserver.py:
try:
work()
except Exception:
with self.lock:
parent.queue = []
parent.has_faulted = True
raise
recurse()
i.e. work() is called followed by recurse() which serializes processing of the stream (and does preserve order). My understanding is that one should be able to get concurrency via different scheduler choices (potentially giving up preserving order). Am I missing something?
Rx version: 1.2.4
on both python2 and python3.
I'm new to rx and maybe I'm doing something wrong but using repeat with value > 60 causes RuntimeError: maximum recursion depth exceeded while calling a Python object.
Code example:
Observable.just(0).repeat().subscribe(lambda _: None)
Minimal working examples: see SO post
The issue documented in the post above seems to be due to the following in rx.autodetachobserver
def _next(self, value):
try:
self.observer.on_next(value)
except Exception:
self.dispose()
raise
MWE1 cause: when an Exception
is caught, it is simply raise
d and self.observer.on_error()
does not get called.
MWE2 cause: The call to self.dispose()
causes self.is_stopped
to be set to True
so when self.on_error()
is called, the condition if not self.is_stopped
is False
and so prevents anything from happening.
So should it not just be:
def _next(self, value):
self.observer.on_next(value)
because you don't need to catch errors in _next
. This is because the caller uses catches exceptions -- I'm thinking about use cases like:
def subscribe(observer) # observer is AutoDetachObserver
try:
observer.on_next(3)
except Exception as e:
observer.on_error(e)
Minimal working example: see SO post
The issue documented in the post above seems to be due to the following in rx.concurrency.mainloopscheduler.TwistedScheduler
:
def dispose():
handle[0].cancel()
I believe the problem is that handle[0]
(a delayedCall
) might already have been called prior to cancellation. So should we wrap the above with a try
?
def dispose():
try:
handle[0].cancel()
except AlreadyCalled:
pass
Starting the tornado autocomplete example on python 2.7.5 using RxPY from the master branch at commit 82212d0 (latest, cloned on 2014-09-22):
$ python autocomplete.py
Starting server at port: 8080
WebSocket opened
WARNING:tornado.access:404 GET /favicon.ico (127.0.0.1) 0.30ms
Traceback (most recent call last):
File "/home/anglerud/tmp/RxPY/rx/autodetachobserver.py", line 17, in next
self.observer.on_next(value)
File "/home/anglerud/tmp/RxPY/rx/abstractobserver.py", line 24, in on_next
self.next(value)
File "/home/anglerud/tmp/RxPY/rx/linq/observable/switchlatest.py", line 37, in on_next
inner_source = Observable.from_future(inner_source)
TypeError: unbound method from_future() must be called with Observable instance as first argument (got Future instance instead)
I am using BehaviorSubjects all over the place my current GUI application, because they are an awesome way to store values and get updates. I have some situations where I use .pausable()
when combining multiple streams of values. I do this because I update many of the streams at once, and I don't want to compute results in derived streams until all of the parent streams are updated. That is, I set the pausable to False, make updates to multiple streams, and then set the pausable to True.
I am encountering what I think is an error when I pause and immediately unpause without changing the value. Here is a sample script:
import rx
def normal_subject():
pauser = rx.subjects.BehaviorSubject(True)
subject = rx.subjects.Subject()
subject.pausable(pauser).subscribe(print)
subject.on_next("print value")
print("Subject pause/unpause")
pauser.on_next(False)
pauser.on_next(True)
def behavior_subject():
pauser = rx.subjects.BehaviorSubject(True)
subject = rx.subjects.BehaviorSubject("print value")
subject.pausable(pauser).subscribe(print)
print("BehaviorSubject pause/unpause")
pauser.on_next(False)
pauser.on_next(True)
if __name__ == "__main__":
print("Subject:")
normal_subject()
print("\nBehaviorSubject:")
behavior_subject()
Here is the output:
$ python rx_pause.py
Subject:
print value
Subject pause/unpause
BehaviorSubject:
print value
BehaviorSubject pause/unpause
print value
In the normal subject situation, the value is only printed once, right when on_next
is called. When we pause and then unpause, the value is not printed again. In the BehaviorSubject version, the value is printed twice, the second time being when the pausable is paused then unpaused. It is my understanding that both situations should produce the same output, at least based off of the RxMarbles website.
My hunch is pausable
is implemented using some sort of subscribe/unsubscribe mechanism. When used with BehaviorSubjects, the re-subscription causes the latest value to be emitted over again, even though no values were emitted while paused. I would think that pausable
should be implemented by ignoring on_next
, so values are simply ignored while paused, and we don't resubscribe when we unpause our stream.
Again, I'm not quite sure that this is a bug, but it definitely feels unintended. I think that if two streams emit the same values into the same operator, we should get the same result.
Version 1.2.2 has been out for almost 2 months, but PyPI still only has version 1.2.1. Could someone with the power to do so update the PyPI version of Rx? @ardoramor made note of this in #50 but I decided to make a separate issue in case his message was buried. Thanks in advance!
Currently absolute imports are used. This prevents the rxpy package to be easily included in other packages, when a package manager/installer isn't available (for example, a Python-extensible app that imposes non-standard requirements). By using relative imports, it'd be easy to just drop the rxpy package in a larger package and be able to import it.
IIRC, relative imports are available since py2.5 (?) so this shouldn't affect compat?
I try to use iterator as observable but I want this observable to be as cold as possible, i.e. generate new values only when there is any subscriber. But when I dispose subscriber iterator still works. The code is as follow:
from time import sleep
from rx import Observable
from rx.concurrency import new_thread_scheduler
def myiterator():
for i in range(100):
sleep(1)
print('Publishing value: {0}'.format(i))
yield i
source = Observable.from_(myiterator(), scheduler=new_thread_scheduler)
subscription = source.subscribe(
lambda v: print("Value published: {0}".format(v)),
lambda e: print("Error! {0}".format(e)),
lambda: print("Completed!")
)
sleep(3)
print('Before dispose')
subscription.dispose()
print('After dispose')
sleep(3)
print('Finish')
and the output is:
Publishing value: 0
Value published: 0
Publishing value: 1
Value published: 1
Before dispose
After dispose
Publishing value: 2
Publishing value: 3
Publishing value: 4
Finish
I suppose there should be no publishing value after subscriber disposal. Am I correct?
Hey man, finally I have an actual issue :-)
One of my problems was that on_error_resume_next takes an Observable(or more) as parameter, instead of a function that takes an Exception and returns an Observable.
In java/scala I can do something like this:
Observable.defer(Observable[Result[A]](_.onNext(Result(action(msg.getBody), msg)))).onErrorResumeNext {
ex =>
logger.error(s"Error processing ${msg.getBody}", ex)
Observable.empty
}
While in python I only can do something like this:
def __process(msg, action):
def result(m):
return lambda: Observable(lambda s: s.on_next(Result(action(m.get_body()), m)))
return Observable.defer(result(msg)).on_error_resume_next(Observable.empty())
It seems silly, but that logger is actually logging the exception to loggly, is a information that we really need, so is kind of a big deal to know why was an exception, even when everything should keep working as if nothing happened. In other words, the subscribers of the that Observable should never receive the error, but the error has to be logged.
So, what's the rationale to take off the exception? Is there a chance to get it back?
I feel like this should work
def myprint(*args):
print(*args)
return None
drive = Observable.from_list(list(range(33))).publish()
reg = Subject().buffer_with_count(8).do_action(myprint)
drive.subscribe(reg)
drive.connect()
Or maybe there's another way. I'd really like to be able to create Subjects the same way I create Observables using Linq, then hook them up in a network up with some master Observable to drive it. But I can't seem to manage to create even one this way.
py3)michaels-air:try mfox$ python tryrx.py
Traceback (most recent call last):
File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/autodetachobserver.py", line 17, in next
self.observer.on_next(value)
File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/subjects/subject.py", line 94, in on_next
observer.on_next(value)
File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/abstractobserver.py", line 24, in on_next
self.next(value)
TypeError: 'AnonymousObservable' object is not callable
(py3)michaels-air:try mfox$
Hi, me again, translating from Rx scala to python...
In my scala code I have:
private def process[A](msg: SQSMessage, action: Action[A]): Observable[Result[A]] =
Observable.defer(Observable(_ => Result(action(msg.getBody), msg)))
process(msg, action).onErrorResumeNext {
ex =>
logger.error("Error processing message", ex)
sendMessage(queueName)(msg.getBody)
}
Trying to do the same with python does work:
def __process(msg, action):
return Observable.defer(lambda: Observable(lambda s: s.on_next(Result(action(msg.get_body()), msg))))
send = lambda message: self.send_message(queue_name, message.get_body())
__process(message, action).on_error_resume_next(send)
The problem is when action throws an exception, in the scala version onErrorResumeNext is called and sendMessage returns an Observable. In the python version though, this is what happens(I'm using python 2.7.5):
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 17, in next
self.observer.on_next(value)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 24, in on_next
self.next(value)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/linq/observable/merge.py", line 172, in on_next
on_complete)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/observable.py", line 47, in subscribe
return self._subscribe(observer)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/anonymousobservable.py", line 43, in _subscribe
set_disposable()
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/anonymousobservable.py", line 35, in set_disposable
if not auto_detach_observer.fail(ex):
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 52, in fail
self.error(exn)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 24, in error
self.observer.on_error(exn)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
self.error(error)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
self.error(error)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 24, in error
self.observer.on_error(exn)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
self.error(error)
File "/Users/caente/nlp/lib/python2.7/site-packages/rx/internal/basic.py", line 26, in default_error
raise err
TypeError: unbound method from_future() must be called with Observable instance as first argument (got function instance instead)
Hello,
I am currently using Reactive Programming paradigm as an alternative to the Actor model. I was expecting that it would be easy to dictate processing in multiple processes by simply using the correct scheduler. After looking I can't find any scheduler that seems to use multiple processes. I was thinking about building a ProcessPoolScheduler, but I was wondering what problems you guys see with implementing something like that?
I understand that the Reactive contract is such that streams are processed in order, but I know many applications don't require that assumption.
Also if something like the ProcessPoolScheduler is not possible, how might one go about trying to build a parallel processing system in Python where you have the GIL to contend with.
Thanks,
Jeffrey
I'm probably just doing something wrong. I've been trying to play with GEvent support, but I am having a hard time getting it to work. My first attempt tries to just use the GEventScheduler to print a series of words:
import rx
from rx.concurrency.mainloopscheduler import GEventScheduler
from rx.concurrency import CurrentThreadScheduler
import gevent
words = ["carriage", "boots", "intriguing", "pepper", "vitamins"]
def output(result):
print(result)
scheduler = GEventScheduler()
#scheduler = CurrentThreadScheduler()
rx.Observable.from_iterable(words, scheduler=scheduler).subscribe(output)
gevent.wait()
The above code only outputs the first word ("carriage"). Any idea why this terminates early?
I am trying to use Subject
to create a loop (connecting the end of a stream to the beginning). I have a real use-case for this, but I wrote a contrived script to isolate my problem. I'm confused, because it seems like there are duplicate events.
(Note, I'm running Python 2.7, and I have tornado
installed.)
Here's the code without the loop connected:
from __future__ import print_function
import re
import time
from rx import Observable
from rx.concurrency import IOLoopScheduler
from rx.subjects import Subject
from tornado import ioloop
scheduler = IOLoopScheduler()
seed_list = ['a', 'b', '1', '2', 'c', ]
def check_no_numbers(s):
"""Filters out strings w/ number characters
"""
if not re.search(r'\d', s):
print("+", s)
return True
else:
print("x", s)
return False
def dot(s):
return s + '.' #+ str(uuid4())
if __name__ == "__main__":
xs = Observable.from_(seed_list, scheduler=IOLoopScheduler())
subject = Subject()
def push_new(x):
print('pushing: {!r}'.format(x))
time.sleep(0.1)
subject.on_next(x)
xs.subscribe(on_next=push_new)
filtered = subject.filter(check_no_numbers).filter(lambda s: len(s) <= 3)
results = filtered.map(lambda x: "==> {}".format(x))
news = filtered.map(dot)
# news.subscribe(on_next=push_new) # DON'T loop
def out(result):
print(result)
results.subscribe(on_next=out)
ioloop.IOLoop.current().start()
The output is not surprising to me. Each event goes through the pipeline once, as expected:
pushing: 'a'
+ a
==> a
pushing: 'b'
+ b
==> b
pushing: '1'
x 1
pushing: '2'
x 2
pushing: 'c'
+ c
==> c
Only one line is different in the following code. The code subscribes one observer to the Subject
at the beginning of the pipeline, causing the expected looping. But the surprise is that the events seem to be duplicated.
from __future__ import print_function
import re
import time
from rx import Observable
from rx.concurrency import IOLoopScheduler
from rx.subjects import Subject
from tornado import ioloop
scheduler = IOLoopScheduler()
seed_list = ['a', 'b', '1', '2', 'c', ]
def check_no_numbers(s):
"""Filters out strings w/ number characters
"""
if not re.search(r'\d', s):
print("+", s)
return True
else:
print("x", s)
return False
def dot(s):
return s + '.' # + str(uuid4())
if __name__ == "__main__":
xs = Observable.from_(seed_list, scheduler=IOLoopScheduler())
subject = Subject()
def push_new(x):
print('pushing: {!r}'.format(x))
time.sleep(0.1)
subject.on_next(x)
xs.subscribe(on_next=push_new)
filtered = subject.filter(check_no_numbers).filter(lambda s: len(s) <= 3)
results = filtered.map(lambda x: "==> {}".format(x))
news = filtered.map(dot)
news.subscribe(on_next=push_new) # loop
def out(result):
print(result)
results.subscribe(on_next=out)
ioloop.IOLoop.current().start()
This produces the following, surprising output:
pushing: 'a'
+ a
pushing: 'a.'
+ a.
pushing: 'a..'
+ a..
pushing: 'a...'
+ a...
+ a...
+ a..
==> a..
+ a.
==> a.
+ a
==> a
pushing: 'b'
+ b
pushing: 'b.'
+ b.
pushing: 'b..'
+ b..
pushing: 'b...'
+ b...
+ b...
+ b..
==> b..
+ b.
==> b.
+ b
==> b
pushing: '1'
x 1
x 1
pushing: '2'
x 2
x 2
pushing: 'c'
+ c
pushing: 'c.'
+ c.
pushing: 'c..'
+ c..
pushing: 'c...'
+ c...
+ c...
+ c..
==> c..
+ c.
==> c.
+ c
==> c
If you sort
these lines, you can see the duplication very clearly (note how you only see pushing: <x>
once, but see <x>
twice, for each value...)
+ a
+ a
+ a.
+ a.
+ a..
+ a..
+ a...
+ a...
# snipped
pushing: 'a'
pushing: 'a.'
pushing: 'a..'
pushing: 'a...'
# snipped
(For another way to see the behavior, you can uncomment the snippet in the dot()
function, that calls uuid4()
, to tag the strings.)
Is this a bug?
Just an idea, that I'm wondering if I should explore or nip in the bud. Looking for some Rx wisdom I suppose, since I'm a newb.
I'm looking at these:
I'm also thinking about Celery and how to either integrate it with RxPy, or to imitate a small part of RxPy with Celery's operators. Integration would be nicer. Or if RxPy + ((Twisted or Tornado) + pika + RabbitMQ) could replace the need for Celery for some projects... that would also be interesting.
Is this something worth thinking about and trying to work on? Or am I mixing incompatible patterns? Opinions?
The class extension method zip_array's action() method has this code:
for _ in queues:
queues[n] = []
n is set to len(queues) earlier in the method, so this throws index out of range error.
When merging two observables I got this stack trace. It seems a comparison is between an int
and an AnonymousObservable and a type error exception is thrown.
I am using python 3.4.1 and Rx 0.9.3
from rx import Observable
source1 = Observable.range(1, 3)
source2 = Observable.range(1, 3)
x = source1.merge(source2)
x.subscribe(
lambda x: print("Observer 1: OnNext: ", x),
lambda ex: print("Observer 1: OnError: ", ex.Message),
lambda: print("Observer 1: OnCompleted")
)
/Library/Frameworks/Python.framework/Versions/3.4/bin/python3.4-32 /Users/larsholm/PycharmProjects/Py1Proj/py1.py
Traceback (most recent call last):
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/autodetachobserver.py", line 17, in next
self.observer.on_next(value)
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/abstractobserver.py", line 24, in on_next
self.next(value)
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/linq/observable/merge.py", line 58, in on_next
if active_count[0] < max_concurrent_or_other:
TypeError: unorderable types: int() < AnonymousObservable()
Process finished with exit code 0
Hey,
Thank you so much for maintaining this module. It's super useful.
.controlled()
extension method to turn anObservable
into a ControlledObservable
? rxJS appears to have one.WindowedObservable
... self.subscription
is referenced before it is set which throws an exception.import rx
source = rx.backpressure.controlledobservable.ControlledObservable(
rx.Observable.from_iterable([1,2,3,4,5]),
enable_queue = False,
).windowed(3)
ControlledObservable
to work... Even this basic chain throws an exception. Is it a bug or am I doing something wrong?import rx
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stderr,)
source = rx.backpressure.controlledobservable.ControlledObservable(
rx.Observable.from_iterable([1,2,3,4,5]),
enable_queue = False,
)
for number in source.to_blocking():
print number
Which outputs the following stack trace...
Traceback (most recent call last):
File "sample.py", line 37, in <module>
for number in source.to_blocking():
File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 63, in __iter__
return self.to_iterable()
File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 29, in to_iterable
self.observable.materialize().subscribe(on_next)
File "/Library/Python/2.7/site-packages/rx/observable.py", line 51, in subscribe
return self._subscribe(observer)
File "/Library/Python/2.7/site-packages/rx/anonymousobservable.py", line 50, in _subscribe
current_thread_scheduler.schedule(set_disposable)
File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 54, in schedule
return self.schedule_relative(timedelta(0), action, state)
File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 68, in schedule_relative
self.queue.run()
File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 37, in run
item.invoke()
File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 18, in invoke
self.disposable.disposable = self.invoke_core()
File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 33, in invoke_core
return self.action(self.scheduler, self.state)
File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 128, in scheduled_action
return self.invoke_rec_immediate(scheduler, pair)
File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 83, in invoke_rec_immediate
action(inner_action, state)
File "/Library/Python/2.7/site-packages/rx/linq/observable/fromiterable.py", line 39, in action
observer.on_next(item)
File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
self.observer.on_next(value)
File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
self.observer.on_next(value)
TypeError: 'ControlledSubject' object is not callable
The following program illustrates the problem:
import asyncio
import rx
import time
scheduler = rx.concurrency.AsyncIOScheduler()
sub0 = rx.Observable.interval(1000, scheduler=scheduler) \
.subscribe(print, print, print)
try:
loop = asyncio.get_event_loop()
loop.run_forever()
except:
pass
sub0.dispose()
I would expect the above to exit normally. However, I get the following error message instead:
Traceback (most recent call last):
File "./rx_test.py", line 17, in <module>
sub0.dispose()
File "/opt/sd/lib/python3.5/site-packages/rx/autodetachobserver.py", line 40, in dispose
self.m.dispose()
File "/opt/sd/lib/python3.5/site-packages/rx/disposables/booleandisposable.py", line 50, in dispose
old.dispose()
File "/opt/sd/lib/python3.5/site-packages/rx/disposables/compositedisposable.py", line 63, in dispose
disposable.dispose()
File "/opt/sd/lib/python3.5/site-packages/rx/disposables/booleandisposable.py", line 50, in dispose
old.dispose()
AttributeError: 'int' object has no attribute 'dispose'
Python 3.5.1 and RxPY master branch (where I have applied the fix for issue #90).
I mean, I'm probably just not getting it. Looking at RxJava documentation and test_replaysubject.py
I'm trying to make a network of Observables that runs in a predictable, deterministic way. (This might not be what RxPY is for? but anyway). I've found one approach that works better, using publish()
and connect()
. But it seems like ReplaySubject
is advertised to accomplish this. If it worked, this code would print the sequence twice. It only prints it once.
from rx.subjects import (Subject, ReplaySubject)
from rx import Observable, Observer
# An observer who takes (int, nbits) and emits ASCII '0' and '1' LSB first
def to_bitstring(t):
val, nbits = t
return Observable.from_iterable([str(1 & (val >> i)) for i in range(nbits)])
# An observer who takes things and prints them
def myprint(*args):
print(*args)
return None
printer = Observer(myprint)
tstdata = [(0xFF, 8), (0xa0, 8)]
expander = Observable.\
from_iterable(tstdata).map(to_bitstring).concat_all()
replay = ReplaySubject()
expander.subscribe(replay)
replay.subscribe(printer)
replay.subscribe(printer)
Is there any plans to implement thread_pool scheduler?
In rx/linq/observable/startswith.py
, the scheduler is fetched from the keyword args. However, it's then immediately overridden by the following if-else. I'm not sure if allowing a scheduler in the keywords is desired, but either way, I think that the current implementation is mistaken.
I don't actually use the scheduler arg (I was looking at start_with for something else), so this isn't high priority for me. I just wanted to bring it up.
Hi,
Thanks for putting this library together! I'm really enjoying learning about reactive extensions.
One issue I've encountered is that some of the examples (for example ironpython) have Python 2.x print statements. EG:
print "x y z"
So will not run in Python 3.x. However, the notebook tutorial has statements like:
xs = d.subscribe(print)
Which only works in Python 3.x. How would one subscribe the print function in Python 2.x?
I created a StackOverflow post describing my problem.
The issue is that I have a partial function (created via functools.partial
) that I want to pass to flat_map
. However, doing so results in the following:
Traceback (most recent call last):
File "retry/example.py", line 46, in <module>
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
selector = adapt_call(selector)
File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
argnames, varargs, kwargs = getargspec(func)[:3]
File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function
Here is some sample code which reproduces the problem:
from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools
logger = logging.basicConfig()
def puts(thing):
print thing
def message_stream(consumer):
def thing(observer):
for message in consumer:
observer.on_next(message)
return Observable.create(thing)
def message_handler(message, context=None):
def req():
return requests.get('http://httpbin.org/get')
return Observable.start(req)
def handle_response(message, response, context=None):
consumer = context['consumer']
producer = context['producer']
t = 'even' if message % 2 == 0 else 'odd'
return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer
consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
'consumer': consumer,
'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)
Install RxPy via pip.
Have some messages, what does it mean? It's correct?
Installing collected packages: rx
Running setup.py install for rx
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_time.py", line 43
class ObservableTime(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_concurrency.py", line 6
class ObservableConcurrency(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_leave.py", line 6
class ObservableLeave(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_multiple.py", line 10
class ObservableMultiple(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_creation.py", line 7
class ObservableCreation(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/standardsequenceoperators.py", line 25
class ObservableLinq(Observable, metaclass=ObservableMeta):
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_single.py", line 40
nonlocal is_disposed
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/concurrency/scheduler.py", line 34
nonlocal is_done
^
SyntaxError: invalid syntax
File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/testing/testscheduler.py", line 98
nonlocal source
^
SyntaxError: invalid syntax
Successfully installed rx
Cleaning up...
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.