zio-sqs's Introduction

ZIO SQS

ZIO SQS is a ZIO-powered client for AWS SQS. It is built on top of the AWS SDK for Java 2.0 via the automatically generated wrappers from zio-aws.

Introduction

ZIO SQS enables us to produce and consume elements to/from the Amazon SQS service. It is integrated with ZIO Streams, so we can produce and consume elements in a streaming fashion, element by element or micro-batching.

Installation

In order to use this library, we need to add the following line in our build.sbt file:

libraryDependencies += "dev.zio" %% "zio-sqs" % "0.5.0"

Example

In this example we produce a stream of events to the MyQueue queue and then consume them from that queue:

import zio._
import zio.aws.core.config.AwsConfig
import zio.aws.netty.NettyHttpClient
import zio.aws.sqs.Sqs
import zio.sqs.producer.{Producer, ProducerEvent}
import zio.sqs.serialization.Serializer
import zio.sqs.{SqsStream, SqsStreamSettings, Utils}
import zio.stream.ZStream

object ProducerConsumerExample extends ZIOAppDefault {
  val queueName = "MyQueue"

  val stream: ZStream[Any, Nothing, ProducerEvent[String]] =
    ZStream.iterate(0)(_ + 1).map(_.toString).map(ProducerEvent(_))

  val program: ZIO[Sqs, Throwable, Unit] = for {
    _        <- Utils.createQueue(queueName)
    queueUrl <- Utils.getQueueUrl(queueName)
    producer  = Producer.make(queueUrl, Serializer.serializeString)
    _        <- ZIO.scoped(producer.flatMap(_.sendStream(stream).runDrain))
    _        <- SqsStream(
                  queueUrl,
                  SqsStreamSettings(stopWhenQueueEmpty = true, waitTimeSeconds = Some(3))
                ).foreach(msg => Console.printLine(msg.body))
  } yield ()

  def run =
    program.provide(
      Sqs.live,
      AwsConfig.default,
      NettyHttpClient.default
    )
}

Documentation

Learn more on the ZIO SQS homepage!

Contributing

For the general guidelines, see ZIO contributor's guide.

Code of Conduct

See the Code of Conduct

Support

Come chat with us on Discord.

License

License

zio-sqs's People

Contributors

adamgfraser, aquamatthias, darl, earldouglas, gchudnov, ghostdogpr, github-actions[bot], khajavi, ktfleming, kubukoz, lmlynik, migiside, mijicd, pandaforme, rstradling, scala-steward, scottweaver, softinio, solicode, sullis, svroonland

zio-sqs's Issues

Missing symbol type for newer zio-aws versions

Using zio-sqs with a newer version of zio-aws (e.g. 3.17.100.3) fails to compile with the following error:

Symbol 'type io.github.vigoo.zioaws.sqs.model.Message.ReadOnly' is missing from the classpath.
[error] This symbol is required by 'value zio.sqs.SqsStream.msg'.
[error] Make sure that type ReadOnly is in your classpath and check for conflicting dependencies with `-Ylog-classpath`.
[error] A full rebuild may help if 'SqsStream.class' was compiled against an incompatible version of io.github.vigoo.zioaws.sqs.model.Message.

Upgrade to ZIO 2.0

ZIO 2.0 is at Milestone 4, with an RC expected in the next few weeks.
https://github.com/zio/zio/releases/tag/v2.0.0-M4

The API is nearly stable at this point, so any early migration work against this version should pay off towards the official 2.0 release.

The progress is being tracked here:
zio/zio#5470

The Stream Encoding work in progress is the only area where the API might still change before the RC.

We are actively working on a ScalaFix rule that will cover the bulk of the simple API changes:
https://github.com/zio/zio/blob/series/2.x/scalafix/rules/src/main/scala/fix/Zio2Upgrade.scala
We highly recommend starting with that, and then working through any remaining compilation errors :)

To assist with the rest of the migration, we have created this guide:
https://zio.dev/howto/migrate/zio-2.x-migration-guide/

If you would like assistance with the migration from myself or other ZIO contributors, please let us know!

Default value for visibilityTimeout in SqsStreamSettings should be set to None

It's misleading because, unless visibilityTimeout is overridden to None, we can't use the value configured globally on the queue. And since it has a default value, it's easy to forget that this field has been set.

I think it would avoid some future surprises if visibilityTimeout were set to None → https://github.com/zio/zio-sqs/blob/master/src/main/scala/zio/sqs/SqsStreamSettings.scala#L9

In particular, visibilityTimeout defaults to 30, which is already the AWS default value, so the default doesn't provide much value.

The other default values are fine to me because they affect only the consumer behaviour.
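
Until the default changes, the override can be written out at the call site. The following is a minimal sketch, assuming the SqsStreamSettings field names used elsewhere on this page; the queue URL is a placeholder and the object name is made up for illustration.

```scala
import zio.sqs.{SqsStream, SqsStreamSettings}

object QueueDefaultsSketch {
  // Placeholder URL, for illustration only.
  val queueUrl = "https://sqs.eu-west-2.amazonaws.com/000000000000/my-queue"

  // Per the issue text, passing None is what lets the visibility timeout
  // configured on the queue itself take effect.
  val settings = SqsStreamSettings(visibilityTimeout = None)

  // The stream is only described here; nothing runs until it is consumed.
  val messages = SqsStream(queueUrl, settings)
}
```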

Migrate tests to ZIO Test

Replace scalatest by ZIO Test. Might also be possible to replace randomdatagenerator using ZIO Test generators.
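
As a rough illustration of what such a migration could look like, here is a small ZIO Test property check that uses a built-in generator instead of an external random data library. The encode and decode functions are hypothetical stand-ins, not zio-sqs code, and the sketch assumes the zio-test dependency is on the test classpath.

```scala
import zio.test._

object RoundTripSpec extends ZIOSpecDefault {
  // Hypothetical stand-ins for the code under test.
  def encode(s: String): String = s.reverse
  def decode(s: String): String = s.reverse

  def spec = suite("round trip")(
    test("decode(encode(s)) returns the original string") {
      // Gen.string produces random strings, replacing hand-rolled random data.
      check(Gen.string) { s =>
        assertTrue(decode(encode(s)) == s)
      }
    }
  )
}
```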

At least once semantics

I'm looking to migrate to zio-sqs, but right now as far as I can tell there are 2 main modes of operation:

  • autoDelete=true: This is basically at most once semantics. If your handler dies or there is a hard crash, the message you were in the middle of handling is lost forever.
  • autoDelete=false: Manual mode. You have to ensure to call deleteMessage and so on properly yourself.

I think it would be nice if there was something similar to Alpakka's SqsAckSink:
https://doc.akka.io/docs/alpakka/current/sqs.html#updating-message-statuses

Basically through the type system to ensure that you handle what to do with a message after your handler has run. This gives you at least once semantics (or if you ensure your handler is idempotent, then it's effectively exactly once semantics).

A sealed trait with the following cases would cover everything I believe:

MessageAction

  • Done / Delete
  • Skip / Ignore
  • RetryLater(visibilityTimeout) / ChangeMessageVisibility(visibilityTimeout)

Not sure on the specific naming to use, but that's the basic idea.

Is this something that makes sense for zio-sqs?
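
To make the idea concrete, the sealed trait could look roughly like the sketch below. This is plain Scala with no zio-sqs dependency. All names (MessageAction, handle, describe) are hypothetical and follow the list above; none of them exist in the library.

```scala
import scala.concurrent.duration._

// Hypothetical ADT; not part of zio-sqs.
sealed trait MessageAction
object MessageAction {
  case object Delete extends MessageAction
  case object Ignore extends MessageAction
  final case class ChangeMessageVisibility(visibilityTimeout: FiniteDuration) extends MessageAction
}

object MessageActionExample {
  import MessageAction._

  // Hypothetical handler: the return type forces a decision for every message.
  def handle(body: String): MessageAction =
    if (body.isEmpty) Ignore
    else if (body.startsWith("retry")) ChangeMessageVisibility(30.seconds)
    else Delete

  // Names the SQS operation each action would correspond to.
  def describe(action: MessageAction): String = action match {
    case Delete                     => "DeleteMessage"
    case Ignore                     => "no call"
    case ChangeMessageVisibility(t) => s"ChangeMessageVisibility(${t.toSeconds}s)"
  }
}
```

Because the trait is sealed, the compiler warns when a match misses a case, which is what gives the "you must decide what happens to the message" guarantee.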

Issue with message attributes

Hi,
I have a consume method like this:

 private def consume(queueUrl: String) =
        SqsStream(
          queueUrl,
          SqsStreamSettings(
            stopWhenQueueEmpty = false,
            waitTimeSeconds = Some(3),
            visibilityTimeout = Some(sqsConfig.visibilityTimeout),
            autoDelete = false,
            messageAttributeNames = List("TEST", AWSTraceHeader.unwrap.toString)
          )
        ).mapError(AwsError.fromThrowable)

and I send a message to the queue:

aws sqs send-message --endpoint-url "https://sqs.eu-west-2.amazonaws.com/666" \
  --queue-url "https://sqs.eu-west-2.amazonaws.com/666/queue-name" \
  --message-body '{
"event_type": "create",
"user_id": "666"
}' \
  --message-attributes "AWSTraceHeader={StringValue=TestTenant,DataType=String}"

and message.attributes gives me an empty map. What am I doing wrong? Thanks!

Release for the latest ZConfig

Hello.

I see there have been no releases since last year. The other ZIO libraries have advanced quite a bit, which makes it difficult to use the latest release of zio-sqs together with, for example, the latest ZConfig. Is there a plan to release zio-sqs with the most recent updates? Everything seems to be up to date in master thanks to Steward.

Thanks!

SqsProducer stops working after error from SQS

Hey. We are using zio-sqs version 0.3.2 to receive and produce sqs messages.
For the sending part we see a strange behaviour, in case SQS is not reachable or rejects a request:

The sending fails (which is correct) but the producer is not usable afterwards.
All messages that get produced afterwards are stuck forever (fiber does not return).

Since the code is not very complex I pasted it here.
Are we doing anything wrong, or what could cause this issue?

The producer that we use looks like this:

trait SqsProducer {
  def produce(message: ProducerEvent[String]): IO[Error, ProducerEvent[String]]
}
object SqsProducer {
  def createProducer(
      queueName: String,
      sqsConfig: SqsConfig,
      producerSettings: ProducerSettings
  ): ZManaged[Clock, Throwable, SqsProducer] = {
    def requestQueueUrl(sqs: SqsAsyncClient) =
      ZIO.fromCompletionStage(sqs.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()))

    for {
      sqs      <- Sqs.fromConfig(sqsConfig)
      queueUrl <- ZManaged.fromEffect(requestQueueUrl(sqs))
      producer <- Producer.make(sqs, queueUrl.queueUrl(), Serializer.serializeString, producerSettings)
    } yield new SqsProducer {
      override def produce(message: ProducerEvent[String]): IO[Error, ProducerEvent[String]] =
        producer.produce(message).mapError(Error.SqsAccessError)
    }
  }
}

val producer: SqsProducer = ... 
producer.produce(event) 
// issue with SQS --> fails with SqsAccessError
producer.produce(anotherEvent)
// fiber never returns

Parallel consumers from 1 queue

Is there an option to run multiple consumers from 1 queue?
(Or can it be achieved only by running multiple streams from 1 queue and setting a proper VisibilityTimeout to avoid duplicates?)

thanks!
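
One way to get concurrency from a single stream is to process its elements in parallel with the standard ZStream operator mapZIOPar. The following is a minimal sketch under stated assumptions: handle is a hypothetical placeholder for real processing, the parallelism of 4 is arbitrary, and the settings reuse the waitTimeSeconds value from the README example.

```scala
import zio._
import zio.aws.sqs.Sqs
import zio.aws.sqs.model.Message
import zio.sqs.{SqsStream, SqsStreamSettings}

object ParallelConsumerSketch {
  // Hypothetical handler; replace with real processing.
  def handle(msg: Message.ReadOnly): UIO[Unit] = ZIO.unit

  // One SQS stream, with up to 4 messages being handled concurrently.
  def consume(queueUrl: String): ZIO[Sqs, Throwable, Unit] =
    SqsStream(queueUrl, SqsStreamSettings(waitTimeSeconds = Some(3)))
      .mapZIOPar(4)(handle)
      .runDrain
}
```

Running several such streams against the same queue, in one process or in several, is the other approach mentioned in the question; in that case the visibility timeout is what keeps a message from being handed to a second consumer while the first is still working on it.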

GenericAwsError when trying to connect to SQS

Hello,

When trying to connect to an Amazon SQS queue with a call encapsulated in a ZLayer, I received this error: GenericAwsError(software.amazon.awssdk.core.exception.SdkClientException: Unable to execute HTTP request: event executor terminated)

The same call done directly works.

I attached a small test.tar.gz sbt app with both examples.

Thanks

Make the SqsStream infinite

The current signature of the SqsStream.apply is

  def apply(
    queueUrl: String,
    settings: SqsStreamSettings = SqsStreamSettings()
  ): ZStream[Sqs, Throwable, Message.ReadOnly]

The stream stops after the first exception that is received in the sqs call. The exceptions can be intermittent, for example,

software.amazon.awssdk.core.exception.SdkClientException: Received an UnknownHostException when attempting to interact with a service. See cause for the exact endpoint that is failing to resolve. If this is happening on an endpoint that previously worked, there may be a network connectivity issue or your DNS cache could be storing endpoints for too long.
....

The next call to sqs after such an exception would be successful.

In most situations (at least those that I've seen) the sqs is supposed to be consumed infinitely. The exceptions are logged and retried.

With the current SqsStream.apply there could be a couple of solutions to achieve the infinite stream:

  1. Restart the program when the stream fails:

SqsStream(???, ???)
      .runDrain
      .exitCode

In environments like Elastic Beanstalk this is not ideal because the EC2 restart is quite slow.

  2. Consume a fresh instance of the same stream when the current stream fails:

          def infiniteStream(): ZStream[Sqs, Nothing, Message.ReadOnly] =
            SqsStream(???, ???).catchAll(_ => infiniteStream())

In both situations, some additional code is required to continue consuming the messages.

My proposal is to have an additional method in the zio-sqs SqsStream, so that the library provides a short solution for an infinite sqs stream.

Its signature would be

object SqsStream {
...

  def infinite(
    queueUrl: String,
    settings: SqsStreamSettings = SqsStreamSettings()
  ): ZStream[Sqs, Nothing, Either[Throwable, Message.ReadOnly]] = ???
}

If this proposal sounds reasonable, I can provide a pr.
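
For discussion, the proposed method could be sketched on top of the existing SqsStream.apply as shown below. This is an illustration of the proposal only, not part of the zio-sqs API; the object name InfiniteSqsStream and the Result alias are hypothetical.

```scala
import zio.aws.sqs.Sqs
import zio.aws.sqs.model.Message
import zio.sqs.{SqsStream, SqsStreamSettings}
import zio.stream.ZStream

object InfiniteSqsStream {
  type Result = Either[Throwable, Message.ReadOnly]

  // Sketch of the proposed method: errors are emitted as Left values and the
  // underlying stream is started again instead of failing the whole stream.
  def infinite(
    queueUrl: String,
    settings: SqsStreamSettings = SqsStreamSettings()
  ): ZStream[Sqs, Nothing, Result] =
    SqsStream(queueUrl, settings)
      .map(msg => (Right(msg): Result))
      .catchAll(e => ZStream.succeed(Left(e): Result) ++ infinite(queueUrl, settings))
}
```

As written, the sketch restarts immediately after a failure, so a persistent outage would produce a tight loop of Left values. A real implementation would probably want a delay or backoff before reconnecting.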

No way to fall back to queue's settings for visibilityTimeout and waitTimeSeconds

These two settings are also present on SQS queues themselves, and if a ReceiveMessage call doesn't include them, the queue's values will be used. Currently in zio-sqs, it seems like it's not possible to fall back to the queue's settings: you can either provide explicit values, or if you don't provide anything, it will use default settings of 30 for visibilityTimeout and 20 for waitTimeSeconds.

Perhaps by making these settings Options, a None value could mean that they won't be set on the ReceiveMessageRequest, and the queue's settings will be used?

For reference, these are the only pages I've been able to find so far in the AWS docs that describe this behavior:
For visibilityTimeout: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
For waitTimeSeconds: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html

Meta data

Is it possible to add some meta data using this library ?
