missive's People

Contributors

calpaterson

missive's Issues

Double acking - allowed or not?

Description

In the RabbitMQAdapter, double acking currently raises a Kombu-level exception

Steps to reproduce

  1. Ack a message once
  2. Ack the same message again

Expected result

Not sure? Warning? Missive-level exception? Nothing?
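
To make the options above concrete, here is a minimal sketch of a guard that tracks ack state and applies one of three policies on a repeated ack: ignore it, log a warning, or raise a Missive-level exception. Every name here (`AckGuard`, `DoubleAckError`, `FakeBrokerMessage`, the `on_double_ack` parameter) is hypothetical and not part of missive or Kombu; the fake broker message only imitates the "already acknowledged" failure.

```python
import logging

logger = logging.getLogger(__name__)


class DoubleAckError(Exception):
    """Hypothetical Missive-level exception for a repeated ack."""


class FakeBrokerMessage:
    """Stand-in for a broker message that rejects a second ack."""

    def __init__(self):
        self.acked = False

    def ack(self):
        if self.acked:
            raise RuntimeError("already acknowledged")
        self.acked = True


class AckGuard:
    """Ensures the underlying broker message is acked at most once."""

    def __init__(self, broker_message, on_double_ack="warn"):
        if on_double_ack not in ("ignore", "warn", "raise"):
            raise ValueError("unknown policy: %r" % (on_double_ack,))
        self._broker_message = broker_message
        self._on_double_ack = on_double_ack
        self._acked = False

    def ack(self):
        """Return True if this call acked the message, False if it was a repeat."""
        if not self._acked:
            self._broker_message.ack()
            self._acked = True
            return True
        if self._on_double_ack == "raise":
            raise DoubleAckError("message was already acked")
        if self._on_double_ack == "warn":
            logger.warning("ignoring repeated ack")
        return False
```

With the "ignore" or "warn" policy, a handler that acks explicitly and a framework that acks again afterwards would no longer reach the broker twice; with "raise", the caller gets an exception it can catch by a Missive-specific type.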

Actual result

Kombu-level exception (kombu.exceptions.MessageStateError)

Additional details

Got this monstrosity while testing qu:

2020-10-19 10:48:22,986 ERROR    quarchive.crawler              - indexing error
Traceback (most recent call last):
  File "/home/cal/src/missive/missive/missive.py", line 188, in handle
    sole_matching_handler(message, self)
  File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 96, in on_crawl_requested
    ctx.ack(message)
  File "/home/cal/src/missive/missive/missive.py", line 136, in ack
    self.adapter.ack(message)
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
    self._current_kombu_message.ack()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
    self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 91, in run
    conn.drain_events(timeout=drain_timeout)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/connection.py", line 318, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/transport/pyamqp.py", line 101, in drain_events
    return connection.drain_events(**kwargs)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 514, in drain_events
    while not self.blocking_read(timeout):
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 520, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 527, in on_inbound_method
    method_sig, payload, content,
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/channel.py", line 1613, in _on_basic_deliver
    fun(msg)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 620, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in receive
    [callback(body, message) for callback in callbacks]
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in <listcomp>
    [callback(body, message) for callback in callbacks]
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 79, in callback
    ctx.handle(message)
  File "/home/cal/src/missive/missive/missive.py", line 200, in handle
    self.ack(message)
  File "/home/cal/src/missive/missive/missive.py", line 136, in ack
    self.adapter.ack(message)
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
    self._current_kombu_message.ack()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
    self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/cal/src/quarchive/src/server/quarchive/crawler.py", line 148, in add_to_fulltext_index
    ensure_fulltext(session, crawl_uuid)
  File "/home/cal/src/quarchive/src/server/quarchive/crawler.py", line 171, in ensure_fulltext
    crawl_metadata = get_crawl_metadata(session, crawl_uuid)
  File "/home/cal/src/quarchive/src/server/quarchive/data/functions.py", line 571, in get_crawl_metadata
    .filter(CrawlResponse.crawl_uuid == crawl_uuid)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3325, in one
    ret = self.one_or_none()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3294, in one_or_none
    ret = list(self)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3367, in __iter__
    return self._execute_and_instances(context)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3389, in _execute_and_instances
    querycontext, self._connection_from_session, close_with_result=True
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3404, in _get_bind_args
    mapper=self._bind_mapper(), clause=querycontext.statement, **kw
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 3382, in _connection_from_session
    conn = self.session.connection(**kw)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1133, in connection
    execution_options=execution_options,
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1139, in _connection_for_bind
    engine, execution_options
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 408, in _connection_for_bind
    self._assert_active()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 295, in _assert_active
    code="7s2a",
sqlalchemy.exc.InvalidRequestError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "full_text_pkey"
DETAIL:  Key (url_uuid)=(57425b9c-9562-506b-b588-0f25bbdbaddc) already exists.

[SQL: INSERT INTO full_text (url_uuid, crawl_uuid, inserted, full_text, tsvector) VALUES (%(url_uuid)s, %(crawl_uuid)s, %(inserted)s, %(full_text)s, to_tsvector(%(to_tsvector_1)s))]
[parameters: {'url_uuid': UUID('57425b9c-9562-506b-b588-0f25bbdbaddc'), 'crawl_uuid': UUID('9e53b2e8-dc84-4e32-b4db-e6cfe6901a40'), 'inserted': datetime.datetime(2020, 10, 19, 9, 48, 22, 925691, tzinfo=datetime.timezone.utc), 'full_text': 'One of the easiest, least expensive ways to make great coffee. How To Make French Press CoffeeOpenOpen the Pocket mobile menuPocketDiscoverMy ListLog ... (35896 characters truncated) ... 0507d7b56b2dcc535be590c73","assetPrefix":"https://assets.getpocket.com/web-discover","isFallback":false,"customServer":true,"gip":true,"appGip":true}', 'to_tsvector_1': 'One of the easiest, least expensive ways to make great coffee. How To Make French Press CoffeeOpenOpen the Pocket mobile menuPocketDiscoverMy ListLog ... (35896 characters truncated) ... 0507d7b56b2dcc535be590c73","assetPrefix":"https://assets.getpocket.com/web-discover","isFallback":false,"customServer":true,"gip":true,"appGip":true}'}]
(Background on this error at: http://sqlalche.me/e/gkpj) (Background on this error at: http://sqlalche.me/e/7s2a)
Traceback (most recent call last):
  File "/home/cal/src/missive/missive/missive.py", line 188, in handle
    sole_matching_handler(message, self)
  File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 96, in on_crawl_requested
    ctx.ack(message)
  File "/home/cal/src/missive/missive/missive.py", line 136, in ack
    self.adapter.ack(message)
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
    self._current_kombu_message.ack()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
    self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/cal/.virtualenvs/quarchive/bin/quarchive-bg-worker", line 11, in <module>
    load_entry_point('quarchive', 'console_scripts', 'quarchive-bg-worker')()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/home/cal/src/quarchive/src/server/quarchive/bg_worker.py", line 121, in bg_worker
    adapted_processor.run()
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 91, in run
    conn.drain_events(timeout=drain_timeout)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/connection.py", line 318, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/transport/pyamqp.py", line 101, in drain_events
    return connection.drain_events(**kwargs)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 514, in drain_events
    while not self.blocking_read(timeout):
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 520, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/method_framing.py", line 77, in on_frame
    callback(channel, msg.frame_method, msg.frame_args, msg)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/connection.py", line 527, in on_inbound_method
    method_sig, payload, content,
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/amqp-5.0.1-py3.7.egg/amqp/channel.py", line 1613, in _on_basic_deliver
    fun(msg)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 620, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in receive
    [callback(body, message) for callback in callbacks]
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/messaging.py", line 586, in <listcomp>
    [callback(body, message) for callback in callbacks]
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 79, in callback
    ctx.handle(message)
  File "/home/cal/src/missive/missive/missive.py", line 200, in handle
    self.ack(message)
  File "/home/cal/src/missive/missive/missive.py", line 136, in ack
    self.adapter.ack(message)
  File "/home/cal/src/missive/missive/adapters/rabbitmq.py", line 37, in ack
    self._current_kombu_message.ack()
  File "/home/cal/.virtualenvs/quarchive/lib/python3.7/site-packages/kombu-5.0.2-py3.7.egg/kombu/message.py", line 122, in ack
    self))
kombu.exceptions.MessageStateError: Message already acknowledged with state: ACK

Rejig contexts to allow for processor and handle scoped objects

We want a way for people to scope certain objects to:

  • the processor's lifecycle (example: database connection pools)
  • the message's lifecycle (example: an individual database connection)

In order to do this:

  • Rename the current HandlingContext to ProcessingContext to indicate that it's per-processor
  • Create a new HandlingContext for each message
  • Create new decorator functions for them: @proc.before_process(proc_ctx), @proc.after_process(proc_ctx), @proc.before_handle(proc_ctx, handle_ctx) and @proc.after_handle(proc_ctx, handle_ctx)
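
A rough sketch of how the proposal above might fit together, assuming the arguments shown in the decorator names are the arguments each hook receives. The `Processor` class, its `handler` decorator, and the `state` dictionaries are illustrative stand-ins, not missive's actual API.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ProcessingContext:
    """Lives as long as the processor (for example, holds a connection pool)."""
    state: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HandlingContext:
    """Lives for a single message (for example, holds one connection)."""
    message: Any
    state: Dict[str, Any] = field(default_factory=dict)


class Processor:
    """Hypothetical processor that runs lifecycle hooks around handling."""

    def __init__(self):
        self._hooks = {"before_process": [], "after_process": [],
                       "before_handle": [], "after_handle": []}
        self._handler = None

    def _register(self, name, fn):
        self._hooks[name].append(fn)
        return fn

    def before_process(self, fn):
        return self._register("before_process", fn)

    def after_process(self, fn):
        return self._register("after_process", fn)

    def before_handle(self, fn):
        return self._register("before_handle", fn)

    def after_handle(self, fn):
        return self._register("after_handle", fn)

    def handler(self, fn):
        self._handler = fn
        return fn

    def run(self, messages):
        proc_ctx = ProcessingContext()
        for hook in self._hooks["before_process"]:
            hook(proc_ctx)
        try:
            for message in messages:
                # A fresh HandlingContext per message keeps state from leaking.
                handle_ctx = HandlingContext(message)
                for hook in self._hooks["before_handle"]:
                    hook(proc_ctx, handle_ctx)
                try:
                    self._handler(message, handle_ctx)
                finally:
                    for hook in self._hooks["after_handle"]:
                        hook(proc_ctx, handle_ctx)
        finally:
            for hook in self._hooks["after_process"]:
                hook(proc_ctx)


# Usage: a fake "pool" scoped to the processor, a "connection" scoped to a message.
proc = Processor()
events = []


@proc.before_process
def open_pool(proc_ctx):
    proc_ctx.state["pool"] = ["conn-1"]
    events.append("pool opened")


@proc.before_handle
def check_out_connection(proc_ctx, handle_ctx):
    handle_ctx.state["conn"] = proc_ctx.state["pool"][0]


@proc.handler
def handle(message, handle_ctx):
    events.append((message, handle_ctx.state["conn"]))


@proc.after_process
def close_pool(proc_ctx):
    events.append("pool closed")


proc.run(["a", "b"])
```

The `try`/`finally` blocks are one possible design choice: they make the after-hooks run even when a handler raises, which is what resource cleanup usually needs.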

Buggy DLQs cause chaos

Description

If a DLQ raises an exception, the exception is not handled and the whole processor crashes.

Steps to reproduce

  1. Receive a message
  2. Raise exception in a handler, with a DLQ set
  3. Message sent to DLQ
  4. DLQ also raises exception

Expected result

Not sure.

Option A: Message acked, problem logged at error level but processor continues

Option B: Processor exits in an orderly fashion, with particular status code

Tricky choice to make
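
The two options can be compared side by side in a small sketch. It assumes the handler, DLQ and ack are plain callables; `process`, `run`, `DLQFailure`, the `on_dlq_failure` parameter and the exit code value are all hypothetical and do not come from missive.

```python
import logging

logger = logging.getLogger(__name__)

# Hypothetical status code reserved for "the DLQ itself failed".
DLQ_FAILURE_EXIT_CODE = 70


class DLQFailure(Exception):
    """Raised when dead-lettering a message fails (Option B)."""


def process(message, handler, dlq, ack, on_dlq_failure="continue"):
    try:
        handler(message)
    except Exception:
        logger.exception("handler failed, sending message to DLQ")
        try:
            dlq(message)
        except Exception as exc:
            if on_dlq_failure == "exit":
                # Option B: leave the message unacked and ask the loop to stop.
                raise DLQFailure("DLQ raised while handling a failure") from exc
            # Option A: log at error level, then fall through to the ack.
            logger.error("DLQ failed, acking and continuing", exc_info=True)
    ack(message)


def run(messages, handler, dlq, ack, on_dlq_failure="continue"):
    """Return the status code the process would exit with."""
    for message in messages:
        try:
            process(message, handler, dlq, ack, on_dlq_failure)
        except DLQFailure:
            logger.critical("DLQ failed, shutting down in an orderly fashion")
            return DLQ_FAILURE_EXIT_CODE
    return 0
```

Under Option A the message is lost once it is acked, so the error log is the only record of it; under Option B the message stays unacked and the broker can redeliver it after a restart, at the cost of stopping the processor.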

Actual result

Disorderly exit

Additional details

RabbitMQAdapter does not stop under load

Description

drain_events doesn't time out when there is a constant stream of messages, so the way that we check for shutdown needs to be changed.

Steps to reproduce

  1. Start a RabbitMQ-adapted proc
  2. Consume a long stream of continuous messages
  3. Start hitting C-c partway through

Expected result

Missive proc exits gracefully at some point

Actual result

Missive proc soldiers wrongly on

Additional details

Probably best to check for shutdown at the end (not the start!) of each message's handling, then clean up, while allowing any message already being handled to finish.
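
A minimal sketch of that idea, independent of Kombu: a signal handler only sets a flag, and the loop checks the flag after each message has been fully handled. `GracefulLoop` and its methods are hypothetical names, and a plain iterable stands in for the stream of messages.

```python
import signal
import threading


class GracefulLoop:
    """Consumes messages until shutdown is requested, never mid-message."""

    def __init__(self):
        self._shutdown = threading.Event()

    def request_shutdown(self, signum=None, frame=None):
        # Safe to call from a signal handler: it only sets a flag.
        self._shutdown.set()

    def install_signal_handlers(self):
        # Must be called from the main thread.
        signal.signal(signal.SIGINT, self.request_shutdown)
        signal.signal(signal.SIGTERM, self.request_shutdown)

    def run(self, messages, handle):
        """Handle messages in order and return how many were handled."""
        handled = 0
        for message in messages:
            handle(message)
            handled += 1
            # Check at the end of handling, so the current message always
            # finishes and a busy stream cannot starve the shutdown check.
            if self._shutdown.is_set():
                break
        return handled
```

Because the check does not depend on a read timeout expiring, it still fires when messages arrive back to back. In an adapter built on callbacks, the equivalent would presumably be to test the flag at the end of each callback.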
