symfony-messenger's Introduction

Bridge to use Symfony Messenger on AWS Lambda with Bref.

This bridge allows messages to be dispatched to SQS, SNS or EventBridge, while workers handle those messages on AWS Lambda.

Installation

This guide assumes that Symfony, Symfony Messenger and Bref are already installed and configured.

First, install this package:

composer require bref/symfony-messenger

Next, register the bundle in config/bundles.php:

return [
    // ...
    Bref\Symfony\Messenger\BrefMessengerBundle::class => ['all' => true],
];

SQS, SNS and EventBridge can now be used with Symfony Messenger.

Usage

Symfony Messenger dispatches messages. To create a message, follow the Symfony Messenger documentation.

To configure where messages are dispatched, all the examples in this documentation are based on the example from the Symfony documentation:

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\MyMessage': async

SQS

SQS is a queue service that works similarly to RabbitMQ. To use it, set the queue URL in the environment variable MESSENGER_TRANSPORT_DSN:

MESSENGER_TRANSPORT_DSN=https://sqs.us-east-1.amazonaws.com/123456789/my-queue

That's it, messages will be dispatched to that queue.

The implementation uses the SQS transport provided by Symfony Amazon SQS Messenger, so all of its features are supported. If you already use that transport, the transition to AWS Lambda is very easy and should not require any change for dispatching messages.

Create the SQS queue

You can create the queue yourself in the AWS Console, write custom CloudFormation, or use Lift's Queue construct, which will handle that for you.

Here is a simple example with Lift. Make sure to install the plugin first, and check out the full documentation for more details.

# serverless.yml

service: my-app
provider:
    name: aws
    environment:
        ...
        MESSENGER_TRANSPORT_DSN: ${construct:jobs.queueUrl}

constructs:
    jobs:
        type: queue
        worker:
            handler: bin/consumer.php
            timeout: 20 # in seconds
            reservedConcurrency: 5 # max. 5 messages processed in parallel
            layers:
                - ${bref:layer.php-80}

plugins:
    - serverless-lift

In all cases, you should disable auto_setup to avoid extra requests and permission issues.

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: 
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    auto_setup: false

Add permissions

When running Symfony on AWS Lambda, it is not necessary to configure credentials. The AWS client will read them from environment variables automatically.

You just have to provide the correct role statements in serverless.yml and Lambda will take care of the rest. The required IAM permission to publish to SQS using Messenger is sqs:SendMessage on the given queue.

If you use Lift, this is done automatically for you.
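Without Lift, the role statement described above could look like the following sketch. It assumes a Serverless Framework version that supports the provider.iam.role.statements key (older versions used provider.iamRoleStatements instead), and the queue ARN is a placeholder to replace with your own.

```yaml
# serverless.yml
provider:
    name: aws
    iam:
        role:
            statements:
                # Allow the application to publish messages to the queue
                - Effect: Allow
                  Action: sqs:SendMessage
                  # Placeholder ARN: replace with the ARN of your own queue
                  Resource: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue
```

Restricting Resource to the single queue keeps the permission as narrow as the text describes.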

Consume messages from SQS

  1. If you don't use Lift, create the function that will be invoked by SQS in serverless.yml:
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sqs/
            - sqs:
                arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue
                # Only 1 item at a time to simplify error handling
                batchSize: 1
  2. Create the handler script (for example bin/consumer.php):
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SqsConsumer::class);

If you are using Symfony 5.1 or later, use this instead:

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sqs\SqsConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SqsConsumer::class);
  3. Register and configure the SqsConsumer service:
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            # true enables partial SQS batch failure
            # Enabling this without the proper SQS config will cause all your messages to be considered successful
            # See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
            $partialBatchFailure: false

Now, anytime a message is dispatched to SQS, the Lambda function will be called. The Bref consumer class will put the message back into Symfony Messenger to be processed.
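If you set $partialBatchFailure to true in the service definition above, the SQS event source also has to be told to expect partial batch responses. A minimal sketch, assuming the Serverless Framework's functionResponseType option for SQS events; the ARN and the batch size are placeholders:

```yaml
# serverless.yml
functions:
    worker:
        handler: bin/consumer.php
        events:
            - sqs:
                arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue
                # A batch larger than 1 is where partial failures become useful
                batchSize: 10
                # Tells Lambda to read the list of failed messages from the response
                functionResponseType: ReportBatchItemFailures
```

With this in place, only the messages reported as failed should return to the queue, instead of the whole batch.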

FIFO Queue

A FIFO queue guarantees exactly-once processing and requires the queue name to end with the .fifo suffix:

# config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: 
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo'
# serverless.yml
resources:
    Resources:
        Queue:
            Type: AWS::SQS::Queue
            Properties:
                QueueName: my-queue.fifo
                FifoQueue: true

Symfony Amazon SQS Messenger will automatically calculate/set the MessageGroupId and MessageDeduplicationId parameters required for FIFO queues, but you can set them explicitly:

use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;

/** @var MessageBus $messageBus */
$messageBus->dispatch(new MyAsyncMessage(), [new AmazonSqsFifoStamp('my-group-message-id', 'my-deduplication-id')]);

Everything else is identical to the normal SQS queue.

SNS

AWS SNS is a notification service rather than a queue. Messages may not arrive in the same order as they were sent, and they might arrive all at once. To use it, create an SNS topic and set it as the DSN:

MESSENGER_TRANSPORT_DSN=sns://arn:aws:sns:us-east-1:1234567890:foobar

That's it, messages will be dispatched to that topic.

Note: when running Symfony on AWS Lambda, it is not necessary to configure credentials. The AWS client will read them from environment variables automatically.

To consume messages from SNS:

  1. Create the function that will be invoked by SNS in serverless.yml:
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sns/
            - sns:
                arn: arn:aws:sns:us-east-1:1234567890:my_sns_topic
  2. Create the handler script (for example bin/consumer.php):
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);

If you are using Symfony 5.1 or later, use this instead:

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);
  3. Register and configure the SnsConsumer service:
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
        public: true
        autowire: true

Now, anytime a message is dispatched to SNS, the Lambda function will be called. The Bref consumer class will put the message back into Symfony Messenger to be processed.

EventBridge

AWS EventBridge is a message routing service. It is similar to SNS, but more powerful. To use it, configure the DSN like so:

# "myapp" is the EventBridge "source", i.e. a namespace for your application's messages
# This source name will be reused in `serverless.yml` later.
MESSENGER_TRANSPORT_DSN=eventbridge://myapp

Optionally, you can set the EventBusName via an event_bus_name query parameter, using either the bus name or its ARN:

MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=custom-bus
MESSENGER_TRANSPORT_DSN=eventbridge://myapp?event_bus_name=arn:aws:events:us-east-1:123456780912:event-bus/custom-bus

That's it, messages will be dispatched to EventBridge.

Note: when running Symfony on AWS Lambda, it is not necessary to configure credentials. The AWS client will read them from environment variables automatically.

To consume messages from EventBridge:

  1. Create the function that will be invoked by EventBridge in serverless.yml:
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-80}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/event-bridge/
            -   eventBridge:
                    # If you changed the bus name in config/packages/messenger.yaml (e.g. eventbridge://myapp?event_bus_name=custom-bus), set the bus name as shown below
                    # eventBus: custom-bus
                    # This filters events we listen to: only events from the "myapp" source.
                    # This should be the same source defined in config/packages/messenger.yaml
                    pattern:
                        source:
                            - myapp
  2. Create the handler script (for example bin/consumer.php):
<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer;

require dirname(__DIR__) . '/config/bootstrap.php';

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(EventBridgeConsumer::class);

If you are using Symfony 5.1 or later, use this instead:

<?php declare(strict_types=1);

use Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__).'/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__).'/.env');

$kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(EventBridgeConsumer::class);
  3. Register and configure the EventBridgeConsumer service:
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\EventBridge\EventBridgeConsumer:
        public: true
        autowire: true
        arguments:
            $transportName: 'async'
            # Optionally, if you have different buses in config/packages/messenger.yaml, set $bus like below:
            # $bus: '@event.bus'

Now, anytime a message is dispatched to EventBridge for that source, the Lambda function will be called. The Bref consumer class will put the message back into Symfony Messenger to be processed.

Error handling

AWS Lambda has its own error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrate Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism.

This section is a work in progress, feel free to contribute to improve it.

When a message fails with SQS, by default it goes back to the SQS queue and is retried until the message expires. Here is an example of how to set up retries and a "dead letter queue" with SQS:

# serverless.yml
resources:
    Resources:
        Queue:
            Type: AWS::SQS::Queue
            Properties:
                # This needs to be at least 6 times the lambda function's timeout
                # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                VisibilityTimeout: '960'
                RedrivePolicy:
                    deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
                    # Jobs will be retried 5 times
                    # The number needs to be at least 5 per https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    maxReceiveCount: 5
        # The dead letter queue is an SQS queue that receives messages that failed to be processed
        DeadLetterQueue:
            Type: AWS::SQS::Queue
            Properties:
                # Messages are stored up to 14 days (the max)
                MessageRetentionPeriod: 1209600

With SNS and EventBridge, failed messages are retried 2 times by default.
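SNS and EventBridge invoke Lambda asynchronously, so the retry count can be tuned on the function itself. The following is a sketch only, assuming the Serverless Framework's maximumRetryAttempts and destinations options; the destination ARN is a placeholder for a queue you would create yourself:

```yaml
# serverless.yml
functions:
    worker:
        handler: bin/consumer.php
        # Number of retries after a failed asynchronous invocation (0 to 2)
        maximumRetryAttempts: 1
        destinations:
            # Placeholder ARN of an SQS queue that receives events that still fail
            onFailure: arn:aws:sqs:us-east-1:1234567890:my_failed_events
```

The onFailure destination plays a role similar to the dead letter queue in the SQS example: it keeps events that exhausted their retries so they can be inspected later.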

Configuration

Configuring AWS clients

By default, AWS clients (SQS, SNS, EventBridge) are preconfigured to work on AWS Lambda (thanks to environment variables populated by AWS Lambda).

However, it is possible to customize the AWS clients, for example to use them outside of AWS Lambda (locally, on EC2…) or to mock them in tests. These clients are registered as Symfony services under the keys:

  • bref.messenger.sqs_client
  • bref.messenger.sns_client
  • bref.messenger.eventbridge_client

For example to customize the SQS client:

services:
    bref.messenger.sqs_client:
        class: AsyncAws\Sqs\SqsClient
        public: true # the AWS clients must be public
        arguments:
            # Apply your own config here
            -
                region: us-east-1

Automatic transport recognition

By default, the transport name is resolved automatically by the TransportNameResolvers for SNS and SQS, so that the transport name is passed to your message handlers. If you need to specify the transport name manually or change the default behavior, set the $transportName argument in the consumer's service definition in config/services.yaml. The value must match the transport name defined in config/packages/messenger.yaml. For instance, for an SnsConsumer, you would configure it as follows:

# config/packages/messenger.yaml
framework:
  messenger:
    transports:
      async: '%env(MESSENGER_TRANSPORT_DSN)%'
# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sns\SnsConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'

Disabling transports

By default, this package registers Symfony Messenger transports for SQS, SNS and EventBridge.

If you want to disable some transports (for example in case of conflict), you can remove BrefMessengerBundle from config/bundles.php and reconfigure the transports you want in your application's config. Take a look at Resources/config/services.yaml to copy the part that you want.

Customizing the serializer

If you want to change how messages are serialized, for example to use the Happyr message serializer, you need to set the serializer on both the transport and the consumer. For example:

# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async: 
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
                serializer: 'Happyr\MessageSerializer\Serializer'

# config/services.yaml
services:
    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            $serializer: '@Happyr\MessageSerializer\Serializer'

symfony-messenger's People

Contributors

akondas, alpha1125, aradoje, ddeboer, etienneroudeix, hadeli, jdalmas, maxell92, mnapoli, nealio82, nyholm, robinlehrmann, smoench, starred-gijs, t-richard, tyx

symfony-messenger's Issues

Not implemented In SqsTransport.php line 73

% bin/console messenger:consume --limit=2500 --time-limit=600 --sleep=30 -vv

                                                                                                                        
 [OK] Consuming messages from transports "sqs".                                                                         
                                                                                                                        

 // The worker will automatically exit once it has processed 2500 messages, been running for 600s or received a stop    
 // signal via the messenger:stop-workers command.                                                                      

 // Quit the worker with CONTROL-C.                                                                                     


In SqsTransport.php line 73:
                   
  [Exception]      
  Not implemented  
                   

Exception trace:
  at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/bref/symfony-messenger/src/Service/Sqs/SqsTransport.php:73
 Bref\Symfony\Messenger\Service\Sqs\SqsTransport->get() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/messenger/Worker.php:76
 Symfony\Component\Messenger\Worker->run() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:202
 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/console/Application.php:1019
 Symfony\Component\Console\Application->doRunCommand() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/framework-bundle/Console/Application.php:97
 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/console/Application.php:271
 Symfony\Component\Console\Application->doRun() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/framework-bundle/Console/Application.php:83
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/vendor/symfony/console/Application.php:147
 Symfony\Component\Console\Application->run() at /Users/dplatt/Sites/tracking.ukwm.co.uk/app/bin/console:42

messenger:consume [-l|--limit LIMIT] [-m|--memory-limit MEMORY-LIMIT] [-t|--time-limit TIME-LIMIT] [--sleep SLEEP] [-b|--bus BUS] [-h|--help] [-q|--quiet] [-v|vv|vvv|--verbose] [-V|--version] [--ansi] [--no-ansi] [-n|--no-interaction] [-e|--env ENV] [--no-debug] [--] <command> [<receivers>...]

I'm not sure how I'm supposed to implement this.
I have an app that is based on RabbitMQ, and I am upgrading it to Symfony Messenger.

Any suggestion?

PartialBatchFailure issue

Hello !

We have had a few weird issues since we enabled partialBatchFailure.

The main problem is that some messages were handled successfully by Symfony BUT were requeued in SQS.

The missing clue, which I found this morning, is that it only happens when there is another failure in the batch.

So partialBatchFailure became my first suspect, and I just found these lines:

if ($isFifoQueue && $hasPreviousMessageFailed) {
    $this->markAsFailed($record);
}

I totally understand why we want to markAsFailed to preserve message order, BUT why do we still handle those messages when we're going to requeue them by marking them as failed?

In our situation this leads to a bug, because the batch contained 2 events that mutate the same data. The last one wins, and since all the events were marked as failed, the 1st one failed on the second run when it tried to mutate the data again, because the 2nd had already done its job.

Is there anything I'm missing, @t-richard?

I can totally help with the patch of course but I want to be sure we are ok on the solution ^^

Architectural Improvements for SNS-SQS-Lambda Integration with Symfony Messenger

We have implemented a system where an SNS Topic distributes messages to multiple SQS Queues, each backed by a dedicated Lambda function. The primary reason for this setup is to assign a unique $transportName to each Lambda through environment variables, which is necessary for the Symfony Messenger-based SqsConsumer class from your library.

Current Architecture
SNS Topic: Distributes messages to multiple SQS Queues and their corresponding Dead Letter Queues (DLQs).
SQS Queues: Each queue has a dedicated Lambda function.
Lambda Functions: Created for each SQS Queue to process messages individually, with a unique $transportName set via environment variables.

resource "aws_lambda_function" "user_account_send_mail" {
  // Lambda function configuration...
  environment {
    variables = {
      // Environment variables...
      AWS_SUBSCRIBER_TRANSPORT_NAME = "async_${each.key}"
    }
  }
  // Additional configuration...
}

Currently, each message handler is linked to a specific transport, as indicated by annotations like
#[AsMessageHandler(fromTransport: 'async_subscriber_send_mail_user_changed')]
or
#[AsMessageHandler(fromTransport: 'async_subscriber_backup_user_changed')]
(Both message handlers consume the same event)

This setup leads to limitations, such as the inability to process the same event by multiple handlers without duplicating messages across different SQS Queues.

Is there a way to eliminate the need for separate transports ($transportName) https://github.com/brefphp/symfony-messenger/blob/master/src/Service/Sqs/SqsConsumer.php#L38 and deduce this information dynamically, perhaps from the messenger.yaml configuration or the SQS event payload?

I'm asking because I want to get rid of the many Lambdas (one per SQS queue) and use just one Lambda. If it's not possible to get rid of the $transportName, maybe you know a different way to configure my Lambda dynamically? :)

messenger.yaml

framework:
    messenger:
        transports:
            async_event_user_changed: '%env(MESSENGER_EVENT_USER_CHANGED_TRANSPORT_DSN)%'
            async_subscriber_send_mail_user_removed: '%env(MESSENGER_SUBSCRIBER_SEND_MAIL_USER_REMOVED_TRANSPORT_DSN)%'
            async_subscriber_backup_user_changed: '%env(MESSENGER_SUBSCRIBER_BACKUP_USER_CHANGED_TRANSPORT_DSN)%'
            
        routing:
            'App\Event\UserChangedEvent': async_event_user_changed

Batch failure with Fifo queue

We just detected (in fact we detected it a long time ago but had no time to deal with it :p ) that the partialBatchFailure feature does not work correctly for FIFO queues.

At the moment, if a message fails in a FIFO queue, every message after it is marked as failed.

In fact, if the message has a message group id, we should only mark as failed the following messages with the same message group id.

Currently, a failing event can block a whole batch of events even if you configured each messageGroupId correctly.

I will work on the PR of course, but I'm not yet sure how to get the message group id back ATM

403 Access denied after upgrading to 0.4.0 and bref 1.1.4

Hi,

First of all I don't know if I should post this here or in https://github.com/brefphp/bref/.

I've been getting the error HTTP 403 returned for "https://sqs.eu-west-3.amazonaws.com/". Code: AccessDenied Message: Access to the resource https://sqs.eu-west-3.amazonaws.com/ is denied. Type: Sender Detail: whenever I try to call one of my SQS queues.

I've been using bref/symfony-messenger 0.2.0 and bref/bref 0.5.29 without any issue but I now get the 403 error since I upgraded to bref/symfony-messenger 0.4.0 and bref/bref 1.1.4

I didn't change anything in AWS or in the credentials, and reverting to the previous version fixes the issue.
I only made the required changes so that the configuration works with the new version, like changing the runtime from provided to provided.al2

This is the content of my messenger.yaml:

framework:
  messenger:
    transports:
      export:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_EXPORT)%'
        options:
          auto_setup: false
        retry_strategy:
          max_retries: 1
      import:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_IMPORT)%'
        options:
          auto_setup: false
        retry_strategy:
          max_retries: 1
      analyze:
        dsn: '%env(MESSENGER_TRANSPORT_DSN_ANALYZE)%'
        options:
          auto_setup: false
        retry_strategy:
          max_retries: 1

    routing:
      'App\Message\ExportMessage': export
      'App\Message\ImportMessage': import
      'App\Message\AnalyzeInvoiceMessage': analyze

bref_messenger:
  sqs: true # Register the SQS transport

Do you know of anything I should be doing so that the authorization works with newer versions of bref/symfony-messenger and bref/bref?
I've been struggling with this for days so if you have any idea of what could be the issue here please let me know.

Thank you

Message handler does not seem to be working

Hi,

Thanks for this package and the brefphp bundle in general, it's magical !

I do have an issue to make the consumer work on my lambda though.

My setup works fine when using the doctrine queue, but when I switch in order to use the SQS queue, the message is sent to the SQS and then the lambda seems to 'consume' the message, based on the logs, but my process (sending a message) is not happening.

I tried debugging my MessageHandler using var_dump and the Psr\Logger but nothing is being logged.

All I know is that the lambda run for about ~6s each time but nothing more :/

I checked the lambda timeout (15min), vpc, env variables and the sqs config but I can't seem to find why it's not working.

An interesting fact is that one of my other message types is working when using the same sqs + lambda.

Any idea what I could be missing ?

Thanks in advance for your help,
Julien Pessey

P.S : I did not know what config file to include (I basically followed the doc) so please feel free to ask me for any !

Implement partial failure on SqsConsumer

Since brefphp now supports partial batch failure (brefphp/bref#1113), I was pretty confident about implementing it in the SqsConsumer.

Except I'm not sure about the behavior we want.

Wrapping the whole loop iteration of SqsConsumer in a try/catch and calling markAsFailed on the $record in the catch block looks like a good idea at first glance. But doing this would prevent reaching the stopfailure method that currently logs the exception, so that log would be lost.

I'm available to do the implementation, but if anyone comes up with a better idea of how to handle it, that would be great.

Deprecation notice in Symfony >= 5.1

Getting deprecation notices.

 1x: Since symfony/dependency-injection 5.1: The "Psr\Container\ContainerInterface" autowiring alias is deprecated. Define it explicitly in your app if you want to keep using it. It is being referenced by the "Bref\Symfony\Messenger\Service\Sns\SnsTransportFactory" service.

 1x: Since symfony/dependency-injection 5.1: The "Psr\Container\ContainerInterface" autowiring alias is deprecated. Define it explicitly in your app if you want to keep using it. It is being referenced by the "Bref\Symfony\Messenger\Service\EventBridge\EventBridgeTransportFactory" service.

Would be great to fix them. Thank you!

Using with Monolog results in no mail?

Maybe some of you already know this "issue".
I'm wondering why I get no error email from the messenger worker,
even though I can see all of this in the CloudWatch logs.

My monolog.yaml looks like:

monolog:
    handlers:
        main:
            type: fingers_crossed
            action_level: error
            handler: grouped
            excluded_http_codes: [404, 405, 409]
            buffer_size: 50
        grouped:
            type:    group
            members: [nested, deduplicated]
        deduplicated:
            type:    deduplication
            handler: swift
        swift:
            type:       swift_mailer
            from_email: '[email protected]'
            to_email:   ['[email protected]']
            subject:    'Error! %%message%%'
            level:      debug
            formatter:  monolog.formatter.html
            content_type: text/html
        nested:
            type: stream
            path: "php://stderr"
            level: debug
        console:
            type: console
            process_psr_3_messages: false
            channels: ["!event", "!doctrine"]

And the exception is:

{
    "errorType": "Symfony\\Component\\Messenger\\Exception\\HandlerFailedException",
    "errorMessage": "Handling \"App\\Message\\PaymentCapturedMessage\" failed: Call to undefined method",
    "stack": [
        "#0 /var/task/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(74): Symfony\\Component\\Messenger\\Middleware\\HandleMessageMiddleware->handle()",
        "#1 /var/task/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\\Component\\Messenger\\Middleware\\SendMessageMiddleware->handle()",
        "#2 /var/task/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(68): Symfony\\Component\\Messenger\\Middleware\\FailedMessageProcessingMiddleware->handle()",
        "#3 /var/task/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(48): Symfony\\Component\\Messenger\\Middleware\\DispatchAfterCurrentBusMiddleware->handle()",
        "#4 /var/task/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\\Component\\Messenger\\Middleware\\RejectRedeliveredMessageMiddleware->handle()",
        "#5 /var/task/vendor/symfony/messenger/MessageBus.php(77): Symfony\\Component\\Messenger\\Middleware\\AddBusNameStampMiddleware->handle()",
        "#6 /var/task/vendor/bref/symfony-messenger/src/Service/SimpleBusDriver.php(24): Symfony\\Component\\Messenger\\MessageBus->dispatch()",
        "#7 /var/task/vendor/bref/symfony-messenger/src/Service/Sqs/SqsConsumer.php(58): Bref\\Symfony\\Messenger\\Service\\SimpleBusDriver->putEnvelopeOnBus()",
        "#8 /var/task/vendor/bref/bref/src/Event/Sqs/SqsHandler.php(24): Bref\\Symfony\\Messenger\\Service\\Sqs\\SqsConsumer->handleSqs()",
        "#9 /var/task/vendor/bref/bref/src/Runtime/Invoker.php(29): Bref\\Event\\Sqs\\SqsHandler->handle()",
        "#10 /var/task/vendor/bref/bref/src/Runtime/LambdaRuntime.php(91): Bref\\Runtime\\Invoker->invoke()",
        "#11 /opt/bref/bootstrap.php(43): Bref\\Runtime\\LambdaRuntime->processNextEvent()",
        "#12 {main}"
    ]
}

Use symfony/amazon-sqs-messenger Transport for sending messages

Now that symfony/amazon-sqs-messenger is released, I was wondering whether it would be an idea to use the Transport from that package, so that this package handles only the serverless consumer side?
Does that make sense?

It doesn't seem right to keep that functionality in 2 places?

SQS record with id "%s" failed to be processed. But failure was marked as unrecoverable.

Hello,

sometimes we get these two messages in our error logs:

SQS record with id "UUID" failed to be processed. But failure was marked as unrecoverable. Message will be acknowledged.

unlabeled event

These errors are logged here: https://github.com/brefphp/symfony-messenger/blob/master/src/Service/Sqs/SqsConsumer.php#L94. I am not able to reproduce this error, so it is very hard to debug.

Any idea when this error could occur? I do not know why only the message is logged and not the whole Exception. Can I send a PR logging the Exception instead of the message?

Can not process messages encoded with Symfony serializer

dcf9e264-9bfe-5b94-b98d-b553c2ade3f0	Invoke Error	
{
    "errorType": "Symfony\\Component\\Messenger\\Exception\\MessageDecodingFailedException",
    "errorMessage": "Encoded envelope should have at least a \"body\" and some \"headers\".",
    "stack": [
        "#0 /var/task/vendor/bref/symfony-messenger/src/Service/Sqs/SqsConsumer.php(38): Symfony\\Component\\Messenger\\Transport\\Serialization\\Serializer->decode()",
        "#1 /var/task/vendor/bref/bref/src/Event/Sqs/SqsHandler.php(18): Bref\\Symfony\\Messenger\\Service\\Sqs\\SqsConsumer->handleSqs()",
        "#2 /var/task/vendor/bref/bref/src/Runtime/LambdaRuntime.php(104): Bref\\Event\\Sqs\\SqsHandler->handle()",
        "#3 /opt/bref/bootstrap.php(38): Bref\\Runtime\\LambdaRuntime->processNextEvent()",
        "#4 {main}"
    ]
}

The reason is that SqsConsumer does not pass header data from the SQS message attributes to the serializer. See:
\Bref\Symfony\Messenger\Service\Sqs\SqsConsumer::handleSqs
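
A minimal sketch of the missing step, assuming the producer stored the Messenger headers as string message attributes on the SQS message. The helper name `encodedEnvelopeFromSqsRecord` and the sample record are hypothetical; the record keys (`body`, `messageAttributes`, `dataType`, `stringValue`) follow the Lambda SQS event format. The returned array carries the "body" and "headers" keys that the exception above complains about.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turn one Lambda SQS record into the array shape
 * that a Messenger serializer's decode() expects.
 */
function encodedEnvelopeFromSqsRecord(array $record): array
{
    $headers = [];
    foreach ($record['messageAttributes'] ?? [] as $name => $attribute) {
        // This sketch only copies string attributes; binary ones are skipped.
        if (($attribute['dataType'] ?? null) === 'String' && isset($attribute['stringValue'])) {
            $headers[$name] = $attribute['stringValue'];
        }
    }

    return ['body' => $record['body'] ?? '', 'headers' => $headers];
}

// Sample record, trimmed to the fields used above (values are made up).
$record = [
    'messageId' => 'example-id',
    'body' => '{"id":42}',
    'messageAttributes' => [
        'type' => ['dataType' => 'String', 'stringValue' => 'App\\Message\\MyMessage'],
        'Content-Type' => ['dataType' => 'String', 'stringValue' => 'application/json'],
    ],
];

$encoded = encodedEnvelopeFromSqsRecord($record);
```

Whether the real consumer should map attributes exactly this way depends on how the sending transport encodes headers, so treat the mapping as an assumption to verify against the producer.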

Handling messages from SQS not working

Somehow, when the consumer tries to handle a message, the MessageBus (Symfony\Component\Messenger\MessageBus) doesn't have any middleware registered. This is only the case when consuming async messages. The regular sync messages all get handled correctly.
I've debugged it extensively. The messages are definitely 'consumed', but nothing happens and they're retried over and over.

I have a custom event.bus with async transport configured like this:

# config/packages/messenger.yaml
framework:
    messenger:
        default_bus: command.bus
        buses:
            command.bus:
                ...
            query.bus:
                ...
            event.bus:
                default_middleware: allow_no_handlers
                middleware:
                    - validation
                    - doctrine_transaction

        transports:
            ...
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: messenger.transport.symfony_serializer
                options:
                   auto_setup: false
            ...
        routing:
            ...
            Application\Event\Async\AsyncEvent: async

And this is in services.yaml:

    Bref\Symfony\Messenger\Service\Sqs\SqsConsumer:
        public: true
        autowire: true
        arguments:
            # Pass the transport name used in config/packages/messenger.yaml
            $transportName: 'async'
            # true enables partial SQS batch failure
            # Enabling this without proper SQS config will consider all your messages successful
            # See https://bref.sh/docs/function/handlers.html#partial-batch-response for more details.
            $partialBatchFailure: false
            $bus: '@event.bus'
            $logger: '@logger'
            $serializer: '@messenger.transport.symfony_serializer'

I'm currently using Symfony 6.3.3 and Bref 1.7.32

Running the consumer locally with the vendor/bin/bref local worker command works perfectly.

Might Bref 2 work? I'll try and upgrade tomorrow.

How to specify different serializer for each Lambda / transport?

We are processing messages from different producers in our Lambdas, so we need different Serializers. This is an example of our definition:

framework:
    messenger:

        transports:
            priority:
                dsn: '%env(xxx)%'
                serializer: messenger.transport.symfony_serializer
            dataUpdate:
                dsn: '%env(xxx)%'
                serializer: App\DataUpdate\DataUpdateMessageSerializer
            priceUpdate:
                dsn: '%env(xxx)%'
                serializer: App\PriceUpdate\PriceUpdateMessageSerializer

Following the documentation, I tried to specify a serializer for SqsConsumer. But how do I add several of them there? It seems possible to specify only one serializer.
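
One possible direction, sketched under assumptions: register one SqsConsumer service per transport, each with its own serializer, and have each Lambda's handler file return the matching service. The service ids below are hypothetical, and the argument names (`$transportName`, `$serializer`) are taken from the services.yaml shown in another issue on this page; they may differ between versions of the bridge.

```yaml
# config/services.yaml
services:
    # Hypothetical service ids, one consumer per transport
    app.consumer.data_update:
        class: Bref\Symfony\Messenger\Service\Sqs\SqsConsumer
        public: true
        autowire: true
        arguments:
            $transportName: 'dataUpdate'
            $serializer: '@App\DataUpdate\DataUpdateMessageSerializer'

    app.consumer.price_update:
        class: Bref\Symfony\Messenger\Service\Sqs\SqsConsumer
        public: true
        autowire: true
        arguments:
            $transportName: 'priceUpdate'
            $serializer: '@App\PriceUpdate\PriceUpdateMessageSerializer'
```

Each Lambda's handler file would then fetch its own consumer, for example `$kernel->getContainer()->get('app.consumer.data_update')`. This has not been verified against the bundle's own service definitions.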

SQS queue : Connection setup error

Hi! ✋

I'm trying to use an SQS queue and I keep getting the same error:

The Amazon SQS queue "heroad-sqs-dev-Queue-10KLY4X240HKO" does not exists (or you don't have permissions on it), and can't be created when an account is provided.

I'm using API Platform, which is based on Symfony, so I'm using the Bref configuration for Symfony.

To explain what I want to do:

My main app, "web", is an API. I want to send a long task to an SQS queue because the 29-second timeout is not sufficient for it.

What I expect is that when I call this special API endpoint, my worker does the job in the background without the 29-second timeout and sends me a notification when the task is finished.

Here is my simple configuration in serverless.yml:

provider:
    name: aws
    region: eu-west-3
    runtime: provided.al2
    environment:
        APP_ENV: stagging
        
resources:
    Resources:
        Queue:
            Type: AWS::SQS::Queue
            Properties:
                # This needs to be at least 6 times the lambda function's timeout
                # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                VisibilityTimeout: '960'
                RedrivePolicy:
                    deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
                    # Jobs will be retried 5 times
                    # The number needs to be at least 5 per https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
                    maxReceiveCount: 5
        # The dead letter queue is a SQS queue that receives messages that failed to be processed
        DeadLetterQueue:
            Type: AWS::SQS::Queue
            Properties:
                # Messages are stored up to 14 days (the max)
                MessageRetentionPeriod: 1209600

plugins:
    - ./vendor/bref/bref

package:
    exclude:
        - node_modules/**
        - tests/**
        - var/**
        - public/build/**
            
functions:
    # This function runs the Symfony website/API
    web:
        handler: public/index.php
        timeout: 28 # in seconds (API Gateway has a timeout of 29 seconds)
        layers:
            - ${bref:layer.php-74-fpm}
        events:
            -   httpApi: '*'

    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-74}
        events:
            # Read more at https://www.serverless.com/framework/docs/providers/aws/events/sqs/
            - sqs:
                  arn: arn:aws:sqs:eu-west-3:508623790554:heroad-sqs-dev-Queue-10KLY4X240HKO
                  # Only 1 item at a time to simplify error handling
                  batchSize: 1

I'm using the worker setup documented for Symfony 5.1, because I'm on 5.3.

My messenger.yaml :

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            # async is whatever name you gave your transport above
            'App\Message\GeneratePlanning': async

In my .env.stagging I have:

MESSENGER_TRANSPORT_DSN=https://sqs.eu-west-3.amazonaws.com/508623790554/heroad-sqs-dev-Queue-10KLY4X240HKO

And finally, my bus dispatch in an API endpoint controller:

    /**
     * @Route("/generate", methods={"GET"})
     * @return JsonResponse|Response
     * @throws \Exception
     */
    public function generate(MessageBusInterface $bus)
    {
        $bus->dispatch(new GeneratePlanning('hello world'));

        return new JsonResponse(['success' => 'success'], 201);

    }

The first break is at: vendor/symfony/amazon-sqs-messenger/Transport/Connection.php (line 276)

Road to 1.0

The package is stable now. For 1.0, let's clean up the docs and focus the package on one main approach:

  • SQS and Lift by default
  • document alternative approaches (e.g. EventBridge, SNS…), possibly in separate pages

Could we clean up the options and enable a few things by default too (like partial batch failures)?

It might also be worth exploring whether we can respect Symfony Messenger features (like retries) instead of relying on the SQS retry config.

Not found bootstrap.php file

The documentation says consumer.php contains this require:
require dirname(__DIR__) . '/config/bootstrap.php';

But I can't find a bootstrap.php file in the config folder.

Invoke error

With a setup using a function to use SNS, I am getting the following error:

{
    "errorType": "Symfony\\Component\\Messenger\\Exception\\MessageDecodingFailedException",
    "errorMessage": "Could not decode message using PHP serialization: {\"notificationType\":\"Bounce\",\"bounce\":{\"feedbackId\":\"01070177b7c56aa1-117088ce-e70c-42c6-9120-42aacdde7907-000000\",\"bounceType\":\"Permanent\",\"bounceSubType\":\"General\",\"bouncedRecipients\":[{\"emailAddress\":\"[email protected]\",\"action\":\"failed\",\"status\":\"5.1.1\",\"diagnosticCode\":\"smtp; 550 5.1.1 user unknown\"}],\"timestamp\":\"2021-02-19T00:51:44.000Z\",\"remoteMtaIp\":\"3.226.40.239\",\"reportingMTA\":\"dsn; b224-13.smtp-out.eu-central-1.amazonses.com\"},\"mail\":{\"timestamp\":\"2021-02-19T00:51:42.822Z\",\"source\":\"[email protected]\",\"sourceArn\":\"arn:aws:ses:eu-central-1:544387708820:identity/[email protected]\",\"sourceIp\":\"178.132.211.234\",\"sendingAccountId\":\"544387708820\",\"messageId\":\"01070177b7c56466-279ebebc-6da2-4a1c-af23-b78eac23a6da-000000\",\"destination\":[\"[email protected]\"]}}.",
    "stack": [
        "#0 /var/task/vendor/symfony/messenger/Transport/Serialization/PhpSerializer.php(38): Symfony\\Component\\Messenger\\Transport\\Serialization\\PhpSerializer->safelyUnserialize()",
        "#1 /var/task/vendor/bref/symfony-messenger/src/Service/Sns/SnsConsumer.php(40): Symfony\\Component\\Messenger\\Transport\\Serialization\\PhpSerializer->decode()",
        "#2 /var/task/vendor/bref/bref/src/Event/Sns/SnsHandler.php(18): Bref\\Symfony\\Messenger\\Service\\Sns\\SnsConsumer->handleSns()",
        "#3 /var/task/vendor/bref/bref/src/Runtime/Invoker.php(29): Bref\\Event\\Sns\\SnsHandler->handle()",
        "#4 /var/task/vendor/bref/bref/src/Runtime/LambdaRuntime.php(102): Bref\\Runtime\\Invoker->invoke()",
        "#5 /opt/bref/bootstrap.php(43): Bref\\Runtime\\LambdaRuntime->processNextEvent()",
        "#6 {main}"
    ]
}

Any clue of what I'm doing wrong?

Here's some more information about the application:

config/packages/messenger.yaml:

framework:
    messenger:
        transports:
            async: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\SesMessage': async

src/Message/SesMessage.php

<?php
namespace App\Message;

class SesMessage
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

src/MessageHandler/SesNotificationHandler.php

<?php

namespace App\MessageHandler;

use App\Message\SesMessage;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

class SesNotificationHandler implements MessageHandlerInterface
{
    public function __invoke(SesMessage $message)
    {

    }
}

bin/consumer.php

<?php declare(strict_types=1);

use App\Kernel;
use Bref\Symfony\Messenger\Service\Sns\SnsConsumer;
use Symfony\Component\Dotenv\Dotenv;

require dirname(__DIR__) . '/vendor/autoload.php';

(new Dotenv())->bootEnv(dirname(__DIR__) . '/.env');

$kernel = new Kernel($_SERVER['APP_ENV'], (bool)$_SERVER['APP_DEBUG']);
$kernel->boot();

// Return the Bref consumer service
return $kernel->getContainer()->get(SnsConsumer::class);

serverless.yml

...

functions:
  worker:
    handler: bin/consumer.php
    timeout: 20
    reservedConcurrency: 5 
    layers:
      - ${bref:layer.php-74}
    events:
      - sns:
          arn: arn:aws:sns:eu-central-1:1234567890:Test
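
The error message suggests the SNS body is a raw SES notification in JSON, not an envelope produced by Messenger's PhpSerializer, which would explain why unserializing fails. One possible direction is a custom serializer whose decode step wraps the raw body in the message class. The sketch below shows only that wrapping step in plain PHP. The function name `sesMessageFromRawBody` is hypothetical, and a real serializer would implement Symfony's SerializerInterface and return the message inside an Envelope.

```php
<?php
declare(strict_types=1);

// Stand-in for App\Message\SesMessage from the issue above.
final class SesMessage
{
    private $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

/**
 * Hypothetical decoding step for a raw (non-Messenger) SNS payload.
 */
function sesMessageFromRawBody(string $body): SesMessage
{
    // Validate only: throws JsonException when the body is not valid JSON.
    json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    return new SesMessage($body);
}

$message = sesMessageFromRawBody('{"notificationType":"Bounce"}');
```

The handler then receives the untouched JSON through getContent() and can decode the fields it needs.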

Sending message to a custom event bus

Looking through EventBridgeTransport.php, it appears that it does not allow you to send events to a custom event bus:

[
    'Detail' => json_encode($encodedMessage, JSON_THROW_ON_ERROR),
    // Ideally here we could put the class name of the message, but how to retrieve it?
    'DetailType' => 'Symfony Messenger message',
    'Source' => $this->source,
]

Is this the case, or am I missing something?
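
For reference, an EventBridge PutEvents entry accepts an optional `EventBusName` field. A minimal sketch of how the entry could carry it, assuming the transport were given a bus name option (the function `buildPutEventsEntry` and its `$eventBusName` parameter are hypothetical and not part of the package):

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: build one PutEvents entry, optionally targeting
 * a custom event bus.
 */
function buildPutEventsEntry(array $encodedMessage, string $source, ?string $eventBusName = null): array
{
    $entry = [
        'Detail' => json_encode($encodedMessage, JSON_THROW_ON_ERROR),
        'DetailType' => 'Symfony Messenger message',
        'Source' => $source,
    ];

    // Without EventBusName, PutEvents targets the account's default bus.
    if ($eventBusName !== null) {
        $entry['EventBusName'] = $eventBusName;
    }

    return $entry;
}

$entry = buildPutEventsEntry(['body' => '{}', 'headers' => []], 'myapp', 'my-custom-bus');
```

Keeping the key absent when no bus name is configured preserves the current behaviour for existing users.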

Question about the DetailType parameter

In the send method of the EventBridgeTransport class, I noticed that we have no control over the DetailType parameter, which is very important when defining EventBridge rules.

I noticed the package is not at version 1.0 yet, so I'd like to know what your plans are.
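
One way the DetailType could be derived, sketched under assumptions: inside a transport's send(Envelope $envelope), the message object is available through $envelope->getMessage(), so its class name could serve as the detail type. The helper `detailTypeFor` and the `OrderPlaced` class below are hypothetical and only illustrate the idea in plain PHP.

```php
<?php
declare(strict_types=1);

// Hypothetical message class used only for this example.
final class OrderPlaced
{
}

/**
 * Hypothetical helper: use the message's class name as the DetailType.
 */
function detailTypeFor(object $message): string
{
    return get_class($message);
}

$detailType = detailTypeFor(new OrderPlaced());
```

An EventBridge rule could then match on the "detail-type" field per message class. Whether the package should default to this or make it configurable is a design decision for the maintainers.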
