php-enqueue / enqueue-dev

Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro

Home Page: https://enqueue.forma-pro.com/

License: MIT License

PHP 99.40% Shell 0.36% Dockerfile 0.10% Twig 0.13%
message-queue message-bus rabbitmq amqp stomp job-queue amazon-sqs redis doctrine-dbal rabbitmq-client

enqueue-dev's Introduction

Supporting Enqueue

Enqueue is an MIT-licensed open source project whose ongoing development is made possible entirely by the support of the community and our customers. If you'd like to join them, please consider:


Introduction

Enqueue is a production-ready, battle-tested messaging solution for PHP. It provides a common way for programs to create, send, and read messages.

This is the main development repository. It provides a friendly environment for productive development and testing of all Enqueue-related features and packages.

Features:

Resources

Developed by Forma-Pro

Forma-Pro is a full-stack development company whose interests also extend to open source development. As a team of strong professionals, we aim to help the community by developing cutting-edge solutions in the areas of e-commerce, Docker, and microservice-oriented architecture, where we have accumulated many years of experience. Our main specialization is Symfony-based solutions, but we are always looking at technologies that allow us to do our job in the best way. We are committed to creating solutions that revolutionize how things are developed in terms of architecture and scalability.

If you have any questions or inquiries about our open source development, this product in particular, or any other matter, feel free to contact us at [email protected]

License

It is released under the MIT License.

enqueue-dev's People

Contributors

andrewmy, angelsk, arjanvdbos, askozienko, atrauzzi, balabis, beryllium, dgafka, dheineman, elazar, fibula, greblov, jdecool, m3m0r7, makasim, mordilion, msheakoski, nick-zh, oskarstark, qkdreyer, ramunasd, rosamarsky, samnela, snapshotpl, steveb-p, timesplinter, turboboy88, uro, versh23, vincentlanglet

enqueue-dev's Issues

[Extensions] extensions priority

I'm doing #69 in my app (will send a PR later) and ran into a problem.

My extension catches the Symfony user token and stores it properly in the processor's token storage. But to be able to use it, I must merge the user with the Doctrine entity manager (the user must be managed to be usable with Blameable or similar). All of this works.

The problem was: after I merged the user, DoctrineClearIdentityMapExtension came in and cleared the map, so my onPreReceived() ended up being useless. I've worked around this by disabling it and doing the same thing from my onPostReceived, but the problem still remains: extensions should be able to specify their priority.

Also, shouldn't DoctrineClearIdentityMapExtension clear the map from onPostReceived, not onPreReceived?
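
To make the request concrete, here is a minimal sketch of what priority-ordered extensions could look like. The class name PrioritizedExtensions, its methods, and the priority values are all hypothetical and are not part of Enqueue's API; the callbacks only record the order in which they run.

```php
<?php

declare(strict_types=1);

// Hypothetical sketch: none of these names exist in Enqueue.
final class PrioritizedExtensions
{
    /** @var array<int, list<callable>> callbacks grouped by priority */
    private array $byPriority = [];

    public function add(callable $onPreReceived, int $priority = 0): void
    {
        $this->byPriority[$priority][] = $onPreReceived;
    }

    /**
     * Runs the callbacks from highest to lowest priority.
     * Callbacks sharing a priority keep their registration order.
     */
    public function onPreReceived(array $context): array
    {
        $groups = $this->byPriority;
        krsort($groups); // highest priority first

        foreach ($groups as $callbacks) {
            foreach ($callbacks as $callback) {
                $context = $callback($context);
            }
        }

        return $context;
    }
}

$chain = new PrioritizedExtensions();

// Registered first, but runs last because of its lower priority.
$chain->add(function (array $log): array {
    $log[] = 'merge-user';
    return $log;
}, -10);

// Registered second, but runs first because of its higher priority.
$chain->add(function (array $log): array {
    $log[] = 'clear-identity-map';
    return $log;
}, 10);

$order = $chain->onPreReceived([]);
```

With an ordering like this, the identity map would be cleared before the user is merged, regardless of registration order.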

[client] Command, Event segregation.

The rules:

  1. The client message is removed in favor of an event and command objects.
  2. The event object could be sent to messages bus or fan out inside an application. It could be configured using a scope.
  3. The command is sent inside the app to a given processor. Anyone else can get that message.

Implementations:

  1. New methods approach:
<?php

$producer->sendEvent($topic, $message);

$producer->sendCommand($processor, $message);
  2. New objects approach:
<?php

$producer->send(new Event($topic, $body));

$producer->send(new Command($processor, $body));

[RabbitMQ] support for wildcard topics ("topic exchange")

RabbitMQ supports specifying an exchange supporting wildcard topics, named a topic exchange.

I'm looking into converting Doctrine events to Enqueue messages (the idea is that a Doctrine persist triggers an async job which was waiting for some precondition). I'd like to be able to convert Doctrine events to messages with the topic doctrine.<entity>.<event type> (for example, doctrine.image.postPersist). This could be used for all sorts of things, for example converting Symfony events and listening for *.ERROR (which is also on my TODO list).

Having wildcard topic support means processors might subscribe to:

  • doctrine.image.* - only interested in images
  • doctrine.*.postPersist - interested in persisting any entity
  • doctrine.image.postPersist - interested in persisting an Image entity

You can clump all of this together and have each processor reject the messages it's not interested in, but in high-throughput environments, having your processors trigger on every message just to reject it, when RabbitMQ supports this natively, is a wasted opportunity IMO.
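
For illustration, the matching rules of a topic exchange can be sketched locally. In RabbitMQ, routing keys are dot-separated words, `*` matches exactly one word and `#` matches zero or more words. The function names topicMatches and matchWords below are hypothetical; this only mimics the semantics and is neither the broker's implementation nor an Enqueue API.

```php
<?php

declare(strict_types=1);

/** Returns true when $routingKey satisfies the topic-exchange $pattern. */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

/**
 * Recursive word-by-word comparison.
 *
 * @param list<string> $pattern remaining pattern words
 * @param list<string> $words   remaining routing key words
 */
function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // The pattern is exhausted: match only if the key is exhausted too.
        return $words === [];
    }

    $head = array_shift($pattern);

    if ($head === '#') {
        // '#' may swallow zero or more words; try every split point.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($pattern, array_slice($words, $i))) {
                return true;
            }
        }

        return false;
    }

    if ($words === []) {
        return false;
    }

    $word = array_shift($words);

    // '*' stands for exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && matchWords($pattern, $words);
}

$imagesOnly  = topicMatches('doctrine.image.*', 'doctrine.image.postPersist');
$anyPersist  = topicMatches('doctrine.*.postPersist', 'doctrine.user.postPersist');
$wrongEntity = topicMatches('doctrine.image.*', 'doctrine.user.postPersist');
```

The point of the feature request is that the broker performs this filtering, so a processor bound with doctrine.image.* never receives the doctrine.user.postPersist message in the first place.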

[Doctrine] add support to convert Doctrine events to Enqueue messages

Doctrine2 has lifecycle events which trigger on create, update, and delete. We can hook into those events and push a new message with enough information for a processor to do something with it (for example, clear a cache, run webhooks, run long-running jobs which waited for that entity to reach that state in the attached state machine, etc.).

I've created a sample implementation with usage example from Symfony container (disregard the userToken stuff in the processor, that's a hacky implementation for which I'll open another issue).

It works like this (C/P from gitter):

  1. you create an entity, persist it
  2. Doctrine triggers a postPersist, passing the entity
  3. DoctrineEventConverter creates a new DoctrineEventMessage from the Doctrine event
    (passing event type, entity class and entity PK)
  4. TestProcessor attaches a user token to the message before sending it
    (this is temporary, need a better way to do it, maybe an extension of some sort?)
  5. DoctrineEventConverter sends the message via the producer
  6. <transport>
  7. worker running enqueue:consume picks up the message, passes it back to DoctrineEventConverter
  8. DoctrineEventConverter creates a new DoctrineEventMessage from the PSR message
  9. TestProcessor pulls the user token from the message, stores it to its own tokenStorage and also to gedmo (temporary, also)
  10. DoctrineEventConverter fetches the entity by class+ID, stores it to DoctrineEventMessage and passes it to TestProcessor
  11. TestProcessor appends the string " processed" to the entity name
  12. Doctrine triggers a postUpdate, passing the entity
  13. we start from 3) again, but this time the origin is the worker, not ourselves
  14. if anybody was listening for postUpdate, it would trigger here again (so you can chain events)
  15. it stops once all processors decline to do an entity change
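
Steps 3 and 8 above boil down to a small payload that survives the trip through the transport. A rough sketch follows, assuming a JSON body; the class name EntityEventPayload and its fields are hypothetical and are not the DoctrineEventMessage from the sample implementation.

```php
<?php

declare(strict_types=1);

// Hypothetical payload: event type, entity class and primary key only.
// The entity itself is not serialized; the worker re-fetches it by class + ID.
final class EntityEventPayload
{
    private string $eventType;
    private string $entityClass;
    /** @var int|string */
    private $entityId;

    /** @param int|string $entityId */
    public function __construct(string $eventType, string $entityClass, $entityId)
    {
        $this->eventType = $eventType;
        $this->entityClass = $entityClass;
        $this->entityId = $entityId;
    }

    /** Producer side (step 3): turn the event data into a message body. */
    public function toJson(): string
    {
        return json_encode([
            'eventType' => $this->eventType,
            'entityClass' => $this->entityClass,
            'entityId' => $this->entityId,
        ], JSON_THROW_ON_ERROR);
    }

    /** Worker side (step 8): rebuild the payload from the received body. */
    public static function fromJson(string $json): self
    {
        $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);

        return new self($data['eventType'], $data['entityClass'], $data['entityId']);
    }
}

$payload = new EntityEventPayload('postPersist', 'App\\Entity\\Image', 42);
$body = $payload->toJson();
$restored = EntityEventPayload::fromJson($body);
```

Sending only the class and primary key keeps the message small and means the worker always loads the current state of the entity rather than a stale copy.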

[Symfony] add support to transfer tokenStorage (user info) to the worker

While implementing #68, I found that the processor needs to know which user ran the query (I'm using the Blameable Doctrine2 extension).

I've added this as a temporary hack in the implementation, but the real solution would be to have an extension which would transfer the current tokenStorage from the message origin to the processor. This way we could use other Symfony features (like authorizationChecker for example) in our processors as naturally as we would in the web app.

We could probably also trigger some core Symfony event (like kernel.request?) to make other listeners run, but that would be a nice bonus: once the user token is in place, it's easy to do some of that yourself.

Amqp lib support

I am going to use Enqueue in our next project, but I cannot use the amqp extension. Are you planning to support php-amqplib?

[amqp] Get message count

In AMQPContext::declareQueue(), calling $extQueue->declareQueue() returns an integer which is the message count. But this isn't returned anywhere by the initial method.

Is my only option to override this context with my own, and copy / paste the declareQueue() method code, adding a return for the call through to AMQP::declareQueue()? Can I provide a PR? Or is there another way to get a message count from a queue? Thanks!

Multiple consumer handling one message

Hi

I have a use case like this: after a blog post is published, I need to submit it to multiple places such as Apple News, ElasticSearch, etc.
In RabbitMQ I used to use one exchange and multiple queues / consumers to do this.

I am wondering whether this is still possible with the Enqueue library.

The goal is to publish the message once and have it consumed by an unknown number of consumers.

Thanks
Kayue

[Symfony] Symfony 3.3 / 4.x compatibility for ProxyEventDispatcher

On Symfony 3.3, I'm getting an error:

The "Enqueue\Bundle\Events\ProxyEventDispatcher" class extends "Symfony\Component\EventDispatcher\ContainerAwareEventDispatcher" that is deprecated since 3.3, to be removed in 4.0. Use EventDispatcher with closure factories instead.

Additional drivers

Hey!

This library looks awesome, nice work!
I'd really like to use it to replace our existing implementation, but we currently use SQS and a database as Transports. Are there any plans to support SQS/Database as Transports in enqueue, or would PR's be considered?

Thanks!

[RabbitMQ] High resource usage in AmqpConsumer::receiveBasicGet()

TLDR recieveBasicGet() (the default receive method) is using a lot of CPU and other resources as it's calling AMQPQueue::get() as fast as it can, unbound. Should be able to switch receive method to consume and/or fix this one.

I'm switching my Dockerized app from php:7.1-alpine to alpine:3.6 with the required php7* packages installed (Dockerfile). Alpine now supports PHP 7.1, and this should make my process somewhat more streamlined.

Problem is, Enqueue seems to work WAY worse in the new environment. What I mean is, resource usage goes through the roof as soon as I start a consumer (note, there are no messages produced or consumed here, it's just idle wait). When using RabbitMQ's web management console, I see my consumer's connection doing 500KB/s traffic (as I said, no messages produced, it's waiting for something to happen).

I cannot quite pinpoint why this happens as the PHP version is the same, PHP modules are the same, codebase (including Enqueue) is exactly the same, etc. Obviously, this means I shouldn't report this bug here. :)

What does stand out is the code in recieveBasicGet(), it's basically calling a non-blocking AMQPQueue::get() without any restrictions. I'm guessing this is the core cause of this problem as I'm able to avoid this completely if I add usleep(10000) to recieveBasicGet() or switch the receive method to basic_consume (both of which I'm unable to do without changing the vendor source).
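
The usleep workaround can be sketched in isolation as a polling loop that pauses after every empty read. The function name receiveWithIdleSleep and its parameters are hypothetical, and the closure stands in for a non-blocking call such as AMQPQueue::get(); this is not Enqueue's actual code.

```php
<?php

declare(strict_types=1);

/**
 * Polls a non-blocking getter until it yields a message or the timeout expires.
 *
 * @param callable():(?string) $get returns null when the queue is empty
 */
function receiveWithIdleSleep(callable $get, int $timeoutMs, int $idleSleepUs = 10000): ?string
{
    $deadline = microtime(true) + $timeoutMs / 1000;

    do {
        $message = $get();
        if ($message !== null) {
            return $message;
        }

        // Nothing available: yield the CPU instead of hammering the broker.
        usleep($idleSleepUs);
    } while (microtime(true) < $deadline);

    return null;
}

// Stand-in for a non-blocking queue read: empty twice, then one message.
$calls = 0;
$fakeGet = function () use (&$calls): ?string {
    return ++$calls < 3 ? null : 'hello';
};

$message = receiveWithIdleSleep($fakeGet, 1000);
```

A 10 ms pause caps an idle consumer at roughly 100 polls per second at the cost of up to 10 ms of extra latency per message; a blocking basic_consume avoids the polling entirely.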

[bundle] The bundle does not work correctly with env parameters set as tag attr.

This does not work

# app/config/config.yml

parameters:
    enqueue_router_topic: '%env(ENQUEUE_ROUTER_TOPIC)%'
    enqueue_router_queue: '%env(ENQUEUE_ROUTER_QUEUE)%'

enqueue:
    client:
        router_topic: '%enqueue_router_topic%'
        router_queue: '%enqueue_router_queue%'

If you run ./bin/console enqueue:queue, it does not show any queues except the router one.

Add support for Google Cloud Pub/Sub

I noticed that there have been several new transports added recently. It would be nice to also support Google Cloud Pub/Sub as it would allow me to drop another dependency from my project.

Thanks for all the hard work!

Add support for production extensions

Much like consumption extensions, it would be great to have a production extension. If possible, method names should not overlap as you might like to have the same extension class be both a production and consumption extension.

It would allow implementing #69 as an extension, in which the same extension would send the user token itself, then receive and store it at the other end.

[transport] Add Psr prefix to transport interfaces.

The implementations have prefixes: we have AmqpProducer, FsProducer and NullProducer. The interfaces, though, do not have a prefix.

There is another problem as well: both the consumption component and psr have a Context class, and the two are often used at the same time. As a developer, I end up prefixing one of them.

Invalid typehint for Enqueue\Client\Message::setBody

When running PHPStan on my code, it figured out I'm passing an array to Enqueue\Client\Message::setBody (which is supported, as it's serialized to JSON), but the typehint says @param null|string $body, so I get an error:

Parameter #1 $body of method Enqueue\Client\Message::setBody() expects string|null, string[] given. 

JobQueue/Job shouldn't be required when Doctrine schema update

When I run ./bin/console do:sc:up --force I encounter the following error:

PHP Fatal error:  Class 'Enqueue\JobQueue\Job' not found in /home/ubuntu/HypebeastEditorial/vendor/enqueue/enqueue-bundle/Entity/Job.php on line 13

JobQueue shouldn't be needed because it is off by default.
