fedora-infra / fedora-messaging

A library for sending AMQP messages with JSON schema in Fedora infrastructure

License: GNU General Public License v2.0

Python 99.72% Shell 0.20% Gherkin 0.08%
jsonschema fedora amqp pika

fedora-messaging's Introduction

Fedora Messaging

This package provides tools and APIs to make using Fedora's messaging infrastructure easier. These include a framework for declaring message schemas, a set of synchronous APIs to publish messages to AMQP brokers, a set of asynchronous APIs to consume messages, and services to easily run consumers.

This library is designed to be a replacement for the PyZMQ-backed fedmsg library in Fedora Infrastructure.

To get started, check out our user guide.

Looking to contribute? We appreciate it! Check out the contributor guide.

fedora-messaging's People

Contributors

abompard, adamwill, bowlofeggs, cverna, dependabot[bot], dvejmz, erolkskn, freedisch, frenzymadness, gridhead, hroncok, jeremycline, khaledachech, lenkaseg, michaelgugino, onur-ozkan, pypingou, renovate[bot], ryanlerch, sebwoj, shraddhaag, stephencoady, tomastomecek, vidit-maheshwari, xsuchy, zlopez

fedora-messaging's Issues

Investigate implementing a email->FAS id schema utility

fedmsg_meta had a function to turn an email into a FAS account (https://github.com/fedora-infra/fedmsg_meta_fedora_infrastructure/blob/develop/fedmsg_meta_fedora_infrastructure/fasshim.py).

We definitely don't want to pull that function in as-is, but we will probably want to offer that functionality. In the short term we could recommend continuing to depend on fedmsg_meta for that utility function, but that's not ideal because it likely pulls in a huge number of dependencies.

CC: @AdamWill

Add support for amqp 1.0

In addition to AMQP 0.9.1 support, it would be great if fedora-messaging also supported the AMQP 1.0 protocol. The underlying pika library does not support it, however, so a different library would have to be used, e.g. python-qpid.

This is a tracker ticket for the previous conversations we've had.

Add a Sphinx autodoc generator

It would be nice if we had a Sphinx plugin for fedora-messaging that would allow Sphinx users to point at schema objects and have Sphinx RST docs automatically generated for their schemas. I'd love that for Bodhi as an example - this way I can write some code to define my schema and get documentation on that schema in my project for free.

I use something kinda like this for Bodhi's REST API, which might be something you can look at as an example of how someone else did something similar:

https://github.com/Cornices/cornice.ext.sphinx

I think this was originally @jeremycline's idea, so credit to him!

(Is it weird that I put the documentation tag on it? It's not related to fedora-messaging's own documentation, but it is related to documentation in general ☺)

Twisted support

It would be nice to have Twisted support, to integrate Fedora Messaging with other network protocols and create bridges.

What's the timeline?

Hi, what's the timeline for Fedora Messaging? When can we expect it to be in prod? When can we start migrating from fedmsg to fedora-messaging?

I didn't find anything on Fedora wiki, hence asking here.

The version of toml in EL7 doesn't have lineno

When handling a TOML exception we try to be helpful with the error message, but the version in EL7 (0.9.4) doesn't have lineno. We should handle that gracefully:

self = <fedora_messaging.tests.unit.test_config.LoadTests testMethod=test_bad_config_file>, mock_exists = <MagicMock name='exists' id='140347696999760'>

    @mock.patch("fedora_messaging.config.open", mock.mock_open(read_data="Ni!"))
    @mock.patch("fedora_messaging.config.os.path.exists", return_value=True)
    def test_bad_config_file(self, mock_exists):
        """Assert an invalid TOML file raises a ConfigurationException."""
        with self.assertRaises(ConfigurationException) as cm:
>           msg_config.LazyConfig().load_config()

fedora_messaging/tests/unit/test_config.py:243:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = {}, config_path = '/etc/fedora-messaging/config.toml'

    def load_config(self, config_path=None):
        """
        Load application configuration from a file and merge it with the default
        configuration.
    
        If the ``FEDORA_MESSAGING_CONF`` environment variable is set to a
        filesystem path, the configuration will be loaded from that location.
        Otherwise, the path defaults to ``/etc/fedora-messaging/config.toml``.
        """
        self.loaded = True
        config = copy.deepcopy(DEFAULTS)
    
        if config_path is None:
            if "FEDORA_MESSAGING_CONF" in os.environ:
                config_path = os.environ["FEDORA_MESSAGING_CONF"]
            else:
                config_path = "/etc/fedora-messaging/config.toml"
    
        if os.path.exists(config_path):
            _log.info("Loading configuration from {}".format(config_path))
            with open(config_path) as fd:
                try:
                    file_config = toml.load(fd)
                    for key in file_config:
                        config[key.lower()] = file_config[key]
                except toml.TomlDecodeError as e:
                    msg = "Failed to parse {}: error at line {}, column {}: {}".format(
>                       config_path, e.lineno, e.colno, e.msg
                    )
E                   AttributeError: 'TomlDecodeError' object has no attribute 'lineno'

fedora_messaging/config.py:491: AttributeError
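
One way to handle this gracefully, sketched under the assumption that older toml releases simply lack the lineno, colno, and msg attributes on the exception: fall back to the exception's string form when position information is missing. format_parse_error is a hypothetical helper name, not something that exists in fedora-messaging.

```python
def format_parse_error(config_path, exc):
    """Build a parse-error message that works with or without position info."""
    lineno = getattr(exc, "lineno", None)
    colno = getattr(exc, "colno", None)
    if lineno is None or colno is None:
        # Assumption: older decoders (e.g. toml 0.9.4 on EL7) expose no
        # position attributes, so only the exception text is available.
        return "Failed to parse {}: {}".format(config_path, exc)
    return "Failed to parse {}: error at line {}, column {}: {}".format(
        config_path, lineno, colno, getattr(exc, "msg", exc)
    )
```

Inside the except block shown in the traceback, the msg assignment could then become a single call such as format_parse_error(config_path, e).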

--terse support

Right now the default/docs version of the public consumer just prints out the whole of each message to stdout. This is fine, but it would be nice to have something like fedmsg --terse, which prints only a small subset to give you an idea of the message. That way you can see a lot more messages at once and can actually run it to watch what is going on in Fedora.

~ fedmsg-tail-3 --terse
[2019-04-11 17:32:02][fedmsg.crypto.utils INFO] Downloading x509 certificate from https://fedoraproject.org/fedmsg/ca.crt
[2019-04-11 17:32:03][fedmsg.crypto.utils INFO] Downloading x509 certificate from https://fedoraproject.org/fedmsg/crl.pem
[2019-04-11 17:32:03][fedmsg.commands INFO]
logger.odcs.internal.msg --
[2019-04-11 17:32:03][fedmsg.commands INFO]
github.pull_request_review -- xiexingguo approved the changes on PR #27523 on ceph/ceph ceph/ceph#27523 (review)
[2019-04-11 17:32:25][fedmsg.commands INFO]
buildsys.task.state.change -- koschei's scratch build of cinnamon-session-4.0.0-2.fc30.src.rpm for f31 started http://koji.fedoraproject.org/koji/taskinfo?taskID=34120079
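
A terse callback could look roughly like the sketch below. It assumes the message object exposes a topic attribute (as in the api.Message output quoted elsewhere in these issues) and, optionally, a summary attribute; terse_printer is a hypothetical name, not an existing fedora-messaging callback.

```python
def terse_printer(message):
    """Print one short line per message instead of the full body."""
    # `summary` is an assumed attribute; fall back to an empty string so the
    # callback still works for messages that do not provide one.
    summary = getattr(message, "summary", "") or ""
    line = "{} -- {}".format(message.topic, summary)
    print(line)
    return line
```

The output mirrors the "topic -- summary" shape of the fedmsg-tail lines above.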

sent-at header should include timezone information

The sent-at header doesn't currently include timezone information and it's not a UTC timestamp:

Python 3.6.6 (default, Jul 19 2018, 14:25:17) 
[GCC 8.1.1 20180712 (Red Hat 8.1.1-5)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from fedora_messaging import api
>>> print(api.Message())
Id: b3fcf75f-d48a-4cd5-a6a2-62a522f28d01
Topic: 
Headers: {
    "fedora_messaging_schema": "fedora_messaging.message:Message",
    "sent-at": "2018-08-22T12:38:55"
}
Body: {}
>>> 

This should be a UTC timestamp with TZ info attached for clarity, or it should be seconds since the epoch (in UTC) for machine-friendliness.
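
Both options can be sketched with the standard library alone. The function names are hypothetical and this is not the library's implementation; it only shows what a timezone-aware sent-at value would look like.

```python
from datetime import datetime, timezone


def sent_at_iso(now=None):
    """Return an ISO 8601 timestamp in UTC that carries its offset."""
    # A naive `now` would be interpreted as local time by astimezone(),
    # so callers should pass an aware datetime or nothing at all.
    now = now or datetime.now(timezone.utc)
    return now.astimezone(timezone.utc).isoformat()


def sent_at_epoch(now=None):
    """Return whole seconds since the epoch (UTC)."""
    now = now or datetime.now(timezone.utc)
    return int(now.timestamp())
```

For the timestamp in the example above, sent_at_iso would produce "2018-08-22T12:38:55+00:00" if that instant were in fact UTC.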

Improve settings for client certificate and key

I looked into how to set up pika with a client certificate and key, and it's not super user-friendly: https://pika.readthedocs.io/en/latest/examples/using_urlparameters.html#example-connection-urls shows how to create the ssl_options.

We may want to just add a setting in the config for the dictionary you'd normally provide to the URL and generate the proper url-encoded version. Or, since it's just set in stage/prod environments, we just document the process in an SOP. Thoughts?
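
As a rough illustration of the first option, the helper below takes the dictionary from the config and appends it to the URL as a url-encoded ssl_options query parameter. The helper name is hypothetical, and the serialization is an assumption (the dictionary's Python string form); the format pika actually expects may differ between versions, so check the linked page before relying on it.

```python
from urllib.parse import urlencode


def amqp_url_with_ssl_options(base_url, ssl_options):
    """Append a url-encoded ``ssl_options`` query parameter to an AMQP URL."""
    # Assumption: the client parses the str() form of the dict. urlencode()
    # calls str() on non-string values and percent-encodes the result.
    separator = "&" if "?" in base_url else "?"
    return base_url + separator + urlencode({"ssl_options": ssl_options})
```

The keys in the dictionary (for example certfile and keyfile paths) would come straight from the configuration file, so users never have to hand-encode the URL.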

Document how to cleanly shutdown as a consumer

I noticed we haven't documented how to properly stop consuming from within a consumer callback in the consumer docs. One thing I'm not sure about: should the ack requeue the message or not?

My first thought is to add a requeue kwarg to HaltConsumer so consumers can specify: raise HaltConsumer(requeue=True). Then the handler for HaltConsumer re-queues (or not). Thoughts?
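
The proposal can be sketched as follows. The HaltConsumer class here is a stand-alone stand-in with a hypothetical requeue keyword, and deliver, ack, and nack are illustrative names rather than the library's internals.

```python
class HaltConsumer(Exception):
    """Stand-in exception; the ``requeue`` flag is the proposed addition."""

    def __init__(self, reason=None, requeue=False):
        super().__init__(reason)
        self.reason = reason
        self.requeue = requeue


def deliver(callback, message, ack, nack):
    """Run the callback; return False when the consumer asked to halt."""
    try:
        callback(message)
    except HaltConsumer as halt:
        if halt.requeue:
            # Hand the message back to the broker for another consumer.
            nack(message, requeue=True)
        else:
            # Treat the message as handled before shutting down.
            ack(message)
        return False
    ack(message)
    return True
```

A consumer that wants the in-flight message redelivered would then raise HaltConsumer(requeue=True) from its callback.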

Providing an invalid exchange to fedora-messaging consume doesn't crash

Running fedora-messaging consume --callback fedora_messaging.example:printer --exchange this_does_not_exist fails because the exchange doesn't exist and the consumer doesn't declare the exchange, but it hangs indefinitely rather than retrying or exiting.

Since the consumer probably shouldn't concern itself with the configuration of the exchange it's binding to in most scenarios, we should exit with an error code and a message about the exchange not existing.

Improve performance when declaring large number of bindings with Twisted

The background for this is I'm playing with a notifications service and I'm making a couple queues per user. At the moment this takes a couple minutes for ~4000 queues and around 10 bindings per queue. We could increase efficiency by only declaring an exchange and queue once at:

for binding in self.factory.bindings:
    yield self._channel.exchange_declare(
        exchange=binding["exchange"], exchange_type="topic", durable=True
    )
    result = yield self._channel.queue_declare(
        queue=binding["queue_name"],
        durable=True,
        auto_delete=binding.get("queue_auto_delete", False),
        arguments=binding.get("queue_arguments"),
    )
    queue_name = result.method.queue
    yield self._channel.queue_bind(
        queue=queue_name,
        exchange=binding["exchange"],
        routing_key=binding["routing_key"],

This involves creating a de-duplicated list of exchanges, queues, and bindings and then declaring them all in that order. Using sets would be a good choice, but there's a small hitch: dictionaries aren't hashable and the "arguments" on queues and bindings are dictionaries. They could be turned into tuples since they aren't nested, though, and turned back into dictionaries when the object is declared.

The advantage here is that we don't slam the server with thousands of unnecessary exchange/queue declarations (and we don't wait on those declarations).
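
The de-duplication step described above might look like this sketch. The binding keys match the excerpt; freeze, thaw, and dedupe_declarations are hypothetical helper names, and the sketch assumes queue arguments are flat dictionaries with hashable values.

```python
def freeze(arguments):
    """Turn a flat arguments dict (or None) into a hashable tuple."""
    return tuple(sorted((arguments or {}).items()))


def thaw(frozen):
    """Turn the tuple back into a dict, or None when there were no arguments."""
    return dict(frozen) or None


def dedupe_declarations(bindings):
    """Collect the unique exchanges, queues, and bindings to declare."""
    exchanges, queues, binds = set(), set(), set()
    for binding in bindings:
        exchanges.add(binding["exchange"])
        queues.add((
            binding["queue_name"],
            binding.get("queue_auto_delete", False),
            freeze(binding.get("queue_arguments")),
        ))
        binds.add((
            binding["queue_name"], binding["exchange"], binding["routing_key"]
        ))
    return exchanges, queues, binds
```

The declaring loop would then iterate over the three sets in that order, calling thaw() on the frozen arguments just before each queue_declare.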

trigger tool

fedmsg has a tool called 'fedmsg-trigger'. It lets you run a command or script from a single command line whenever a specific message comes across the bus.

It would be cool if there was something like this for fedora-messaging too.
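
Until such a tool exists, a callback along these lines could approximate it. This is only a sketch: Trigger is a hypothetical class, it assumes messages expose a topic attribute, and it uses shell-style wildcards from fnmatch rather than AMQP routing-key syntax.

```python
import fnmatch
import subprocess


class Trigger:
    """Callable callback that runs a command when the topic matches."""

    def __init__(self, topic_pattern, command, run=subprocess.call):
        self.topic_pattern = topic_pattern
        self.command = command  # argument list, e.g. ["notify-send", "build"]
        self.run = run          # injectable so the command can be faked

    def __call__(self, message):
        if fnmatch.fnmatchcase(message.topic, self.topic_pattern):
            # Return the command's exit status to the caller.
            return self.run(self.command)
        return None
```

An instance such as Trigger("org.fedoraproject.prod.buildsys.*", ["my-script"]) could be passed wherever a consumer callback is accepted.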

"--amqp-url" parameter unused

In the CLI, the "--amqp-url" parameter is not used.
The user is not able to pass "amqp-url" to _session.ConsumerSession (except through the config file).
It is also impossible to pass "amqp-url" via api.publish to _session.PublisherSession (except through the config file).

Support passive declares

On a locked-down server like the one we have in the infrastructure, we must be able to use passive=True when declaring queues and exchanges.

TypeError with Twisted backend on halt

Testing out client authentication with the Twisted backend and fedmsg-migration-tools, I saw this with pika==0.12:

^C2018-08-21 14:19:22-0400 [-] Received SIGINT, shutting down.
2018-08-21 14:19:22-0400 [-] [verify_missing WARNING] Lost connection. Reason: Connection to the other side was lost in a non-clean fashion: Connection lost.
2018-08-21 14:19:22-0400 [FedoraMessaging:Factory] Lost connection. Reason: Connection to the other side was lost in a non-clean fashion: Connection lost.
2018-08-21 14:19:22-0400 [FedoraMessagingProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector object at 0x7f2b4f9279b0> will retry in 2 seconds
2018-08-21 14:19:22-0400 [-] Stopping factory <fedora_messaging.twisted.factory.FedoraMessagingFactory object at 0x7f2b4f9277b8>
2018-08-21 14:19:22-0400 [-] Main loop terminated.
2018-08-21 14:19:22-0400 [-] [verify_missing CRITICAL] Unhandled error in Deferred:
2018-08-21 14:19:22-0400 [-] Unhandled error in Deferred:
2018-08-21 14:19:22-0400 [-] [verify_missing CRITICAL] 
2018-08-21 14:19:22-0400 [-] Traceback (most recent call last):
2018-08-21 14:19:22-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/factory.py", line 138, in stopFactory
2018-08-21 14:19:22-0400 [-]     yield self.client.stopProducing()
2018-08-21 14:19:22-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 297, in stopProducing
2018-08-21 14:19:22-0400 [-]     yield self.pauseProducing()
2018-08-21 14:19:22-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 282, in pauseProducing
2018-08-21 14:19:22-0400 [-]     yield self._channel.basic_cancel(consumer_tag)
2018-08-21 14:19:22-0400 [-]   File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/pika/adapters/twisted_connection.py", line 169, in wrapped
2018-08-21 14:19:22-0400 [-]     method(*args, **kwargs)
2018-08-21 14:19:22-0400 [-] TypeError: basic_cancel() got multiple values for argument 'callback'
2018-08-21 14:19:22-0400 [-] Unhandled Error
        Traceback (most recent call last):
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1613, in unwindGenerator
            return _cancellableInlineCallbacks(gen)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1529, in _cancellableInlineCallbacks
            _inlineCallbacks(None, g, status)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
            result = g.send(result)
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 282, in pauseProducing
            yield self._channel.basic_cancel(consumer_tag)
        --- <exception caught here> ---
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/factory.py", line 138, in stopFactory
            yield self.client.stopProducing()
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 297, in stopProducing
            yield self.pauseProducing()
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 282, in pauseProducing
            yield self._channel.basic_cancel(consumer_tag)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/pika/adapters/twisted_connection.py", line 169, in wrapped
            method(*args, **kwargs)
        builtins.TypeError: basic_cancel() got multiple values for argument 'callback'

With pika=master (1.0.0b1):

^C2018-08-21 14:23:12-0400 [-] Received SIGINT, shutting down.
2018-08-21 14:23:12-0400 [-] [pika.adapters.twisted_connection ERROR] connection_lost: ConnectionLost('Connection lost',)                                                                            
2018-08-21 14:23:12-0400 [-] [verify_missing CRITICAL] 'Failed getting the next message in the queue, stopping.'                                                                                     
2018-08-21 14:23:12-0400 [FedoraMessaging:Protocol] 'Failed getting the next message in the queue, stopping.'                                                                                        
2018-08-21 14:23:12-0400 [-] [verify_missing CRITICAL] Unhandled Error
2018-08-21 14:23:12-0400 [-] Traceback (most recent call last):                                                                                                                                      
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1475, in gotResult                                                    
2018-08-21 14:23:12-0400 [-]     _inlineCallbacks(r, g, status)                                                                                                                                      
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks                                             
2018-08-21 14:23:12-0400 [-]     result = result.throwExceptionIntoGenerator(g)
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator                                   
2018-08-21 14:23:12-0400 [-]     return g.throw(self.type, self.value, self.tb)
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 148, in _read                                                                   
2018-08-21 14:23:12-0400 [-]     log.err(e, system=self.name)
2018-08-21 14:23:12-0400 [-] --- <exception caught here> ---                                                                                                                                         
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 132, in _read                                                                   
2018-08-21 14:23:12-0400 [-]     yield queue_object.get()                                                                                                                                            
2018-08-21 14:23:12-0400 [-] twisted.internet.error.ConnectionLost: Connection to the other side was lost in a non-clean fashion: Connection lost.                                                   
2018-08-21 14:23:12-0400 [-]
2018-08-21 14:23:12-0400 [FedoraMessaging:Protocol] Unhandled Error                                                                                                                                  
        Traceback (most recent call last):
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1475, in gotResult                                                                         
            _inlineCallbacks(r, g, status)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1416, in _inlineCallbacks                                                                  
            result = result.throwExceptionIntoGenerator(g)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/python/failure.py", line 491, in throwExceptionIntoGenerator                                                        
            return g.throw(self.type, self.value, self.tb)
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 148, in _read                                                                                        
            log.err(e, system=self.name)
        --- <exception caught here> ---                                                                                                                                                              
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 132, in _read                                                                                        
            yield queue_object.get()
        twisted.internet.error.ConnectionLost: Connection to the other side was lost in a non-clean fashion: Connection lost.
2018-08-21 14:23:12-0400 [FedoraMessaging:Factory] Lost connection. Reason: Connection to the other side was lost in a non-clean fashion: Connection lost.
2018-08-21 14:23:12-0400 [FedoraMessagingProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector object at 0x7fe45d75ae80> will retry in 2 seconds
2018-08-21 14:23:12-0400 [-] Stopping factory <fedora_messaging.twisted.factory.FedoraMessagingFactory object at 0x7fe45d75acc0>
2018-08-21 14:23:12-0400 [-] Main loop terminated.
2018-08-21 14:23:12-0400 [-] [verify_missing CRITICAL] Unhandled error in Deferred:
2018-08-21 14:23:12-0400 [-] Unhandled error in Deferred:
2018-08-21 14:23:12-0400 [-] [verify_missing CRITICAL] 
2018-08-21 14:23:12-0400 [-] Traceback (most recent call last):
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/factory.py", line 138, in stopFactory
2018-08-21 14:23:12-0400 [-]     yield self.client.stopProducing()
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 297, in stopProducing
2018-08-21 14:23:12-0400 [-]     yield self.pauseProducing()
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
2018-08-21 14:23:12-0400 [-]     result = g.send(result)
2018-08-21 14:23:12-0400 [-]   File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 281, in pauseProducing
2018-08-21 14:23:12-0400 [-]     for consumer_tag in self._channel.consumer_tags:
2018-08-21 14:23:12-0400 [-] AttributeError: 'TwistedChannel' object has no attribute 'consumer_tags'
2018-08-21 14:23:12-0400 [-] Unhandled Error
        Traceback (most recent call last):
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
            result = g.send(result)
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 297, in stopProducing
            yield self.pauseProducing()
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1613, in unwindGenerator
            return _cancellableInlineCallbacks(gen)
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1529, in _cancellableInlineCallbacks
            _inlineCallbacks(None, g, status)
        --- <exception caught here> ---
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/factory.py", line 138, in stopFactory
            yield self.client.stopProducing()
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 297, in stopProducing
            yield self.pauseProducing()
          File "/home/jcline/.virtualenvs/fm/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
            result = g.send(result)
          File "/home/jcline/devel/fedora-messaging/fedora_messaging/twisted/protocol.py", line 281, in pauseProducing
            for consumer_tag in self._channel.consumer_tags:
        builtins.AttributeError: 'TwistedChannel' object has no attribute 'consumer_tags'

@abompard my guess is the first traceback is something you fixed in your pika work? The second one looks like an API issue on our end, but I haven't looked at the code at all yet.

edit2: There was more traceback for the 1.0.0b1 case, it looks like we're not gracefully handling SIGINT (at least for fedmsg-migration-tools).

Porting from fedmsg to fedora-messaging is not smooth

I am trying to migrate our project from fedmsg to fedora-messaging.

Right now I'm stuck on the config file:

  • looking at the one provided by the fedora-messaging rpm, it feels like it's just a placeholder and not meant to be used in production
  • there are a ton of values prefixed with my_ while some seem to have real values
  • amqp_url is not filled out -- how do I figure out the prod and stage instances?
  • when doing api.consume by default, it connects to localhost -- not very useful

So, what are the best practices when it comes to configuration?

I did also try the fedora-messaging executable:

$ fedora-messaging consume --routing-key org.fedoraproject.prod.github.pull_request.*
Error: You must define all three of exchange, queue_name and routing_key, or none of them to use the configuration

Ehm, the --help says there are default values. Okay then:

$ fedora-messaging consume --routing-key org.fedoraproject.prod.github.pull_request.* --exchange amq.topic --queue-name asdqwe123123
Error: Failed to import the callback module (No module named 'fedora_messaging.examples')

:(

api.consume against fedora endpoint hits permissions issue

Can I get a very simple api.consume example that works with Fedora's endpoint? I can get the CLI to work with the default callback using the docs here, but I can't seem to get a simple api.consume() call working (not using twisted_consume) with fedora-messaging-1.6.0-1.fc30.noarch and python3-fedora-messaging-1.6.0-1.fc30.noarch.

Existing bindings are not removed

Existing bindings on the broker that have been removed from the configuration file are not deleted.
On the one hand it feels like they should be, to allow users to remove erroneous bindings without asking their broker admin. On the other hand, keeping them allows server-side declaration of bindings with just an empty [[bindings]] section in the configuration file, which avoids repeating oneself.

I still think we should remove them; the configuration already duplicates the server declarations with regard to queues anyway.

Allow users to provide a filesystem path to a Python file for callbacks

Currently, the callback must be in the PYTHONPATH and is specified by "package.module:function" in the configuration. This is convenient when you already have the function in the Python path and the callback is part of a bigger project.

However, for one-off scripts it's more convenient to provide a path to the file (some/python/file.py:function) than it is to make a Python package for the callback alone.

We should add support to the consume CLI to accept a filesystem path as well as a Python path. It'll need to differentiate the two types. If a relative path is given to a file in the current directory (e.g. "callback.py:my_function") we can't tell the difference between the PYTHONPATH variety and the filesystem path, so I think we should always default to first trying the PYTHONPATH, then falling back to interpreting it as a filesystem path, then failing.
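
The lookup order proposed above (PYTHONPATH first, then filesystem path, then failure) can be sketched with importlib. load_callback is a hypothetical function name, not the CLI's current loader, and "_callback_module" is an arbitrary module name chosen for the sketch.

```python
import importlib
import importlib.util
import os


def load_callback(spec):
    """Resolve 'package.module:function' or 'path/to/file.py:function'."""
    target, _, name = spec.rpartition(":")
    if not target or not name:
        raise ValueError("expected 'module:function' or 'file.py:function'")
    try:
        # First interpretation: an importable dotted module path.
        module = importlib.import_module(target)
    except (ImportError, TypeError, ValueError):
        # Second interpretation: a path to a Python file on disk.
        if not os.path.isfile(target):
            raise
        file_spec = importlib.util.spec_from_file_location(
            "_callback_module", target
        )
        module = importlib.util.module_from_spec(file_spec)
        file_spec.loader.exec_module(module)
    return getattr(module, name)
```

One caveat with this ordering: for a spec like "callback.py:my_function" in the current directory, the first attempt may import and execute callback.py as a module before falling back to the file path, so the file's top-level code can run twice.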

Attempt to get nonexistent pika.exceptions.ChannelClosed attributes

In _session.ConsumerSession._on_channel_close, we try to read:

if isinstance(reply_code_or_reason, pika_errs.ChannelClosed):
    reply_code = reply_code_or_reason.reply_code
    reply_text = reply_code_or_reason.reply_text

I think the reply_code and reply_text attributes do not exist on the ChannelClosed object.
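
A defensive way to read these values, regardless of whether the attributes exist, is sketched below. It assumes that when the attributes are missing, the code and text are carried as the exception's first two positional arguments; that assumption is unverified against any particular pika release, and closed_reason is a hypothetical helper.

```python
def closed_reason(exc):
    """Return (reply_code, reply_text) from a channel-closed exception."""
    code = getattr(exc, "reply_code", None)
    text = getattr(exc, "reply_text", None)
    if code is not None:
        return code, text
    if len(exc.args) >= 2:
        # Assumed fallback layout: (reply_code, reply_text) in args.
        return exc.args[0], exc.args[1]
    # Nothing structured is available; report the exception text only.
    return 0, str(exc)
```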

Add accessors to the Message class

Fedmsg has a lot of accessors available for consumers, let's discuss what we want to keep.

Here's what I found in fedmsg-meta, and my comments:

  • title: replaced by summary
  • subtitle: replaced by __str__
  • subjective: it hasn't really been used, let's drop it
  • long_form (paragraphs of text about a message): it would be useful for email notifications
  • lexer (pygments lexer that can be applied to the long_form): not sure we want that, it's only implemented for github and scm messages
  • link: that would be useful I think (maybe renamed as url? I don't find "link" very clear)
  • icon and secondary_icon: those are usually the app icon and the agent's avatar. They are used, so I think we should keep them, but rename them to app_icon and agent_avatar just to be clearer. (see fedmsg_meta_fedora_infrastructure.fasshim).
  • usernames (a set of FAS usernames associated with the message): this would be necessary for FMN
  • packages (a set of package names associated with a message): this would be necessary for FMN
  • objects: unclear, we should drop it and leave it to publishers to provide a precise accessor in their API.
  • emails: we can leave that to publishers and not make it standard
  • avatars (dict of avatar URLs associated with a message): unused, drop it
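
To make the proposal concrete, here is a stand-alone sketch of a base class carrying the accessors suggested for keeping, with a publisher-specific subclass overriding a few of them. Neither class is the library's Message; BuildMessage and the body keys it reads are invented for illustration.

```python
class Message:
    """Sketch of a base message exposing the proposed accessors."""

    def __init__(self, topic="", body=None):
        self.topic = topic
        self.body = body or {}

    @property
    def summary(self):
        return "Message on topic {}".format(self.topic)

    def __str__(self):
        return "{}: {}".format(self.topic, self.body)

    # Defaults that publishers override in their own schema classes.
    url = None
    app_icon = None
    agent_avatar = None

    @property
    def usernames(self):
        return set()

    @property
    def packages(self):
        return set()


class BuildMessage(Message):
    """Hypothetical publisher schema overriding the accessors."""

    @property
    def summary(self):
        return "{owner} built {package}".format(**self.body)

    @property
    def usernames(self):
        return {self.body["owner"]}

    @property
    def packages(self):
        return {self.body["package"]}
```

A notification service could then rely on usernames and packages without knowing anything about the body layout of a given topic.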

provide easy onramp for users of `fedmsg.tail_messages()`

I think one of the easiest (though probably inefficient) ways to consume fedmsg was to just run fedmsg.tail_messages().

#!/usr/bin/python3
import fedmsg

# Set fedmsg logging to not print warnings
import logging
logger = logging.getLogger('fedmsg')
logger.setLevel(logging.ERROR)

def main():

    # Grab messages from fedmsg and process them as we go
    print("Starting listening for fedmsgs..")
    for name, endpoint, topic, msg in fedmsg.tail_messages():
        print(topic)

if __name__ == '__main__':
    main()

With this simple read-only operation one can trigger on pretty much any fedmsg and react to changes in Fedora. I think it will ease the transition to fedora-messaging if we provide a very similar way for people to consume from it. Here is an example where another user is using tail_messages().

Right now it is recommended that people use the CLI to invoke fedora-messaging, using a command like fedora-messaging consume --callback=fedora_messaging.example:printer. This works, but it's a bit awkward to change a script that was previously called directly so that it is now imported and called by something else. For example, if I was previously telling people to call my script (foo.py) in order to run this service, now I have to tell them to copy the file into the correct location and call fedora-messaging consume --callback=foo.example:do-something.

It would be great if we could ease the transition by doing a few things:

  • make the shipped config connect to fedora servers by default

It would be nice if I didn't have to cp /etc/fedora-messaging/fedora.toml /etc/fedora-messaging/config.toml. I also didn't do the sed step from https://fedora-messaging.readthedocs.io/en/latest/fedora-broker.html#getting-connected, but I imagine that if it is valuable, automating it would mean that not everyone ends up using the 00000000-0000-0000-0000-000000000000 queue.

  • provide a nice alternative to fedmsg.tail_messages() to be used as an almost drop in replacement

I think I should be able to do this with something like this (note it does still use a callback so not quite the same, but close):

#!/usr/bin/python3
from fedora_messaging import config, api, exceptions
import logging

# Set local logging 
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def process_message(msg):
    print(msg.topic)

def main():
    config.conf.setup_logging()
    # Start consuming messages using our callback. This call will block until
    # a KeyboardInterrupt is raised, or the process receives a SIGINT or SIGTERM
    # signal.
    api.consume(process_message)

if __name__ == '__main__':
    main()

But that currently hits a permissions issue, it requires us to set up logging ourselves with config.conf.setup_logging(), and I think it's missing signal handling.

So maybe we can create a new convenience function in the api (basically the same thing as cli.consume()) that can be called from within Python, like api.consume() can be.
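
One possible shape for such a convenience is an adapter that turns any blocking, callback-based consume function into an iterator, so existing tail_messages() loops need only small changes. This is a sketch under assumptions: the consume argument is any callable that accepts a callback and blocks (like api.consume in the script above), and whether the real api.consume can safely run in a non-main thread has not been verified here.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def tail_messages(consume):
    """Yield messages delivered to a callback-based ``consume`` function."""
    messages = queue.Queue()

    def run():
        try:
            # The callback just enqueues; processing happens in the caller.
            consume(messages.put)
        finally:
            messages.put(_DONE)

    threading.Thread(target=run, daemon=True).start()
    while True:
        item = messages.get()
        if item is _DONE:
            return
        yield item
```

A loop such as "for msg in tail_messages(api.consume): print(msg.topic)" would then read much like the fedmsg version. Note that with this design a message counts as handled as soon as it is enqueued, not when the loop body finishes.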

Recursion error without certificate

Issue
When no SSL certificate is provided, the application throws an error after hitting the maximum recursion depth during SSL validation.

The [tls] section in config.toml looks like this:

[tls]
ca_cert = ""
keyfile = ""
certfile = ""

Log
the-new-hotness-23-lqmqr.log

Version
fedora-messaging-1.3.0-1.fc29.noarch

Provide a set of APIs for users outside Fedora infrastructure

#111 shows we don't have a good path forward for users who are running production consumers outside Fedora infrastructure. The reason for this is that allowing unauthenticated access to the AMQP broker is a recipe for DoS attacks and the broker is a core piece of infrastructure (especially if/when other apps start using it for tasking systems in addition to general pub/sub).

The best option at the moment is to just keep using fedmsg, relying on the fact that bridges exist between AMQP and ZMQ. While this works, it's less than ideal for several reasons:

  • No schema validation and no Python APIs to interact with messages

  • The interface you use to implement consumers depends on whether or not your service is inside Fedora infrastructure, and you have to change depending on where your service runs.

  • fedmsg doesn't separate message headers from the message body, so bridging involves mangling messages. This means consumers outside have a different format to deal with which won't be documented (although it will be the same as the fedmsg messages sent today).

It would probably be best to create some tools so that the consumers outside infra can use the same interfaces. We would need:

  • A documented wire format for ZMQ that can represent the AMQP messages. This would likely be a 3 part message in the format [<amqp topic>, <amqp headers>, <amqp body>] although we may want to include additional properties here.

  • Something to publish the messages via ZMQ. This would be nearly identical to the AMQP->ZMQ bridge we have today, but it would use the wire format from above rather than the one fedmsg uses. It should also use the ZMQ authenticated sockets (server side only) rather than message signatures.

  • An API that is identical to the current consume call for AMQP that connects to ZMQ instead. The ack/nack interface would be a no-op since it's using ZMQ pub/sub sockets, but ideally everything else is the same (message validation, creating the Python message objects, etc).

This way you can use the same consumer callback everywhere, Fedora infra doesn't have to expose the broker to the world, and consumers outside infra get the same level of reliability they've always had with fedmsg (no guarantees about message delivery).
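The three-part wire format proposed above could be sketched roughly as follows. This is only an illustration using the standard library: the function names, the JSON encoding of the headers and body frames, and the sample header key are assumptions, not an agreed format. Keeping the topic in the first frame matters because ZMQ SUB sockets filter on a prefix of the first frame.

```python
import json


def encode_frames(topic, headers, body):
    """Encode a message as three byte frames: [topic, headers, body].

    Hypothetical helper: headers and body are serialized as JSON here,
    which is an assumption about the eventual wire format.
    """
    return [
        topic.encode("utf-8"),
        json.dumps(headers, sort_keys=True).encode("utf-8"),
        json.dumps(body, sort_keys=True).encode("utf-8"),
    ]


def decode_frames(frames):
    """Decode the three frames back into (topic, headers, body)."""
    if len(frames) != 3:
        raise ValueError("expected 3 frames, got %d" % len(frames))
    topic, headers, body = frames
    return (
        topic.decode("utf-8"),
        json.loads(headers.decode("utf-8")),
        json.loads(body.decode("utf-8")),
    )


# Round-trip a sample message; the header key is made up for illustration.
frames = encode_frames(
    "org.fedoraproject.prod.example.topic",
    {"severity": 20},
    {"package": "foo"},
)
decoded = decode_frames(frames)
```

A consumer-side API could then rebuild the usual Python message object from the decoded tuple before handing it to the callback.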

@abompard, what do you think of this? @TomasTomecek, while the nitty-gritty details might not interest you, how does the overall proposal sound?

Bodhi's tests take a very long time with 1.6.1

Greetings!

I recently noticed that Bodhi's tests take a very long time to run with 1.6.1, but run quickly with 1.6.0. I noticed this because Bodhi's pip and Fedora 28 tests install the package from PyPI, but the F29 and F30 tests install the package from RPM (which still seems to have 1.6.0).

This is an example test that seems to take a long time:

https://github.com/fedora-infra/bodhi/blob/5c4cc813979fba7df33e95859d2a9b2a88d0010c/bodhi/tests/server/services/test_updates.py#L182-L202

With gdb I can see that it is epolling on a socket. I wonder if the mock in there isn't fully mocking something perhaps?

I haven't spent a lot of time digging into this, so it's possible that it is actually a problem on my end, but since it seems to only happen in 1.6.1 I thought it might be worth reporting here.

RPM packaging

There should be a spec file to package this on the currently supported Fedora versions, as well as CentOS 7 and 8.

Write integration tests for server-side connection closing

#78 made some adjustments that should have broken tests for pika-1.0.0 - the on_close callback passes an exception in 1.0.0 and a plain integer code in 0.12.

It'd be good to have integration tests that tested how clients handle the server shutting down connections. This is possible from the CLI by running rabbitmqctl close_connection <connectionpid> <reason>, but this needs to be done as a user who has access to the cluster. Alternatively, there may be a REST API call for this, so we could possibly do that with the default guest user. So, before we write the tests we need:

  • Documentation on how to set up a local RabbitMQ for testing. At the moment that just means installing it and starting it, but for this test you also need to give yourself access to use rabbitmqctl or enable and get access to the REST API. Figure out what the best approach is and document it.

Once we've written the tests, we need to make sure they:

  • Work locally as an unprivileged user

  • Work in the CI environment

The tests themselves should at least:

  • Start a publish session (by calling the publish API once), have the server cancel the connection, and then publish again (which should work).

  • Start a consumer session (by calling the consume API in a thread, see other integration tests), have the server cancel the connection with a 200 code, and then make sure the consumer restarts the connection correctly.

  • Start a consumer session, have the server cancel the connection with a non-200 error code, make sure the consumer exits with a non-zero exit code.

Add a configurable way to prefix the topic

For compatibility with fedmsg, it would be useful to be able to set a topic prefix in the configuration. Something like org.fedoraproject.prod (or staging).

The existing code does not deal with that by itself since the topic is assembled by fedmsg, and the running code does not know much about the environment it's running in, so the configuration seems like the right place.

The topic_prefix is added to incoming messages

The topic_prefix is added in the Message constructor; as a result, it is also added when messages are received from the broker. Since topic_prefix is only intended for outgoing messages, this should be fixed.
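One way to picture the fix is to apply the prefix at publish time rather than in the constructor. The sketch below uses a stand-in Message class and hypothetical publish/receive helpers, not the library's real code; it only shows that a message which goes out and comes back is prefixed once.

```python
class Message:
    """Stand-in message class: the constructor leaves the topic untouched."""

    def __init__(self, topic, body=None):
        self.topic = topic
        self.body = body or {}


def publish(message, topic_prefix, send):
    """Prefix the topic only on the way out (hypothetical helper).

    `send` is any callable taking (topic, body); it stands in for the broker.
    """
    if topic_prefix:
        routed_topic = "{}.{}".format(topic_prefix, message.topic)
    else:
        routed_topic = message.topic
    send(routed_topic, message.body)


def receive(topic, body):
    """Build a message from broker data without touching the topic."""
    return Message(topic, body)


# Simulate a round trip through a fake broker (a plain list).
sent = []
publish(
    Message("bodhi.update.comment", {"id": 1}),
    "org.fedoraproject.prod",
    lambda topic, body: sent.append((topic, body)),
)
incoming = receive(*sent[0])
```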

Crash using Twisted and pika == 0.12

With pika == 0.12 I get the following:

[verify_missing CRITICAL] Unhandled error in Deferred:
[verify_missing CRITICAL] 
Traceback (most recent call last):
  File "/home/abompard/.virtualenvs/fedora-messaging/lib/python3.6/site-packages/twisted/internet/defer.py", line 1418, in _inlineCallbacks
    result = g.send(result)
  File "/home/abompard/Fedora/fedmsg/fedora-messaging/fedora_messaging/twisted/protocol.py", line 119, in _allocate_channel
    defer.returnValue(channel)
  File "/home/abompard/.virtualenvs/fedora-messaging/lib/python3.6/site-packages/twisted/internet/defer.py", line 1362, in returnValue
    raise _DefGen_Return(val)
twisted.internet.defer._DefGen_Return: <pika.adapters.twisted_connection.TwistedChannel object at 0x7efc714a42b0>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/abompard/Fedora/fedmsg/fedora-messaging/fedora_messaging/twisted/factory.py", line 110, in _on_client_ready
    yield self.client.bind_queues(self.bindings)
  File "/home/abompard/Fedora/fedmsg/fedora-messaging/fedora_messaging/twisted/protocol.py", line 492, in bind_queues
    yield channel.queue_bind(queue, exchange, **b)
  File "/home/abompard/.virtualenvs/fedora-messaging/lib/python3.6/site-packages/pika-0.12.0-py3.6.egg/pika/adapters/twisted_connection.py", line 169, in wrapped
    method(*args, **kwargs)
TypeError: queue_bind() got multiple values for argument 'callback'

This is caused by pika changing the order of arguments in this method between 0.12 and 1.0.
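The failure mode can be reproduced without pika. In the sketch below, queue_bind_old and queue_bind_new are stand-in functions whose parameter order is an assumption modelled on the traceback (callback first versus callback last), and wrapped mimics an adapter that always injects its own callback keyword. Passing queue and exchange by keyword works with either ordering.

```python
def queue_bind_old(callback, queue, exchange, routing_key=None):
    """Stand-in for a pre-1.0 style signature (callback comes first)."""
    return (queue, exchange, routing_key)


def queue_bind_new(queue, exchange, routing_key=None, callback=None):
    """Stand-in for a 1.0 style signature (callback comes last)."""
    return (queue, exchange, routing_key)


def wrapped(method, *args, **kwargs):
    """Mimic an adapter that injects its own callback as a keyword."""
    return method(*args, callback=lambda *a: None, **kwargs)


def try_bind(method, *args, **kwargs):
    """Return the bind result, or the TypeError if the call fails."""
    try:
        return wrapped(method, *args, **kwargs)
    except TypeError as exc:
        return exc


# Positional arguments collide with the injected callback on the old order.
positional_old = try_bind(queue_bind_old, "my_queue", "amq.topic")
# Keyword arguments are unambiguous with both orderings.
keyword_old = try_bind(queue_bind_old, queue="my_queue", exchange="amq.topic")
keyword_new = try_bind(queue_bind_new, queue="my_queue", exchange="amq.topic")
```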

Add the metadata for FMN in the headers

The new FMN will leverage RabbitMQ's header exchange feature to route relevant messages into user-specific queues, from which they would then be delivered by IRC or email.

To use the header exchange, we need the following data in the headers:

  • RPM package name that is affected by the action
  • FAS username(s) of the user(s) who is/are affected by the action

We could add the username of whoever did the action, but I'm not sure that's really useful. Why would you want to be notified of actions you take, after all? It could be useful for mentoring, though.

The header exchange in RabbitMQ only does string matching, and there can be multiple users affected by a message. A header won't be matched if it's a list, and there can't be multiple headers with the same key.
It does not appear that we'll be able to entirely rely on RabbitMQ's header routing, and avoid a custom daemon. Am I missing something?

In any case, the base Message class must have 2 accessors, package and affected_usernames (names to be debated) that would return None and an empty list by default, and that publishers can override.
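A minimal sketch of those two accessors, using a stand-in base class rather than the real one. The PackageUpdated subclass, its body keys, and the routing_headers helper are all hypothetical; emitting one header per user is only one possible workaround for the string-matching limitation, not a settled design.

```python
class Message:
    """Stand-in base class with the two proposed accessors."""

    def __init__(self, body=None):
        self.body = body or {}

    @property
    def package(self):
        # Default: the message is not about any particular package.
        return None

    @property
    def affected_usernames(self):
        # Default: nobody is affected.
        return []


class PackageUpdated(Message):
    """Hypothetical publisher-defined message overriding both accessors."""

    @property
    def package(self):
        return self.body.get("package")

    @property
    def affected_usernames(self):
        return sorted(self.body.get("owners", []))


def routing_headers(message):
    """Build string-only headers (assumed scheme: one header per user)."""
    headers = {}
    if message.package is not None:
        headers["package"] = message.package
    for name in message.affected_usernames:
        headers["user-" + name] = "true"
    return headers


update = PackageUpdated({"package": "bodhi", "owners": ["bob", "alice"]})
headers = routing_headers(update)
```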

Pika 0.13

Pika 0.13 is out with the pre-1.0 API, and the version checks in the code only test for 0.12.
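A more tolerant check would compare against the 1.0 boundary instead of matching 0.12 exactly. The helper names below are hypothetical; the version string would presumably come from pika.__version__, but this sketch only works on plain strings.

```python
import re


def major_minor(version):
    """Extract (major, minor) from a version string such as '0.13.0'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError("unrecognized version: %r" % (version,))
    return int(match.group(1)), int(match.group(2))


def uses_pre_1_0_api(version):
    """True for any release before 1.0, not just 0.12."""
    return major_minor(version) < (1, 0)


checks = {v: uses_pre_1_0_api(v) for v in ("0.12.0", "0.13.0", "1.0.0")}
```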

Gracefully handle long-running consumer tasks

At the moment, our consumer API does not handle callbacks that take significantly longer than the AMQP heartbeat interval.

To reproduce this, use a vanilla RabbitMQ broker on Fedora with fedora-messaging (the heartbeat is 60s) and run this callback with fedora-messaging consume:

import time

def bad_consumer(message):
    """ A consumer that misbehaves"""
    time.sleep(300)
    print(message)

This will exit with:

[pika.adapters.base_connection ERROR] Socket Error: 104
[fedora_messaging._session INFO] Channel <Channel number=1 CLOSED conn=<SelectConnection CLOSED socket=None params=<URLParameters host=localhost port=5672 virtual_host=/ ssl=False>>> closed (-1): ConnectionResetError(104, 'Connection reset by peer')
[fedora_messaging._session WARNING] Connection to localhost closed unexpectedly (-1): ConnectionResetError(104, 'Connection reset by peer')

The problem is this:

  1. Long-running consumer callback blocks the pika event loop, so no heartbeating can occur.

  2. Connection times out and is killed by the broker.

  3. The message(s) the consumer was processing are re-queued by the broker as they were not acknowledged by the consumer (it acknowledges the messages once the consumer returns)

  4. The consumer does not gracefully restart the connection, but that's a minor problem because...

  5. When the consumer reconnects, it gets the message it was processing again.

Now, it's less than ideal to have consumer callbacks block for ages, but I suspect there are many such consumers in Fedora (the masher I think? @bowlofeggs would know) so I think we need to handle this better.

The basic problem is we need to heartbeat at the same time the message callback is running. That means either a) consumer callbacks need to do asynchronous IO (and have scheduling points if they're doing long CPU-intensive tasks) or b) the consumer callback runs in a separate thread.

The most appealing way forward for me is to back the consumer API with the Twisted client we already have, but call the consumer callback with the twisted.internet.threads.deferToThread API to run the message in a thread. This lets the Twisted event loop heartbeat while it waits on the thread to finish, and the callback can use synchronous APIs all it wants.

We need to be careful since consumer callbacks have not historically needed to be thread-safe, so handling one message at a time seems safest. This means using whatever the twisted equivalent of https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock is.
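The idea can be illustrated with asyncio from the standard library instead of Twisted, so this is an analogue of the proposal rather than the proposed implementation (in Twisted, DeferredLock would likely play the role of asyncio.Lock). A blocking callback runs in a worker thread, a lock ensures only one message is handled at a time, and a stand-in heartbeat task keeps ticking in the event loop meanwhile. All names and timings are illustrative.

```python
import asyncio
import time


def slow_callback(message):
    """A blocking, synchronous consumer callback (short sleep for the demo)."""
    time.sleep(0.2)
    return message.upper()


async def heartbeat(ticks, interval=0.05):
    """Stand-in for AMQP heartbeating: record a tick on every interval."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def consume(messages, callback):
    loop = asyncio.get_running_loop()
    lock = asyncio.Lock()  # serialize callbacks: one message at a time
    ticks, results = [], []

    async def handle(message):
        async with lock:
            # Run the blocking callback in a thread so the loop stays free.
            result = await loop.run_in_executor(None, callback, message)
            results.append(result)

    beat = asyncio.ensure_future(heartbeat(ticks))
    try:
        await asyncio.gather(*(handle(m) for m in messages))
    finally:
        beat.cancel()
    return results, len(ticks)


results, beats = asyncio.run(consume(["a", "b"], slow_callback))
```

If the callback were called directly in the event loop instead, no heartbeat ticks would be recorded while it sleeps, which is the situation described in the numbered list above.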

Another consideration is to make sure we still handle OS signals gracefully.

There is one last tricky issue here, which is that Twisted's reactor (AFAIK) can only be started once, which means we're going to have problems if callbacks raise a HaltException and then the caller tries to call fedora_messaging.api.consume later. Perhaps https://github.com/itamarst/crochet is the best way to handle that.

Generate a JSON schema for me

Greetings!

I've been working on writing JSON schemas for Bodhi's messages the past few days. It's pretty rad, but I had the thought that it might be a nice convenience if fedora-messaging had the ability to generate these for me based on Python classes. I don't know exactly how I would recommend making it work, but I think it could be cool if we require Python 3 for this feature so we can use some cool Python 3 things to help out:

  1. Type annotations! This way we can know the types of the items to fill in the schema in a "Pythonic" way.
  2. We could use typing.NamedTuple - this is like a really fancy Enum that supports storing data with type annotations and behaviors.

Anyways, I'm quite fuzzy on the details, but I imagine some code that can introspect a class and spit out a JSON schema, and all fedora-messaging API users have to do is write nice Python classes and not worry about how JSON schema works.

Note: We would need some way to differentiate between required fields and non-required fields to fill out the required section on objects in the JSON schema. Perhaps just a _required = ['a', 'b', 'c'] on the class could do that for us?
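A rough sketch of what such introspection could look like, using typing.get_type_hints and the _required convention suggested above. The schema_for function, the type mapping, and the UpdateComment class are hypothetical, and only a handful of primitive types are handled.

```python
import typing

# Assumed mapping from Python annotations to JSON schema type names.
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    list: "array",
    dict: "object",
}


def schema_for(cls):
    """Build a JSON schema dict from a class's type annotations."""
    properties = {}
    for name, annotation in typing.get_type_hints(cls).items():
        if annotation not in _JSON_TYPES:
            raise TypeError("unsupported annotation for %s: %r" % (name, annotation))
        properties[name] = {"type": _JSON_TYPES[annotation]}
    return {
        "type": "object",
        "properties": properties,
        "required": list(getattr(cls, "_required", [])),
    }


class UpdateComment:
    """Hypothetical message body description."""

    _required = ["update", "author"]

    update: str
    author: str
    karma: int


schema = schema_for(UpdateComment)
```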

Two different values for severity in one Message object

It is possible to set two different values for severity on a single object of class Message.
In the Message constructor, severity can be set via:

  • the constructor argument "severity"
  • the constructor argument "properties" (properties->headers->severity)

Which of them should be the source of truth?

Expand the test matrix

At the moment tox is set up to run unit tests for Python 2.7, 3.4, 3.5, 3.6, and 3.7. Integration tests are run against 2.7 and 3.6. It'd be nice to run them with all versions of Python, and add some pika versions into the mix as well. It'll increase the run time of Travis, but running the tests locally is very fast and super easy, so I think that's okay.

The matrix would be Python Versions X Unit tests and Integration tests X Pika versions.

  • Add integration testing for 3.4, 3.5, and 3.7

  • Run unit tests for all Python versions against pika 0.12 and the current master branch

  • Run integration tests for all Python versions against pika 0.12 and the current master branch
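To get a feel for the size of that matrix, the combinations can be enumerated with itertools.product. The environment naming scheme below is made up for illustration and does not reflect the project's actual tox configuration.

```python
from itertools import product

python_versions = ["2.7", "3.4", "3.5", "3.6", "3.7"]
suites = ["unit", "integration"]
pika_versions = ["0.12", "master"]

# Every (Python version, test suite, pika version) combination.
matrix = list(product(python_versions, suites, pika_versions))

# Hypothetical environment names, e.g. "py27-unit-pika0.12".
env_names = [
    "py{}-{}-pika{}".format(py.replace(".", ""), suite, pika)
    for py, suite, pika in matrix
]
```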

pytoml is deprecated

fedora-messaging uses pytoml to parse the configuration file, but that project is deprecated and no longer maintained (see its readme). It should be replaced by the toml library.
