
fs2-aws


fs2 Streaming utilities for interacting with AWS

Scope of the project

fs2-aws provides an fs2 interface to AWS services

The design goals are the same as fs2:

compositionality, expressiveness, resource safety, and speed

Using:

Find the latest release version and add the following dependency:

libraryDependencies +=  "io.laserdisc" %% "fs2-aws" % "VERSION"
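
If you only need one of the integrations, the individual modules can be pulled in on their own instead of the aggregate artifact. For instance, for the S3 module described below (artifact name assumed from the module name; check the latest release for the exact artifact list):

libraryDependencies += "io.laserdisc" %% "fs2-aws-s3" % "VERSION"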

S3

The module fs2-aws-s3 provides a purely functional API on top of the AWS S3 API. It defines the following operations:

trait S3[F[_]] {
  def delete(bucket: BucketName, key: FileKey): F[Unit]
  def uploadFile(bucket: BucketName, key: FileKey): Pipe[F, Byte, ETag]
  def uploadFileMultipart(bucket: BucketName, key: FileKey, partSize: PartSizeMB): Pipe[F, Byte, ETag]
  def readFile(bucket: BucketName, key: FileKey): Stream[F, Byte]
  def readFileMultipart(bucket: BucketName, key: FileKey, partSize: PartSizeMB): Stream[F, Byte]
}

You can find out more in the scaladocs for each function, but as a rule of thumb:

  • Small files: use readFile and uploadFile.
  • Big files: use readFileMultipart and uploadFileMultipart.

You can also combine them as you see fit. For example, use uploadFileMultipart and then read it in one shot using readFile.
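
For illustration, a minimal sketch of that combination; it assumes an S3[IO] instance named s3 (created in the next section) and uses the same partSize literal style as the examples below:

Stream.emits("some payload".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("test"), FileKey("foo"), partSize = 5))
  .flatMap(_ => s3.readFile(BucketName("test"), FileKey("foo")))
  .through(fs2.text.utf8Decode)
  .evalMap(content => IO(println(content)))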

Getting started with the S3 module

In order to create an instance of S3 we first need an S3AsyncClientOp, which wraps the underlying S3AsyncClient. Here's one way to build it:

def s3StreamResource: Resource[IO, S3AsyncClientOp[IO]] = {
  val credentials = AwsBasicCredentials.create("accesskey", "secretkey")
  val port        = 4566
  // blocker: Blocker is assumed to be in scope
  S3Interpreter[IO](blocker).S3AsyncClientOpResource(
    S3AsyncClient
      .builder()
      .credentialsProvider(StaticCredentialsProvider.create(credentials))
      .endpointOverride(URI.create(s"http://localhost:$port"))
      .region(Region.US_EAST_1)
  )
}

Now we can create our S3[IO] instance:

s3StreamResource.map(S3.create[IO]).use { s3 =>
  // do stuff with s3 here (or just share it with other functions)
}

Create it once and share it as an argument, as any other resource.
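
For example, a small sketch of sharing one S3[IO] instance across two hypothetical functions:

// Hypothetical consumers of the shared S3[IO] instance.
def copyReport(s3: S3[IO]): IO[Unit] = ???
def cleanup(s3: S3[IO]): IO[Unit]    = ???

s3StreamResource.map(S3.create[IO]).use { s3 =>
  copyReport(s3).flatMap(_ => cleanup(s3))
}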

For more details on how to work with S3 streams, see the project documentation.

Reading a file from S3

The simple way:

s3.readFile(BucketName("test"), FileKey("foo"))
  .through(fs2.text.utf8Decode)
  .through(fs2.text.lines)
  .evalMap(line => IO(println(line)))

The streaming way in a multipart fashion (part size is indicated in MBs and must be 5 or higher):

s3.readFileMultipart(BucketName("test"), FileKey("foo"), partSize = 5)
  .through(fs2.text.utf8Decode)
  .through(fs2.text.lines)
  .evalMap(line => IO(println(line)))

Writing to a file in S3

The simple way:

Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFile(BucketName("foo"), FileKey("bar")))
  .evalMap(t => IO(println(s"eTag: $t")))

The streaming way in a multipart fashion. Again, part size is indicated in MBs and must be 5 or higher.

Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
  .evalMap(t => IO(println(s"eTag: $t")))

Deleting a file in S3

There is a simple function to delete a file.

s3.delete(BucketName("foo"), FileKey("bar"))

Kinesis

Streaming records from Kinesis with KCL

Example using IO for effects (any effect type F with a ConcurrentEffect instance can be used):

val stream: Stream[IO, CommittableRecord] = readFromKinesisStream[IO]("appName", "streamName")

There are a number of other stream constructors available where you can provide more specific configuration for the KCL worker.
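
For example, a sketch using KinesisConsumerSettings; the argument order and available fields are assumptions and may differ between versions:

val consumerSettings = KinesisConsumerSettings("streamName", "appName") // argument order assumed

val configured: Stream[IO, CommittableRecord] =
  readFromKinesisStream[IO](consumerSettings)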

Testing

TODO: Implement better test consumer

For now, you can stub CommittableRecord and create an fs2.Stream that emits these records:

val record = new Record()
  .withApproximateArrivalTimestamp(new Date())
  .withEncryptionType("encryption")
  .withPartitionKey("partitionKey")
  .withSequenceNumber("sequenceNum")
  .withData(ByteBuffer.wrap("test".getBytes))

val testRecord = CommittableRecord(
  "shardId0",
  mock[ExtendedSequenceNumber],
  0L,
  record,
  mock[RecordProcessor],
  mock[IRecordProcessorCheckpointer])

Checkpointing records

Records must be checkpointed in Kinesis to keep track of which messages each consumer has received. Checkpointing a record in the KCL automatically checkpoints all records up to that record. To checkpoint records, a Pipe and a Sink are available. To help distinguish whether a record has been checkpointed, the CommittableRecord class denotes a record that hasn't been checkpointed yet, while the base Record class denotes a committed record.

readFromKinesisStream[IO]("appName", "streamName")
  .through(someProcessingPipeline)
  .to(checkpointRecords_[IO]())
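
If you need the checkpointed records further downstream, the Pipe form can be used instead of the Sink. A sketch, assuming checkpointRecords is the Pipe counterpart of checkpointRecords_:

readFromKinesisStream[IO]("appName", "streamName")
  .through(someProcessingPipeline)
  .through(checkpointRecords[IO]())
  .evalMap(record => IO(println(s"checkpointed: ${record.getSequenceNumber}"))) // assumes the emitted Record exposes getSequenceNumber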

Publishing records to Kinesis with KPL

A Pipe and Sink allow writing a stream of (partitionKey, ByteBuffer) tuples to a Kinesis stream.

Example:

Stream("testData")
  .map { d => ("partitionKey", ByteBuffer.wrap(d.getBytes))}
  .to(writeToKinesis_[IO]("streamName"))

The AWS credential chain and region can be configured by overriding the respective fields in the KinesisProducerClient parameter to writeToKinesis. By default it uses the default AWS credentials chain and the us-east-1 region.

Use with LocalStack

In some situations (e.g. local dev and automated tests), it may be desirable to be able to consume from and publish to Kinesis running inside LocalStack. Ensure that the endpoint setting is set correctly (e.g. http://localhost:4566) and that retrievalMode is set to Polling (LocalStack doesn't support FanOut).
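
A sketch of what that might look like; the field names on KinesisConsumerSettings are assumptions and may differ between versions:

// Assumed field names; LocalStack only supports Polling retrieval.
val localStackSettings = KinesisConsumerSettings(
  streamName = "streamName",
  appName = "appName",
  retrievalMode = Polling
)
// The Kinesis/DynamoDB/CloudWatch clients used by the consumer also need
// .endpointOverride(URI.create("http://localhost:4566")) so they talk to LocalStack.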

Kinesis Firehose

TODO: Stream get data, Stream send data

SQS

Example

implicit val messageDecoder: Message => Either[Throwable, Quote] = { sqs_msg =>
    io.circe.parser.decode[Quote](sqs_msg.asInstanceOf[TextMessage].getText)
}
fs2.aws
  .sqsStream[IO, Quote](
    sqsConfig,
    (config, callback) => SQSConsumerBuilder(config, callback))
  .through(...)
  .compile
  .drain
  .as(ExitCode.Success)
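
Quote here is just the example payload type; a hypothetical definition might look like this:

// Hypothetical payload decoded from the SQS message body.
case class Quote(symbol: String, price: BigDecimal)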

Testing

//create stream for testing
def stream(deferredListener: Deferred[IO, MessageListener]) =
            aws.testkit
              .sqsStream[IO, Quote](deferredListener)
              .through(...)
              .take(2)
              .compile
              .toList

//create the program for testing the stream
import io.circe.fs2.aws.examples.syntax._
import io.circe.generic.auto._
val quote = Quote(...)
val program : IO[List[(Quote, MessageListener)]] = for {
            d <- Deferred[IO, MessageListener]
            r <- IO.racePair(stream(d), d.get).flatMap {
              case Right((streamFiber, listener)) =>
                //simulate SQS stream fan-in here
                listener.onMessage(new SQSTextMessage(Printer.noSpaces.pretty(quote.asJson)))
                streamFiber.join
              case _ => IO(Nil)
            }
          } yield r

//Assert results
val result = program
            .unsafeRunSync()
result should be(...)

TODO: Stream send SQS messages

Support


This project is supported by YourKit with monitoring and profiling tools. YourKit supports open source with innovative and intelligent tools for monitoring and profiling Java and .NET applications. YourKit is the creator of YourKit Java Profiler, YourKit .NET Profiler, and YourKit YouMonitor.

Contributors

barryoneill, busybyte, chuwy, custommonkey, daddykotex, daenyth, damienoreilly, danicheg, dmateusp, fredshonorio, gvolpe, iivat, jarrodcodes, jatcwang, johng84, kiambogo, mattkohl, mmienko, scala-steward, semenodm, simy4, sullis, toddburnside


fs2-aws's Issues

DynamoDB Stream Consumer Race Condition with `shardEnded`

I'm using fs2.aws.dynamodb.package$#readFromDynamDBStream to read from a ddb stream, and fs2.aws.dynamodb#checkpointRecords to checkpoint the records after processing.

The following problem arises when there is a delay (in my case, intentional) in checkpointing the records, and the software.amazon.kinesis.processor.ShardRecordProcessor#shardEnded event fires. fs2_aws correctly checkpoints the end of the shard, but then subsequent checkpoints of the previously emitted events will fail with:

java.lang.IllegalArgumentException: Could not checkpoint at extended sequence number {SequenceNumber: 554022700000000052561897332,SubsequenceNumber: 0} as it did not fall into acceptable range between the last checkpoint {SequenceNumber: SHARD_END,SubsequenceNumber: 0} and the greatest extended sequence number passed to this record processor {SequenceNumber: SHARD_END,SubsequenceNumber: 0}

We need to maintain a list of all in-flight committable records, and evict them when fs2-aws checkpoints a shardEnded event.
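
For illustration, a rough sketch of the bookkeeping that would be required (hypothetical, not part of fs2-aws; it assumes the record exposes a sequence number):

import cats.effect.{IO, Ref}

// Track in-flight records by sequence number; on shardEnded, drop them all so
// late checkpoints for that shard are skipped instead of failing.
def track(inFlight: Ref[IO, Set[String]], cr: CommittableRecord): IO[Unit] =
  inFlight.update(_ + cr.record.sequenceNumber()) // assumes record.sequenceNumber() exists

def onShardEnded(inFlight: Ref[IO, Set[String]]): IO[Unit] =
  inFlight.set(Set.empty)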

Stream can hang if there is an error reading a source file from S3

Problem

We have an fs2 job that reads a JSON file from S3 and then processes this file with 30 parallel streams to extract different sets of data to write to 30 parquet files. We find that if there is an error reading the input file, such as the file key being incorrect or the S3 permissions being incorrect, then the job will hang, print no errors, and produce no output.

However, we also find that if we configure the job to produce fewer output files (which results in fewer parallel streams), then the job doesn't hang and an appropriate exception is produced and shows up in the logs.

Work Around

The problem appears to be that when the stream from S3 is read it does not use the Blocker that it should. To work around this we no longer use readS3File, but instead use our own version, which is as follows:

  def readFromS3(bucket: String, key: String, blocker: Blocker, chunkSize: Int)(implicit sync: Sync[IO], shift: ContextShift[IO]): ByteStream = {
    fs2.io.readInputStream[IO](
      blocker.blockOn(IO(AmazonS3ClientBuilder.defaultClient().getObject(new GetObjectRequest(bucket, key)).getObjectContent)),
      chunkSize = chunkSize,
      blocker = blocker,
      closeAfterUse = true)
  }

The first difference from the fs2-aws API is that this readFromS3 method takes a Blocker instead of an ExecutionContext, which seems more correct as we already have a Blocker in context to use for this. The second difference from the fs2-aws API is that blocker.blockOn(...) is used to wrap the IO[InputStream], which is what ultimately appears to fix the hanging problem.

Add Scala 2.13 cross compile

Please add Scala 2.13 support, since all the libraries you are using are available for Scala 2.13.

The only blocker is the scanamo-circe library, which is also under your maintenance.

Regards

s3.listFiles does not manage truncation

In response to a ListObjectsV2Request, if a bucket contains too many objects (more than 1000 in my experience), AWS automatically truncates the response to a reasonable size (1000, as before) and returns a continuation token for requesting the next page.

This behaviour is completely ignored by s3.listFiles, leading to an obvious bug when listing crowded buckets.
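
For illustration, a sketch of continuation-token paging built directly on the AWS SDK v2 request/response types (it assumes S3AsyncClientOp exposes listObjectsV2; this is not the current fs2-aws API):

import cats.effect.IO
import fs2.{Chunk, Stream}
import scala.jdk.CollectionConverters._
import software.amazon.awssdk.services.s3.model.{ListObjectsV2Request, S3Object}

// State: Some(continuationToken) while pages remain, None once the last page was emitted.
def listAllObjects(s3Op: S3AsyncClientOp[IO], bucket: String): Stream[IO, S3Object] =
  Stream.unfoldChunkEval[IO, Option[Option[String]], S3Object](Some(None)) {
    case None => IO.pure(None) // previous page was the last one
    case Some(token) =>
      val request = {
        val builder = ListObjectsV2Request.builder().bucket(bucket)
        token.fold(builder)(t => builder.continuationToken(t)).build()
      }
      s3Op.listObjectsV2(request).map { resp =>
        val objects = Chunk.iterable(resp.contents().asScala)
        val next    = if (resp.isTruncated) Some(Some(resp.nextContinuationToken())) else None
        Some((objects, next))
      }
  }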

AWS authentication with web identity token

Hi,
we're using this library for an app running on EKS where AWS credentials are injected via IRSA, and it doesn't seem to read the credentials provided as a web identity token. We're using fs2.aws.kinesis.publisher.
The credentials are provided via the env vars AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE.
Can we have support for this authentication method as well, please?
Thanks

v4.0.0-RC1 Availability

Thanks for running this project. Interested to try it out with CE3 and was wondering if v4.0.0-RC1 was available prebuilt anywhere? I didn't see it on maven.

Pipe method signature prevents changing the Stream effect type

Hello! I have a requirement to use fs2-aws-s3 within a tracing context. We are using trace4cats and thus our effect type is Kleisli[F, Span[F], *]. For most of our use cases, changing to this effect type has been either built in or easy to add. For fs2, we have the option of using the translate method. However, because the public API for fs2-aws-s3 has a method signature of Pipe[F, Byte, ETag] for uploadFile and uploadFileMultipart, we cannot apply the translate method and thus cannot easily change the effect type.

A few options would be:

  1. Add similar methods that return a Stream.
  2. Add a new s3 object called s3Streams (or similar) with these methods, using shared logic.
  3. Add an effect translation method (i.e. mapK).

With any of these options a new effect type can easily be applied for all methods.
Happy to make a PR with whichever option is decided by the maintainers. Thanks for your time.

Remove logback dependency

My app is using slf4j-simplelogger as a logging backend and I'm getting the following warning when running it:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/slf4j/slf4j-simple/1.7.30/slf4j-simple-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/antonparkhomenko/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]

As I understand, logback-classic is the same kind of backend as slf4j-simplelogger; they're both supposed to be fronted by slf4j-api and not bundled by libraries. Instead, the application developer should add the backend as a dependency.

Need to support non-default CredentialProvider when building KinesisAsyncClient

For apps that need to use assume-role or session credentials (see https://docs.aws.amazon.com/AmazonS3/latest/dev/AuthUsingTempSessionTokenJava.html), the current approach does not work. Some possible solutions (a sketch of building such a client follows below):

  1. Add an optional CredentialsProvider field to fs2.aws.kinesis.KinesisConsumerSettings, and use it in fs2.aws.kinesis.defaultScheduler() when building the KinesisAsyncClient.
  2. Make fs2.aws.kinesis.defaultScheduler() public or protected.
  3. Make fs2.aws.kinesis.readFromKinesisStream() public or protected.
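
For illustration, a sketch of the kind of client option 1 would allow you to plug in; the builder calls below are standard AWS SDK v2 and would also cover the web-identity (IRSA) case reported above:

import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient

// A KinesisAsyncClient using web-identity (IRSA) credentials rather than the default chain.
val kinesisClient: KinesisAsyncClient =
  KinesisAsyncClient
    .builder()
    .credentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
    .region(Region.US_EAST_1)
    .build()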

Type mismatch when upgrading from 3.0.7 to 3.0.9

With an S3 client up and running, basically as described in the README, an upgrade from 3.0.7 to 3.0.9 leads to the following compile error:

[error]  found   : software.amazon.awssdk.services.s3.S3Client
[error]  required: io.laserdisc.pure.s3.tagless.S3AsyncClientOp[?]
[error]         .eval(S3.create(awsS3SyncClient, blocker))

Unable to connect to S3 and read a file

package fs2
package aws


//import java.util.concurrent.Executors

import cats.effect.{Blocker, ContextShift, IO}
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import fs2.aws.internal.S3Client
import fs2.aws.s3._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.ExecutionContext





class S3Spec extends AnyFlatSpec with Matchers {

  //private val blockingEC = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(6))
  implicit val ec: ExecutionContext = ExecutionContext.global
  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)

  implicit val s3Client: S3Client[IO] = fs2.aws.utils.s3TestClient
  val provider = new AWSStaticCredentialsProvider(
    new BasicAWSCredentials("...","...")
  )
  val client = AmazonS3ClientBuilder
    .standard
    .withCredentials(provider)
    .withRegion("eu-west-1") // or whatever  your region is
    .build
  val reader: Stream[IO, Unit] = Stream.resource(Blocker[IO]).flatMap { blocker =>
    readS3FileMultipart[IO]("...", "text", 25, client)
      .through(fs2.text.utf8Decode)
      .through(fs2.text.lines)
      .intersperse("\n")
      .through(text.utf8Encode)
      .through(io.file.writeAll(java.nio.file.Paths.get("output.txt"), blocker))
  }

  reader.compile.drain.unsafeRunSync

} 

getting the following error:

An exception or error caused a run to abort.
java.lang.NullPointerException
at java.io.Reader.<init>(Reader.java:78)
at java.io.InputStreamReader.<init>(InputStreamReader.java:129)
at scala.io.BufferedSource.reader(BufferedSource.scala:26)
at scala.io.BufferedSource.bufferedReader(BufferedSource.scala:27)
at scala.io.BufferedSource.charReader$lzycompute(BufferedSource.scala:37)
at scala.io.BufferedSource.charReader(BufferedSource.scala:35)
at scala.io.BufferedSource.scala$io$BufferedSource$$decachedReader(BufferedSource.scala:64)
at scala.io.BufferedSource.mkString(BufferedSource.scala:93)
at fs2.aws.utils.package$$anon$1.$anonfun$getObjectContentOrError$1(package.scala:27)

Kinesis - Handle exceptions and keep consuming messages

We have a Kinesis consumer which gets messages from a Kinesis stream, decodes them into a custom case class, and then performs some actions according to the decoded information. The method in charge of consuming and decoding the stream is the following:

def consumeCommands: F[Unit] =
      kinesis
        .readFromKinesisStream(consumerSettings)
        .evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
        .compile
        .drain

And the methods which decode the message and execute the command are the following:

def decodeAndExecuteCommand(value: String): F[Unit] =
      decode[KinesisCommand](value) match {
        case Right(command) => logger.info(s"Processing $value") >> executeCommand(command)
        case Left(err)      => logger.error(s"Error decoding the command: $err")
      }

private def executeCommand(command: KinesisCommand): F[Unit] = ???

So, imagine that, for some unexpected reason executeCommand(command: KinesisCommand) raises an exception:

private def executeCommand(command: KinesisCommand): F[Unit] = {
      throw new IllegalStateException("An unexpected problem")
}

How can we handle this exception gracefully in our consumeCommands method so our Kinesis consumer continues consuming messages? I've tried to put some .recoverWith() in several parts of the code in order to recover from the unexpected exception:

def consumeCommands: F[Unit] =
      kinesis
        .readFromKinesisStream(consumerSettings)
        .evalMap(cr => decodeAndExecuteCommand(StandardCharsets.UTF_8.decode(cr.record.data()).toString))
        .recoverWith { case e: Throwable => fs2.Stream.eval(logger.error(e.getMessage)) }
        .compile
        .drain

But the consumer still gets the exception, shuts itself down, and stops consuming new messages:

[io-compute-12] ERROR c.g.m.a.Server - An unexpected problem 
[prefetch-cache-shardId-000000000000-0000] ERROR s.a.k.r.p.PrefetchRecordsPublisher - meat-grinder-scala-commands:shardId-000000000000 :  Unexpected exception was thrown. This could probably be an issue or a bug. Please search for the exception/error online to check what is going on. If the issue persists or is a recurring problem, feel free to open an issue on, https://github.com/awslabs/amazon-kinesis-client. 
java.lang.IllegalStateException: Shutdown has been called on the cache, can't accept new requests.
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.throwOnIllegalState(PrefetchRecordsPublisher.java:283)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.peekNextResult(PrefetchRecordsPublisher.java:292)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher.drainQueueForRequests(PrefetchRecordsPublisher.java:388)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.makeRetrievalAttempt(PrefetchRecordsPublisher.java:506)
	at software.amazon.kinesis.retrieval.polling.PrefetchRecordsPublisher$DefaultGetRecordsCacheDaemon.run(PrefetchRecordsPublisher.java:464)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
[io-compute-3] INFO  s.a.k.c.Scheduler - All record processors have been shutdown successfully. 
[io-compute-3] INFO  s.a.k.c.Scheduler - Starting worker's final shutdown. 
[io-compute-3] INFO  s.a.k.m.CloudWatchPublisherRunnable - Shutting down CWPublication thread. 
[cw-metrics-publisher] INFO  s.a.k.m.CloudWatchPublisherRunnable - CWPublication thread finished. 
[io-compute-3] INFO  s.a.k.c.Scheduler - Worker loop is complete. Exiting from worker. 

So, what is the correct strategy to handle the exception and keep the consumer up and getting messages from the stream?

Kind regards!

4.0.0-RC1 and Amazon S3

Apologies if I am behind the times here. I recently upgraded a project of mine to CE3 which uses the excellent fs2-aws-s3 module to stream data from Amazon S3 buckets. In 4.0.0 I only see Kinesis-style streaming. Is there a plan to interface with S3 directly, or is that now gone in favour of Kinesis, and should I be looking to move my data from S3 to a Kinesis stream? I may well have missed something obvious; if so, I would be grateful if someone could demonstrate how to pull from S3 using fs2-aws-kinesis.

readS3FileMultipart should throw error on file missing

Originally discovered by @tpon. Currently if invoked with a path to a file that doesn't exist in S3, an empty stream will be returned. Up for discussion, but I feel like an exception should be thrown so the user is aware that the file doesn't exist.

should S3 create method require Concurrent?

Hello,
I was playing with Doobie and streaming to S3 and had some problems making it work, because I would like to work within the Doobie effect ConnectionIO, which does not implement Concurrent, while the create method of fs2.aws.s3.S3 requires it.

I don't see why (yet?)

    def create[F[_]: Async: Concurrent](s3: S3AsyncClientOp[F]): F[S3[F]] 

would the following be sufficient (at least it compiles)?

    def create[F[_]: Async](s3: S3AsyncClientOp[F]): F[S3[F]] 

S3 upload should make it easy to set properties on upload requests

Problem

We have an fs2 job that runs in one AWS account and must upload files to a separate AWS account. To make sure the ownership/permissions of the files are correct, we must set the bucket-owner-full-control canned ACL on the upload request. However, the fs2-aws API makes this tricky to do.

Work Around

To set this property we extend the AbstractAmazonS3 class and implement just enough of the API to support uploading through fs2-aws. The code looks something like this:

object S3ClientWithCannedAcl extends AbstractAmazonS3 {

  private val client = AmazonS3ClientBuilder.defaultClient()

  override def initiateMultipartUpload(request: InitiateMultipartUploadRequest): InitiateMultipartUploadResult = {
    request.setCannedACL(CannedAccessControlList.BucketOwnerFullControl)
    client.initiateMultipartUpload(request)
  }

  override def uploadPart(request: UploadPartRequest): UploadPartResult = {
    client.uploadPart(request)
  }

  override def completeMultipartUpload(request: CompleteMultipartUploadRequest): CompleteMultipartUploadResult = {
    client.completeMultipartUpload(request)
  }
}

fs2.aws.s3.uploadS3FileMultipart[IO](bucket, key, chunkSize, amazonS3 = S3ClientWithCannedAcl)

It would be great if the fs2-aws API allowed for setting properties like this in a more direct way.

Unused values in `KinesisSettings`

The maxConcurrency, stsAssumeRole, and endpoint values in KinesisSettings are not used anywhere else in the code. It looks like, when they were added, they were each used for building a KinesisAsyncClient in consumer.scala.

However, it looks like consumer.scala was removed as part of the cats-effect 3 upgrade. Would it make sense to remove these values as well? Or would it make more sense to restore some functionality to build a Kinesis client given some KinesisSettings?

S3 consumer stream always throws an error at EOF

The requested range is not satisfiable (Service: Amazon S3; Status Code: 416; Error Code: InvalidRange

<Error><Code>InvalidRange</Code><Message>The requested range is not satisfiable</Message><RangeRequested>bytes=263-1263</RangeRequested><ActualObjectSize>263</ActualObjectSize>

Readme.MD examples do not compile

From README.md:

Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
  .evalMap(t => IO(println(s"eTag: $t")))

Does not work anymore, since 5 is an Int, and not a PartSizeMB.
Any pointers on how to create an instance of PartSizeMB?

Update:
Fixed by copying the definition of PartSizeMB into my codebase, but I'd rather not duplicate it if there's a better way.
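
One possible approach (an assumption: PartSizeMB appears to be a refined Int, so refined's auto syntax can lift a valid literal at compile time) would be:

import eu.timepit.refined.auto._ // refines the literal 5 at compile time

Stream.emits("test data".getBytes("UTF-8"))
  .through(s3.uploadFileMultipart(BucketName("foo"), FileKey("bar"), partSize = 5))
  .evalMap(t => IO(println(s"eTag: $t")))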

Publish fs2-aws-sqs for 3.x

I might be missing something, but when I add fs2-aws:3.0.2 as a dependency I don't get the fs2.aws.sqs package.

I also don't see an AWS SQS runtime dependency on Maven Central, though 2.29.0 works as expected.

S3 uploadFileMultipart fails with empty input stream

WHAT IS WRONG

I use uploadFileMultipart to stream serialized database transactions to an S3 file. But, in the edge case that my DB query produces no results, I get the following error:

The XML you provided was not well-formed or did not validate against our published schema (Service: S3, Status Code: 400, Request ID: AXVBZSEAM6S8WJR7, Extended Request ID: luAacBiEwX5Xt8rpN2BvoNdaub+Q3phQTKal5flI0X5GedYxgIRONNXXp77V+6Yd7EZw0PL20t4=)

WHAT IS EXPECTED

I think that the most sensible thing in this case is to just create an empty file.

Kinesis consumer returns `StreamName should not be empty`

Problem

After upgrading to v3.0.9, using readFromKinesisStream with the default FanOut retrieval mode returns the following error, even though streamName in KinesisConsumerSettings is set properly.

[info] ERROR 2021-03-01 15:09:49,630 s.a.kinesis.coordinator.Scheduler - Worker.run caught exception, sleeping for 1000 milli seconds!
[info] java.lang.NullPointerException: StreamName should not be empty
[info] 	at software.amazon.awssdk.utils.Validate.notEmpty(Validate.java:290)
[info] 	at software.amazon.kinesis.common.StreamIdentifier.singleStreamInstance(StreamIdentifier.java:85)
[info] 	at software.amazon.kinesis.retrieval.fanout.FanOutRetrievalFactory.createGetRecordsCache(FanOutRetrievalFactory.java:63)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.buildConsumer(Scheduler.java:915)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.createOrGetShardConsumer(Scheduler.java:890)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop(Scheduler.java:416)
[info] 	at software.amazon.kinesis.coordinator.Scheduler.run(Scheduler.java:325)
[info] 	at fs2.aws.kinesis.consumer$.$anonfun$readChunksFromKinesisStream$7(consumer.scala:239)

The application tries to read from one stream which has one shard.
Switching to Polling works fine.

SqsConfig class is misplaced, it seems

The source file is located under the fs2.aws.sqs folder structure, but the class itself declares the sqs package.

I think the class was misplaced; would you be open to putting it back in the right place?

Can you provide an example?

This looks like a very interesting library. Can you please provide an example? I have to stream the content of GZ files from S3, and I would be able to get started if there were an example of how to use this library.

Documentation / comments

Open to suggestions here!

We need to work on the README for sure, maybe some Scaladocs and a GitHub Pages site for the package.

Too many dependencies

Right now, my build file looks like this:

"io.laserdisc"         %% "fs2-aws"       % "2.28.39" excludeAll(
        ExclusionRule("com.amazonaws", "aws-java-sdk-kinesis"),
        ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
        ExclusionRule("com.amazonaws", "amazon-kinesis-producer"),
        ExclusionRule("software.amazon.kinesis", "amazon-kinesis-client"),
        ExclusionRule("software.amazon.awssdk", "sts"),
        ExclusionRule("com.amazonaws", "aws-java-sdk-sqs"),
        ExclusionRule("com.amazonaws", "amazon-sqs-java-messaging-lib"),
      ),

...because all I'm interested in is the S3 multipart upload functionality.

Is it possible to split s3, kinesis, sts, and sqs into distinct modules so that we're not otherwise forced to download this lot and a whole forest of transitive dependencies unless we actually use them?
