gmr / rejected
rejected is a consumer framework for RabbitMQ
License: BSD 3-Clause "New" or "Revised" License
Consumes up to N messages, then shuts down
The configuration example at https://rejected.readthedocs.io/en/stable/example_config.html still uses the old ssl configuration instead of the newer ssl_options configuration. This can confuse users who copy the example as a starting point for their own project configuration. Please update the example to match the project's API.
ssl_options documentation: https://rejected.readthedocs.io/en/stable/configuration.html#ssl-options
See https://travis-ci.org/petere/rejected/jobs/18373397.
couchconfig is not listed in requirements, so those tests should perhaps be skipped.
The from rejected import state line looks wrong. It should perhaps be something like from ..state import State.
There are more issues once these two are fixed.
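One way to skip the couchconfig-dependent tests when the package is absent is to gate them on an import check. This is a minimal sketch using only the standard unittest skip decorators; the class and test names are hypothetical and do not come from rejected's test suite.

```python
import unittest

try:
    import couchconfig  # optional dependency, not listed in requirements
except ImportError:
    couchconfig = None


@unittest.skipIf(couchconfig is None, "couchconfig is not installed")
class CouchConfigTests(unittest.TestCase):
    """Hypothetical stand-in for the tests that need couchconfig."""

    def test_module_is_importable(self):
        # Only runs when the import above succeeded.
        self.assertIsNotNone(couchconfig)
```

The try/except form works on both Python 2 and Python 3, so it fits the interpreter matrix used on Travis.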
>>> import json, zlib
>>> body = {'name': u'Då√e'}
>>> json.dumps(body, ensure_ascii=False)
u'{"name": "D\xe5\u221ae"}'
>>> zlib.compress(_)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode characters in position 11-12: ordinal not in range(128)
>>>
_dump_json_value should probably call encode('utf-8') on the result so that it consistently returns bytes.
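A minimal sketch of the suggested behaviour, written as a standalone function rather than rejected's actual private method (the name dump_json_value is hypothetical). It keeps ensure_ascii=False, as in the session above, and always hands bytes to the next step, so zlib.compress no longer depends on the default ASCII codec.

```python
import json
import zlib


def dump_json_value(value):
    """Hypothetical stand-in for _dump_json_value: always return UTF-8 bytes."""
    text = json.dumps(value, ensure_ascii=False)
    # On Python 2 json.dumps may return either str (bytes) or unicode;
    # on Python 3 it always returns str, which must be encoded.
    if not isinstance(text, bytes):
        text = text.encode("utf-8")
    return text


body = {"name": u"Då√e"}
payload = zlib.compress(dump_json_value(body))
```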
It is possible to get into the generic except Exception case of consumer.Consumer.execute with a future that does not have an exception set. The code in question is https://github.com/gmr/rejected/blob/master/rejected/consumer.py#L888-L896
except Exception as error:
exc_info = sys.exc_info()
if concurrent.is_future(result):
error = result.exception()
exc_info = result.exc_info()
self.log_exception('Exception processing delivery %s: %s',
message_in.delivery_tag, error,
exc_info=exc_info)
self._measurement.set_tag('exception', error.__class__.__name__)
raise gen.Return(data.UNHANDLED_EXCEPTION)
If the consumer class does not implement the process method, then a NotImplementedError is raised (that is what error is set to), but result is a Future that is finished with both the result and the exception set to None. Since it is a future, both error and exc_info are overwritten with None. log_exception then tries to iterate over None, which results in a TypeError. Here is an example traceback:
Traceback (most recent call last):
File "/Users/daveshawley/opt/lib/python2.7/unittest/case.py", line 329, in run
testMethod()
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/testing.py", line 136, in __call__
result = self.orig_method(*args, **kwargs)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/testing.py", line 529, in post_coroutine
timeout=timeout)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/ioloop.py", line 457, in run_sync
return future_cell[0].result()
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/daveshawley/Source/python/some-consumer/tests/consumer_tests.py", line 74, in test_that_things_work
'id': '[email protected]'})
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/rejected/testing.py", line 182, in process_message
measurement)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/tornado/gen.py", line 1024, in run
yielded = self.gen.send(value)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/rejected/consumer.py", line 894, in execute
exc_info=exc_info)
File "/Users/daveshawley/Source/python/some-consumer/env/lib/python2.7/site-packages/rejected/consumer.py", line 922, in log_exception
if all(exc_info):
TypeError: 'NoneType' object is not iterable
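A possible fix is to take the future's exception only when the future really carries one, and to make the all(exc_info) check tolerate None. The sketch below is hypothetical: resolve_error and safe_exc_info are illustrative names, and it uses the standard library's concurrent.futures.Future as a stand-in for Tornado's future. Tornado futures expose exc_info() directly, whereas here the tuple is rebuilt from __traceback__ (Python 3 only).

```python
from concurrent.futures import Future


def resolve_error(error, exc_info, result):
    """Prefer the future's exception only if it is finished and has one."""
    if isinstance(result, Future) and result.done() and not result.cancelled():
        future_error = result.exception()
        if future_error is not None:
            return future_error, (type(future_error), future_error,
                                  future_error.__traceback__)
    # Otherwise keep what was caught, e.g. the NotImplementedError.
    return error, exc_info


def safe_exc_info(exc_info):
    """Guard for a log_exception-style check: all(None) raises TypeError."""
    return exc_info if exc_info and all(exc_info) else None
```

With this shape, a consumer that forgets to implement process is logged with its NotImplementedError instead of crashing in log_exception.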
You have example.py inside a package (that is OK, though I haven't seen such examples; I usually like to play with examples myself, and it is a bit awkward to modify files in a package), and there are two example.py files in the documentation and README with little information on how to actually run them. There are also 3 versions of example.yaml, again with no information about them.
It would be much better to have only one example, but covered from the very beginning:
- example.yaml + example.py (or example/__init__.py)
- example.yaml, so that your example with LOGGER.info() would actually print something out of the box (or release helper==2.4.2)
- rejected -c example.yaml -p "$(pwd)" (this is how I am running it now)
It would be nice to see why a processing exception is happening when inspecting messages that are spinning on processing exceptions.
TypeError: 'NoneType' object is not iterable
File "bin/rejected", line 11, in <module>
sys.exit(main())
File "rejected/controller.py", line 150, in main
helper.start(Controller)
File "helper/__init__.py", line 174, in start
obj.start()
File "helper/controller.py", line 266, in start
self.run()
File "rejected/controller.py", line 100, in run
self._mcp = self._master_control_program()
File "rejected/controller.py", line 51, in _master_control_program
quantity=self.args.quantity)
File "rejected/mcp.py", line 65, in __init__
self.consumer_cfg = self.get_consumer_cfg(config, consumer, quantity)
File "rejected/mcp.py", line 249, in get_consumer_cfg
consumers = dict(config.application.Consumers)
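The traceback ends in dict(config.application.Consumers), and dict(None) raises exactly this TypeError, so a likely cause is a configuration file whose Consumers section is missing or empty. A hypothetical guard (get_consumers is an illustrative name, and a plain dict stands in for rejected's configuration object) would turn that into a readable error:

```python
def get_consumers(application_config):
    """Hypothetical guard around dict(config.application.Consumers)."""
    consumers = application_config.get("Consumers")
    if not consumers:
        # dict(None) would raise "TypeError: 'NoneType' object is not
        # iterable", which hides the real configuration problem.
        raise ValueError("No Consumers section found in the application "
                         "configuration")
    return dict(consumers)
```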
https://rejected.readthedocs.io redirects to https://rejected.readthedocs.io/en/stable/ which is not rejected's documentation - the following is shown:
This impacts documentation present in the README.
https://rejected.readthedocs.io/en/latest/ is functional.
I came across the following error when running rejected==3.19.5 and pika==0.11.2:
[2017-12-12 10:50:34,011 ERROR 21981 pika.callback] Calling <bound method Connection.on_channel_closed of <rejected.process.Connection object at 0x3aa7d50>> for "1:_on_channel_close" failed
Traceback (most recent call last):
File "/home/usrisg/python_envs/dsm-reporter/lib/python2.7/site-packages/pika/callback.py", line 236, in process
callback(*args, **keywords)
File "/home/usrisg/python_envs/dsm-reporter/lib/python2.7/site-packages/rejected/process.py", line 166, in on_channel_closed
self.handle.channel(self.on_channel_open)
File "/home/usrisg/python_envs/dsm-reporter/lib/python2.7/site-packages/pika/connection.py", line 1153, in channel
'Channel allocation requires an open connection: %s' % self)
ConnectionClosed: Channel allocation requires an open connection: <TornadoConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>
This happens when the process() method of the consumer takes longer to run than the RabbitMQ heartbeat interval/timeout, and the RabbitMQ server closes the connection to the consumer. In my specific case, a consumer makes an HTTP request that occasionally takes a long time to receive a response (4+ minutes). The consumer only notices that the connection was reset/dropped once the process() method finishes. This then triggers the on_channel_close() callback, which generates the exception above. The consumer gets stuck in an undefined state: the consumer process continues to run but never attempts to reconnect to RabbitMQ.
To reproduce:
- consumer.yaml
- time.sleep(60) calls in the process() method
Here is a log snippet preceding the exception above (after the process() method finishes):
[2017-12-12 10:50:29,625 DEBUG 21981 pika.heartbeat] Sending heartbeat frame
[2017-12-12 10:50:29,625 ERROR 21981 pika.adapters.base_connection] Socket Error: 104
[2017-12-12 10:50:29,625 INFO 21981 pika.connection] Disconnected from RabbitMQ at localhost:5672 (-1): error(104, 'Connection reset by peer')
[2017-12-12 10:50:29,626 DEBUG 21981 pika.heartbeat] Removing timeout for next heartbeat interval
[2017-12-12 10:50:29,626 WARNING 21981 pika.adapters.base_connection] Connection is closed but not stopping IOLoop
[2017-12-12 10:50:29,626 DEBUG 21981 pika.channel] Handling meta-close on <Channel number=1 OPEN conn=<TornadoConnection CLOSED socket=None params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
[2017-12-12 10:50:29,627 DEBUG 21981 pika.callback] Processing 1:_on_channel_close
[2017-12-12 10:50:29,627 DEBUG 21981 pika.callback] Calling <bound method Connection.on_channel_closed of <rejected.process.Connection object at 0x3aa7d50>> for "1:_on_channel_close"
[2017-12-12 10:50:32,821 DEBUG 21981 rejected.process] Connection dsm-rabbitmq channel was closed: (-1) error(104, 'Connection reset by peer')
[2017-12-12 10:50:32,821 DEBUG 21981 rejected.state] State changing from Processing to Connecting
I was able to fix this with some minor changes applied against the 3.19.5 tag:
code-fabriek@d21c95a
It looks like there are some changes on master for the 4.x release so perhaps this is not an issue on current master (I haven't attempted to reproduce there).
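The traceback shows on_channel_closed calling self.handle.channel(...) on a connection that is already CLOSED. A hypothetical guard is to reopen the channel only while the connection is still open and to schedule a full reconnect otherwise. This sketch is not the change in code-fabriek@d21c95a and not rejected's real class; ChannelReopener and the reconnect callable are illustrative. It assumes the handle is a pika connection, which provides an is_open property and a channel() method that accepts on_open_callback as a keyword (valid in both pika 0.11 and 1.x).

```python
class ChannelReopener(object):
    """Hypothetical stand-in for rejected.process.Connection channel handling."""

    def __init__(self, handle, reconnect):
        self.handle = handle        # assumed: a pika connection object
        self.reconnect = reconnect  # assumed: callable that opens a new connection
        self.channel = None

    def on_channel_closed(self, *args):
        if self.handle.is_open:
            # The connection survived, so a new channel can be allocated.
            self.handle.channel(on_open_callback=self.on_channel_open)
        else:
            # The broker dropped the connection (e.g. missed heartbeats);
            # channel() would raise ConnectionClosed, so reconnect instead.
            self.reconnect()

    def on_channel_open(self, channel):
        self.channel = channel
```

This only addresses the stuck state after the drop; the heartbeats are still missed for as long as process() blocks the IOLoop.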
Limit processing to N messages per
Lines 1044 to 1045 in 974c126
I think that the guarantee should be that if prepare completes, then on_finish should be called.
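The proposed guarantee maps naturally onto try/finally placed after prepare. This is a synchronous, hypothetical driver (run_message is an illustrative name); rejected's real execute path is a Tornado coroutine, but the ordering it would need to preserve is the same: if prepare raises, on_finish is skipped, and once prepare returns, on_finish runs even when process raises.

```python
def run_message(consumer):
    """Hypothetical driver: on_finish runs whenever prepare has completed."""
    consumer.prepare()           # if this raises, on_finish is not called
    try:
        return consumer.process()
    finally:
        consumer.on_finish()     # runs on success and on exceptions
```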
I use Python 3, but the library is still using Python 2:
rejected -c examples.yaml -p . -o sync
ERROR: Startup of rejected Failed.
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/helper/unix.py", line 91, in start
self._daemonize()
File "/Library/Python/2.7/site-packages/helper/unix.py", line 152, in _daemonize
if os.getuid() != self.uid:
File "/Library/Python/2.7/site-packages/helper/unix.py", line 139, in uid
self._uid = pwd.getpwnam(self.config.daemon.user).pw_uid
KeyError: 'getpwnam(): name not found: rejected'
Exception log: /var/log/rejected.errors
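The KeyError comes from pwd.getpwnam looking up the configured daemon user (self.config.daemon.user in the traceback), and no local account named rejected exists. Either create that account or point daemon.user at an existing one. The check itself can be sketched with the standard library (user_exists is a hypothetical helper, Unix only):

```python
import pwd


def user_exists(name):
    """Return True if name is a local Unix account."""
    try:
        pwd.getpwnam(name)
    except KeyError:
        # Same failure as above: "getpwnam(): name not found: ..."
        return False
    return True
```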
Once I send a message with the text/csv content type and any payload (compressed or plain), I get the following traceback:
File "./app2/__init__.py", line 16, in process
print(self.body)
File "./env/lib/python3.5/site-packages/rejected/consumer.py", line 901, in body
self._message_body = self._load_csv_value(self._message_body)
File "./env/lib/python3.5/site-packages/rejected/consumer.py", line 954, in _load_csv_value
csv_buffer = io.StringIO(value)
TypeError: initial_value must be str or None, not bytes
I get a similar exception using the application/json content type:
File "./app2/__init__.py", line 16, in process
print(self.body)
File ".env/lib/python3.5/site-packages/rejected/consumer.py", line 889, in body
self._message_body = self._load_json_value(self._message_body)
File ".env/lib/python3.5/site-packages/rejected/consumer.py", line 968, in _load_json_value
return json.loads(value, encoding='utf-8')
File ".env/lib/python3.5/json/__init__.py", line 312, in loads
s.__class__.__name__))
At the same time, YAML, Pickle, msgpack, and plist work fine. I haven't tested BeautifulSoup.
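Both tracebacks have the same root cause: on Python 3 the message body is bytes, but io.StringIO requires str, and json.loads on Python 3.5 also requires str (bytes input is only accepted from Python 3.6). A hedged sketch is to decode first; the helper names are hypothetical, UTF-8 is an assumed default, and returning csv.DictReader rows is an assumption about the desired CSV shape, not rejected's documented behaviour.

```python
import csv
import io
import json


def _to_text(value, encoding="utf-8"):
    # Assumed default encoding; a real consumer might use content_encoding.
    return value.decode(encoding) if isinstance(value, bytes) else value


def load_csv_value(value):
    """Parse a text/csv body into a list of row dicts."""
    return list(csv.DictReader(io.StringIO(_to_text(value))))


def load_json_value(value):
    """Parse an application/json body; decode first for Python 3.5."""
    return json.loads(_to_text(value))
```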
System info:
Using a checkout of 'master' with Python 2.6.7 in a new virtualenv, I installed using setup.py, and simplejson was not installed by default. The native json module in 2.6.7 (or 3.3 for that matter) doesn't support the 'use_decimal' argument to json.loads, but it's required here: https://github.com/gmr/rejected/blob/master/rejected/consumer.py#L624
Installing simplejson made this issue go away, though it'd be nice to make this work with the json module, or add simplejson to the install_requires. This way people aren't stymied the minute they try out the examples! ;-)
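For decoding, the stdlib json module can produce the same result as simplejson's use_decimal=True by passing parse_float=decimal.Decimal, an argument json.loads has accepted since Python 2.6. A minimal sketch of a fallback import (loads_with_decimal is a hypothetical name, not rejected's code):

```python
import decimal

try:
    import simplejson as json
    _DECIMAL_KWARGS = {"use_decimal": True}
except ImportError:
    import json  # stdlib fallback
    _DECIMAL_KWARGS = {"parse_float": decimal.Decimal}


def loads_with_decimal(text):
    """Decode JSON, returning non-integer numbers as decimal.Decimal."""
    return json.loads(text, **_DECIMAL_KWARGS)
```

This only covers loading; simplejson's use_decimal also affects dumping, which the stdlib json module does not support for Decimal values.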