akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.

Home Page: https://doc.akka.io/docs/alpakka/current/

License: Other

Languages: Scala 83.08%, Java 16.84%, Shell 0.08%
Topics: integration, reactive, akka, akka-stream, akka-streams-connectors, reactive-streams, messaging, hacktoberfest

alpakka's Introduction

Alpakka

Systems don't come alone. In the modern world of microservices and cloud deployment, new components must interact with legacy systems, making integration key to success. Reactive Streams give us a technology-independent tool to let these heterogeneous systems communicate without overwhelming each other.

The Alpakka project implements stream-aware & reactive integration pipelines for Java and Scala. It is built on top of Akka Streams, and has been designed from the ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure. Akka Streams is a Reactive Streams and JDK 9+ java.util.concurrent.Flow-compliant implementation and therefore fully interoperable with other implementations.
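For a flavour of the DSL, here is a minimal, self-contained Akka Streams example (plain Akka Streams rather than any particular Alpakka connector; it assumes Akka 2.6+, where the ActorSystem provides the materializer):

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

object StreamExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  // Emit 1..100, keep the even numbers and print them. Demand is propagated
  // upstream automatically, so a slow sink backpressures the source.
  Source(1 to 100)
    .filter(_ % 2 == 0)
    .map(n => s"even: $n")
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}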

The Akka family of projects is managed by teams at Lightbend with help from the community.

Documentation

To keep up with the latest Alpakka releases, check out the Alpakka releases and Alpakka Kafka connector releases pages.

Community

You can join these forums and chats to discuss and ask Akka- and Alpakka-related questions:

In addition to that, you may enjoy the following:

Contributing

Lightbend is committed to Alpakka and has an Alpakka team working on it.

Contributions are very welcome! The Alpakka team appreciates community contributions by both those new to Alpakka and those more experienced. Alpakka depends on the community to keep up with the ever-growing number of technologies with which to integrate. Please step up and share the successful Akka Stream integrations you implement with the Alpakka community.

If you find an issue that you'd like to see fixed, the quickest way to make that happen is to implement the fix and submit a pull request.

Refer to the CONTRIBUTING.md file for more details about the workflow, and general hints on how to prepare your pull request. If you're planning to implement a new module within Alpakka, look at our contributor advice.

You can also ask for clarifications or guidance in GitHub issues directly, or in the akka/dev chat if more real-time communication would be of benefit.

Caveat Emptor

Alpakka components are not always binary compatible between releases. API changes that are not backward compatible might be introduced as we refine and simplify based on your feedback. A module may be dropped in any release without prior deprecation. If not stated otherwise, the Lightbend subscription does not cover support for Alpakka modules.

Our goal is to improve the stability and test coverage for Alpakka APIs over time.

License

Alpakka is licensed under the Business Source License (BSL) 1.1; please see the Akka License FAQ.

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.

alpakka's People

Contributors

2m, andreas-schroeder, armanbilge, chbatey, cheleb, dotbg, ennru, filosganga, francisdb, gipeshka, gkatzioura, harshalveera, huntc, johanandren, jrudolph, juanjodiaz, ktoso, longshorej, mdedetrich, patriknw, raboof, rucek, s12v, scala-steward, sebastian-alfers, seglo, stephennancekivell, sullis, takezoe, tg44


alpakka's Issues

Switch to scalafmt

It looks to me like scalafmt is the future of Scala code formatters. Wouldn't this project be a nice way to give it a try in the Akka world?

Generic Camel endpoint connector

Provide a generic Camel endpoint connector that is compatible with the back-pressure model of Reactive Streams. Camel endpoints that are compatible with back-pressure are, for example, polling consumers, batch consumers and (async) producers. Not compatible are consumers that push messages downstream independent of demand, for example. A generic connector would make a large number of Camel endpoints re-usable for Akka Streams (and will hopefully increase the adoption of Alpakka 😃)

I've done some experimental work in this direction with an FS2 - Camel integration in the Streamz project (although the consumer side doesn't support back-pressure yet). This integration internally uses akka-camel. Using the FS2 - Akka Streams interface, one can already use Camel endpoints with Akka Streams in a generic way. To keep the runtime overhead small, however, Akka Streams should integrate with the Camel API directly.
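For illustration, a rough sketch of the polling-consumer case using the plain Camel API rather than akka-camel (the endpoint URI and types are just examples): each element of downstream demand triggers one poll, so back-pressure maps directly onto the polling consumer.

import akka.NotUsed
import akka.stream.scaladsl.Source
import org.apache.camel.impl.DefaultCamelContext

val camel = new DefaultCamelContext()
camel.start()
val consumer = camel.createConsumerTemplate()

// One blocking poll per element of demand; a real connector would run this on a
// dedicated dispatcher instead of blocking inside map.
val fileSource: Source[String, NotUsed] =
  Source.repeat(()).map(_ => consumer.receiveBody("file:inbox", classOf[String]))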

Thoughts?

AMQP Source acknowledges messages too early

Currently the AMQP source sends an acknowledgement back to the RabbitMQ server as soon as it sends the received message to the outlet.
https://github.com/akka/alpakka/blob/master/amqp/src/main/scala/akka/stream/alpakka/amqp/AmqpSourceStage.scala#L126

If anything happens to the message during the flow that causes the stream to fail, the message will not be re-queued (although other messages that were still in the buffer will be). It should be possible to defer the acknowledgement of a message until it has reached the sink. That way there would be no possibility of losing a message due to an unrelated runtime exception.

I haven't fully thought out how this would deal with invalid messages that could cause repeated exceptions, but it would be good to know whether acking after the flow has been considered.
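For illustration only (this is not the current Alpakka AMQP API): a minimal sketch of the idea, carrying the RabbitMQ delivery tag alongside the payload and only acking once the element has been processed at the end of the stream. The Channel and the processing function are assumed to come from the surrounding code.

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Sink}
import com.rabbitmq.client.Channel

final case class Delivery(deliveryTag: Long, body: Array[Byte])

def ackAfterProcessing(channel: Channel, process: Array[Byte] => Unit): Sink[Delivery, NotUsed] =
  Flow[Delivery]
    .map { d => process(d.body); d } // a failure here fails the stream before the ack is sent
    .to(Sink.foreach(d => channel.basicAck(d.deliveryTag, false)))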

Geode connector

Hi,
I can provide a Geode sink and source (continuous query) connector.

Any interest?

Elasticsearch connectors

It could be interesting to have a source and a flow corresponding to Elasticsearch's scroll and bulk APIs.
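Purely as an illustration of the shape such stages could take (these signatures are hypothetical, not an existing API): a source that pages through a scroll query and a flow that groups documents into bulk requests.

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

final case class BulkResult(errors: Boolean)

trait ElasticsearchConnector[Doc] {
  def scrollSource(index: String, query: String): Source[Doc, NotUsed]
  def bulkFlow(index: String, batchSize: Int): Flow[Doc, BulkResult, NotUsed]
}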

RSocket connector

RSocket is interesting, especially for integration between microservices (implemented with different technology stacks).

The reactivesocket-java API is based on the Reactive Streams Publisher and Subscriber interfaces and could therefore be used directly with Akka Streams.

However, it might be beneficial to provide a more idiomatic API on top of reactivesocket-java.

The reactivesocket-java API is: https://github.com/ReactiveSocket/reactivesocket-java/blob/0.5.x/reactivesocket-core/src/main/java/io/reactivesocket/ReactiveSocket.java
(note that requestSubscription is being removed)

The same interface is used on both client and server side.

A Scala API wrapper might look like:

def fireAndForget(payload: Payload): Future[Done]
def requestResponse(payload: Payload): Future[Payload]
def requestStream(payload: Payload): Source[Payload, NotUsed]
def requestChannel(payloads: Source[Payload, NotUsed]): Source[Payload, NotUsed]

Consolidate documentation about docker images required for tests

It is now mentioned in docker-compose.yml:

# For detailed information about docker-compose visit https://docs.docker.com/compose/
# To start all docker containers required to execute the tests locally run:
# docker-compose up

in amqp docs: https://github.com/akka/alpakka/blob/e0afa9f89f8744b24356e344059cdfed1c1a7fed/docs/src/main/paradox/amqp.md#running-the-example-code

and in cassandra docs:
https://github.com/akka/alpakka/blob/e0afa9f89f8744b24356e344059cdfed1c1a7fed/docs/src/main/paradox/cassandra.md#running-the-example-code

We should mention docker-compose in the docs as well.

scaladsl vs javadsl

We should use the convention with scaladsl and javadsl packages to make it consistent with other things (e.g. Akka Streams). We should do this before the 0.1 release so that we have good examples.
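An illustrative sketch of what the convention implies for a module (the package and object names here are placeholders): Scala users import from scaladsl, Java users from javadsl, and both delegate to a shared internal implementation.

package akka.stream.alpakka.somemodule.scaladsl

import akka.NotUsed
import akka.stream.scaladsl.Source

object SomeModuleSource {
  // Scala-facing factory; a sibling javadsl.SomeModuleSource would return
  // akka.stream.javadsl.Source and delegate to the same internal stage.
  def apply(settings: String): Source[String, NotUsed] = Source.single(settings)
}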

S3: add support of bucket names with period (.)

based on comment @andrewrapp #24 (review)

"Hi all, looking forward to the s3 support! I played around with it a bit and noticed a cert related error. The issues seems to be related to having periods "." in bucket names, which breaks the s3 wild card certificate. For example, if I have a bucket "has.dots" then I'll get the following error: Caused by: java.security.cert.CertificateException: No subject alternative DNS name matching has.dots.s3.amazonaws.com found.

I was able to work around the issue by moving the bucket out of the host and placing it in the path (ie not using virtual hosted–style buckets):

def requestHost(s3Location: S3Location): Uri.Host =
  Uri.Host("s3-us-west-1.amazonaws.com")

def requestUri(s3Location: S3Location): Uri =
  Uri(s"/${s3Location.bucket}/${s3Location.key}")
    .withHost(requestHost(s3Location))
    .withScheme("https")

This issue is described in http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
Specifically, "When using virtual hosted–style buckets with SSL, the SSL wild card certificate only matches buckets that do not contain periods. To work around this, use HTTP or write your own certificate verification logic."

background: http://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html
relates to Ref #24

AMQP Connect Should Support Different SSL protocols

The default RabbitMQ ConnectionFactory protocol for the version of the RabbitMQ client in the dependencies (3.6.1) is TLSv1. This is not advised as per the RabbitMQ site and has been updated in their 4.x series as per here.

The current AMQP config doesn't offer the ability to select an alternate protocol version, but it looks like it could be added with a very minor change. I'm not sure if you have plans to upgrade the RabbitMQ client version (I haven't actually used their 4.x client at all and am not sure how battle-tested it is yet), so I'm happy to throw in a PR to add the extra AMQP config item if people agree with the issue.
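For reference, this is roughly what the extra config item would boil down to under the hood, using the RabbitMQ Java client directly (the name and shape of the Alpakka setting itself are still to be decided):

import com.rabbitmq.client.ConnectionFactory

val factory = new ConnectionFactory()
// useSslProtocol(String) is part of the RabbitMQ Java client; pass e.g. "TLSv1.2"
// instead of relying on the 3.6.x default of TLSv1.
factory.useSslProtocol("TLSv1.2")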

Apple Push Connector

If you'd like a push connector, we can provide one for push as well as feedback, based on the first version of Apple Push. A few of us are interested in helping with this.

Clarify introduction in documentation

What is required of an Alpakka connector?

The connectors in the Alpakka repository must have an Akka Streams API (for Java and Scala). External connectors must have an Akka Streams or Reactive Streams API.

Caffeine Flow

Hi,
I can provide a Caffeine flow.
Caffeine is a cache library that I use to reconcile events in memory.

Say we have event types A, B and C.

If events A(1), A(2), A(3), ..., B(1), C(1) enter the flow, it will emit (A(1), B(1), C(1)) on a reconciled outlet and (A(2), A(3)) on an expired outlet, as sketched below.
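A rough sketch of the proposed shape (the Event type and the names are illustrative): one inlet for incoming events, one outlet for reconciled groups and one outlet for entries that expire from the Caffeine cache before being reconciled.

import akka.stream.FanOutShape2

final case class Event(kind: String, seqNr: Int)

// A GraphStage implementing this shape would hold the Caffeine cache internally.
val shape = new FanOutShape2[Event, Seq[Event], Event]("caffeineReconcile")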

Any interest out there?

CSV Component

I can write a CSV component, if you like.

I was thinking of creating a CSVFlow as well as a CSVSourceStage: CSVFlow would take ByteStrings and chunk them into CSV records, while CSVSourceStage could be a wrapper around FileIO. Let me know if this would be an acceptable component.
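A naive sketch of what CSVFlow could look like (quoting and embedded separators are ignored, so this is illustrative only); a file-based source would then simply be FileIO.fromPath(...).via(csvFlow).

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

// Split incoming ByteStrings into lines, then split each line on commas.
val csvFlow: Flow[ByteString, List[String], NotUsed] =
  Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 64 * 1024, allowTruncation = true))
    .map(_.utf8String.split(',').toList)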

Fix groupId name for Cassandra connector

Using:
"com.typesafe.akka" %% "akka-stream-alpakka-cassandra" % "0.1"
reported unresolved dependencies. However, using the "com.lightbend.akka" groupId made it work. Please update the documentation.

Cannot load Alpakka AMQP dependency

Hi there!

I followed the guidelines to add the Alpakka AMQP library and I'm getting an unresolved dependency error in sbt.

Below is a snippet of my build.sbt file. Am I missing something like a resolver? Thanks!

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-experimental" % "2.0.1",
  "com.typesafe.akka" %% "akka-stream-alpakka-amqp" % "1be3b7fc"
)

scalariformSettings

ScalariformKeys.preferences := ScalariformKeys.preferences.value
  .setPreference(AlignSingleLineCaseStatements, true)
  .setPreference(AlignSingleLineCaseStatements.MaxArrowIndent, 100)
  .setPreference(DoubleIndentClassDeclaration, true)
  .setPreference(PreserveDanglingCloseParenthesis, true)

fork in run := true
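Assuming the AMQP module is published the same way as the Cassandra one in the issue above (com.lightbend.akka groupId and a released version rather than a commit hash), the dependency would presumably need to look more like this:

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-amqp" % "0.1"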

Discuss whether importing ts-reaktive-marshal would be a good fit

The marshalling part of ts-reaktive (small presentation and demos here) provides a DSL to describe bi-directional marshalling of custom data types to and from XML or JSON. A short example:

String NS = "http://test.com/namespace/api";

XMLProtocol<Date> date(QName tagName) { ... } // shows composability

XMLProtocol<Person> proto = 
  tag(qname(NS, "person"),
    attribute(qname("firstName")),
    attribute(qname("lastName")),
    arrayList(
      tag(qname(NS, "address"), 
        body
      )
    ),
    date(qname("birthday")),
    Person::new, /* ... */
  );

The above XMLProtocol can then be used to create an Akka Flow that (with proper back-pressure) takes XMLEvent in and Person out, or Person in and XMLEvent out.

The result would read and write something like this:

<person xmlns="http://test.com/namespace/api" firstName="Bob" lastName="Jones">
  <address>Hello</address>
  <address>World</address>
  <birthday>1970-01-01</birthday>
</person>

An equivalent DSL exists for JSON, but using actual JSON primitives instead.

Combinators like arrayList, option, etc. are shared between the two DSLs.

I think it's a nice fit for Alpakka, since it allows us to describe higher-level XML APIs in a streaming fashion. For example, we could list an S3 bucket in a streaming way. Note that akka-http's current streaming marshalling capabilities depend on specific "framing" use cases. This proposed library can split stream elements at (almost) arbitrary locations.

It'd need a Scala API in addition to the Java one of course, but that's small potatoes.

What do you think?

JMS connector

Likely not the most exciting connector to write, but I think it would be interesting to a lot of Java people needing to integrate JEE things.

AWS Kinesis and DynamoDB

A little over a month ago I started working on akka-http-based DynamoDB and Kinesis clients here. Would it be helpful for me to port them over?

FAILED: AmqpConnectorsTest

TimeoutException in AmqpConnectorsTest, but only for Scala 2.12. Timing issues? That timeout is already at 10 seconds...

[error] Test akka.stream.alpakka.amqp.javadsl.AmqpConnectorsTest.publishFanoutAndConsume failed: java.util.concurrent.TimeoutException: null, took 10.326 sec
[error]     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
[error]     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF.super$get(FutureConvertersImpl.scala:84)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF.$anonfun$get$2(FutureConvertersImpl.scala:84)
[error]     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error]     at scala.concurrent.package$.blocking(package.scala:123)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF.get(FutureConvertersImpl.scala:84)
[error]     at akka.stream.alpakka.amqp.javadsl.AmqpConnectorsTest.publishFanoutAndConsume(AmqpConnectorsTest.java:142)

Parallelize Travis build

Instead of using a custom script (sbt ... +test) that relies on sbt's flaky cross-compilation support via +, it would be better to define multiple Scala versions in the Travis configuration and use sbt_args. See the Travis docs. This will spawn a parallel build for each Scala version.

AmqpConnectorsTest.publishFanoutAndConsume

[info] Test akka.stream.alpakka.amqp.AmqpConnectorsTest.publishFanoutAndConsume started
[error] Test akka.stream.alpakka.amqp.AmqpConnectorsTest.publishFanoutAndConsume failed: java.util.concurrent.TimeoutException: null, took 10.468 sec
[error]     at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
[error]     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF.scala$concurrent$java8$FuturesConvertersImpl$CF$$super$get(FutureConvertersImpl.scala:84)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF$$anonfun$get$2.apply(FutureConvertersImpl.scala:84)
[error]     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
[error]     at scala.concurrent.package$.blocking(package.scala:123)
[error]     at scala.concurrent.java8.FuturesConvertersImpl$CF.get(FutureConvertersImpl.scala:84)
[error]     at akka.stream.alpakka.amqp.AmqpConnectorsTest.publishFanoutAndConsume(AmqpConnectorsTest.java:141)
[error]     ...
[info] Test run finished: 1 failed, 0 ignored, 2 total, 10.785s

https://travis-ci.org/akka/alpakka/builds/173177910

Server-Sent Events Connector

Server-Sent Events (SSE) are a W3C standard for streaming simple UTF-8-encoded, text-based events from a server to a client over HTTP (one-way).

While the primary focus is pushing events from a web server to a web browser (well supported except in IE), SSE can also be used to exchange events between services, i.e. to allow for event-based collaboration. An example can be found in the Gabbler demo project, which uses the akka-sse library.

In the above example SSE is used to establish a point-to-point connection between two services. If more services need to consume events published by one service, integration via Alpakka might be an attractive alternative to opening up many point-to-point connections. Also, data transformations could be applied by Alpakka, and arbitrary endpoints with an available connector could consume the events.
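As a minimal sketch of the parsing side only (not a full connector; it handles only the simplest single-line data: events): frame the UTF-8 body into lines and collect the data fields.

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing}
import akka.util.ByteString

val sseData: Flow[ByteString, String, NotUsed] =
  Flow[ByteString]
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 8192, allowTruncation = true))
    .map(_.utf8String)
    .collect { case line if line.startsWith("data:") => line.drop(5).trim }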

Gitter Connector

Hey,

Is anyone interested in a Gitter connector? It could be useful for chat bots and building Skynet ;)

I can build it on Akka HTTP and provide the JSON model as case classes.

S3 DiskBuffer test fails sporadically

[info] DiskBuffer
[info] - should emit a chunk on its output containing the concatenation of all input values *** FAILED *** (827 milliseconds)
[info]   A timeout occurred waiting for a future to complete. Queried 6 times, sleeping 15 milliseconds between each query. (DiskBufferSpec.scala:39)
[info]   org.scalatest.concurrent.Futures$FutureConcept$$anon$1:
[info]   at org.scalatest.concurrent.Futures$FutureConcept$class.tryTryAgain$1(Futures.scala:538)
[info]   at org.scalatest.concurrent.Futures$FutureConcept$class.futureValueImpl(Futures.scala:550)
[info]   at org.scalatest.concurrent.ScalaFutures$$anon$1.futureValueImpl(ScalaFutures.scala:275)
[info]   at org.scalatest.concurrent.Futures$FutureConcept$class.futureValue(Futures.scala:476)
[info]   at org.scalatest.concurrent.ScalaFutures$$anon$1.futureValue(ScalaFutures.scala:275)
[info]   at akka.stream.alpakka.s3.impl.DiskBufferSpec$$anonfun$1.apply(DiskBufferSpec.scala:39)
[info]   at akka.stream.alpakka.s3.impl.DiskBufferSpec$$anonfun$1.apply(DiskBufferSpec.scala:30)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
[info]   at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
[info]   at akka.stream.alpakka.s3.impl.DiskBufferSpec.withFixture(DiskBufferSpec.scala:17)
