Skip to content

Commit

Permalink
Merge branch 'master' into r-0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
krasserm committed Feb 20, 2018
2 parents 60c60fb + 7ece706 commit 24e6747
Show file tree
Hide file tree
Showing 44 changed files with 203 additions and 251 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ before_script:
language:
- scala
scala:
- 2.11.11
- 2.12.2
- 2.11.12
- 2.12.4
jdk:
- oraclejdk8
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ Streamz artifacts are available for Scala 2.11 and 2.12 at:

resolvers += "krasserm at bintray" at "http://dl.bintray.com/krasserm/maven"

### Latest stable release
### Latest stable release for FS2 0.10

libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" // uses FS2 0.9.5
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9" // uses FS2 0.10.1

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" // uses FS2 0.9.5
libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.9" // uses FS2 0.10.1

### Latest milestone release
### Latest stable release for FS2 0.9

libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.9-M1"
libraryDependencies += "com.github.krasserm" %% "streamz-camel-akka" % "0.8.1"

libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.9-M1" // uses FS2 0.10.0-M3
libraryDependencies += "com.github.krasserm" %% "streamz-camel-fs2" % "0.8.1" // uses FS2 0.9.5

libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.9-M1" // uses FS2 0.10.0-M3
libraryDependencies += "com.github.krasserm" %% "streamz-converter" % "0.8.1" // uses FS2 0.9.5

Documentation
-------------
Expand Down
23 changes: 8 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import com.typesafe.sbt.SbtScalariform
import com.typesafe.sbt.SbtScalariform.ScalariformKeys

import de.heikoseeberger.sbtheader.license.Apache2_0

import scalariform.formatter.preferences._

import UnidocKeys._

// ---------------------------------------------------------------------------
// Main settings
// ---------------------------------------------------------------------------
Expand All @@ -17,9 +13,9 @@ organization in ThisBuild := "com.github.krasserm"

version in ThisBuild := "0.9-M1"

crossScalaVersions in ThisBuild := Seq("2.11.11", "2.12.2")
crossScalaVersions in ThisBuild := Seq("2.11.12", "2.12.4")

scalaVersion in ThisBuild := "2.12.2"
scalaVersion in ThisBuild := "2.12.4"

scalacOptions in ThisBuild ++= Seq("-feature", "-language:higherKinds", "-language:implicitConversions", "-deprecation")

Expand All @@ -29,32 +25,29 @@ libraryDependencies in ThisBuild += "org.scalatest" %% "scalatest" % Version.Sca
// Code formatter settings
// ---------------------------------------------------------------------------

SbtScalariform.scalariformSettings

ScalariformKeys.preferences := ScalariformKeys.preferences.value
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DanglingCloseParenthesis, Preserve)
.setPreference(DoubleIndentClassDeclaration, false)
.setPreference(DoubleIndentConstructorArguments, false)

// ---------------------------------------------------------------------------
// License header settings
// ---------------------------------------------------------------------------

lazy val header = Apache2_0("2014 - 2017", "the original author or authors.")
lazy val header = HeaderLicense.ALv2("2014 - 2018", "the original author or authors.")

lazy val headerSettings = Seq(headers := Map(
"scala" -> header,
"java" -> header
))
lazy val headerSettings = Seq(
headerLicense := Some(header)
)

// ---------------------------------------------------------------------------
// Projects
// ---------------------------------------------------------------------------

lazy val root = project.in(file("."))
.aggregate(camelContext, camelAkka, camelFs2, converter, examples)
.settings(unidocSettings: _*)
.settings(unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects(examples))
.enablePlugins(ScalaUnidocPlugin)

lazy val camelContext = project.in(file("streamz-camel-context"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
10 changes: 5 additions & 5 deletions project/Version.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
object Version {
val Akka = "2.4.18"
val Camel = "2.19.0"
val Fs2 = "0.10.0-M3"
val Log4j = "2.5"
val Akka = "2.5.9"
val Camel = "2.20.2"
val Fs2 = "0.10.1"
val Log4j = "2.10.0"
val JUnitInterface = "0.11"
val Scalatest = "3.0.1"
val Scalatest = "3.0.5"
}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.15
sbt.version=1.1.0
6 changes: 3 additions & 3 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.3")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.1")

addSbtPlugin("de.heikoseeberger" % "sbt-header" % "1.6.0")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "4.1.0")

addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.6.0")
addSbtPlugin("org.scalariform" % "sbt-scalariform" % "1.8.2")
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,73 +16,62 @@

package streamz.camel.akka

import akka.actor.Props
import akka.pattern.pipe
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import org.apache.camel.ExchangePattern
import akka.stream._
import akka.stream.stage._
import org.apache.camel._
import streamz.camel.{ StreamContext, StreamMessage }

import scala.concurrent.{ ExecutionContext, Future }
import scala.reflect.ClassTag
import scala.util.{ Failure, Success, Try }

private[akka] object EndpointConsumer {
case object ConsumeTimeout
case class ConsumeSuccess(m: Any)
case class ConsumeFailure(t: Throwable)

def props[A](uri: String)(implicit streamContext: StreamContext, tag: ClassTag[A]): Props =
Props(new EndpointConsumer[A](uri))
}

private[akka] class EndpointConsumer[A](uri: String)(implicit streamContext: StreamContext, tag: ClassTag[A]) extends ActorPublisher[StreamMessage[A]] {
import EndpointConsumer._
private[akka] class EndpointConsumer[A](uri: String)(implicit streamContext: StreamContext, tag: ClassTag[A])
extends GraphStage[SourceShape[StreamMessage[A]]] {

private implicit val ec: ExecutionContext =
ExecutionContext.fromExecutorService(streamContext.executorService)

def waiting: Receive = {
case r: Request =>
consumeAsync()
context.become(consuming)
}
val out: Outlet[StreamMessage[A]] =
Outlet("EndpointConsumer.out")

def consuming: Receive = {
case ConsumeSuccess(m) =>
onNext(m.asInstanceOf[StreamMessage[A]])
if (!isCanceled && totalDemand > 0) consumeAsync() else context.become(waiting)
case ConsumeTimeout =>
if (!isCanceled) consumeAsync()
case ConsumeFailure(e) =>
onError(e)
}
override val shape: SourceShape[StreamMessage[A]] =
SourceShape(out)

def receive = waiting
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
import streamContext.consumerTemplate

private def consumeAsync()(implicit tag: ClassTag[A]): Unit = {
import streamContext.consumerTemplate
private val consumedCallback = getAsyncCallback(consumed)

Future(consumerTemplate.receive(uri, 500)).map {
case null =>
ConsumeTimeout
case ce if ce.getPattern != ExchangePattern.InOnly =>
ConsumeFailure(new IllegalArgumentException(s"Exchange pattern ${ExchangePattern.InOnly} expected but was ${ce.getPattern}"))
case ce if ce.getException ne null =>
consumerTemplate.doneUoW(ce)
ConsumeFailure(ce.getException)
case ce =>
Try(StreamMessage.from[A](ce.getIn)) match {
case Success(m) =>
consumerTemplate.doneUoW(ce)
ConsumeSuccess(m)
case Failure(e) =>
ce.setException(e)
setHandler(out, new OutHandler {
override def onPull(): Unit =
consumeAsync()
})

private def consumeAsync(): Unit = {
Future(consumerTemplate.receive(uri, 500)).foreach(consumedCallback.invoke)
}

private def consumed(exchange: Exchange): Unit = {
exchange match {
case null =>
if (!isClosed(out)) consumeAsync()
case ce if ce.getPattern != ExchangePattern.InOnly =>
failStage(new IllegalArgumentException(s"Exchange pattern ${ExchangePattern.InOnly} expected but was ${ce.getPattern}"))
case ce if ce.getException ne null =>
consumerTemplate.doneUoW(ce)
ConsumeFailure(e)
failStage(ce.getException)
case ce =>
Try(StreamMessage.from[A](ce.getIn)) match {
case Success(m) =>
consumerTemplate.doneUoW(ce)
push(out, m)
case Failure(e) =>
ce.setException(e)
consumerTemplate.doneUoW(ce)
failStage(e)
}
}
}.recover {
case ex => ConsumeFailure(ex)
}.pipeTo(self)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,7 @@ private class AsyncExchangeProcessor(capacity: Int) extends AsyncProcessor {
}

private[akka] class EndpointConsumerReplier[A, B](uri: String, capacity: Int)(implicit streamContext: StreamContext, tag: ClassTag[B])
extends GraphStage[FlowShape[StreamMessage[A], StreamMessage[B]]] {
extends GraphStage[FlowShape[StreamMessage[A], StreamMessage[B]]] {

private implicit val ec: ExecutionContext =
ExecutionContext.fromExecutorService(streamContext.executorService)
Expand All @@ -72,7 +72,7 @@ private[akka] class EndpointConsumerReplier[A, B](uri: String, capacity: Int)(im
setHandler(in, new InHandler {
override def onPush(): Unit = {
val AsyncExchange(ce, ac) = emittedExchanges.dequeue()
ce.setOut(grab(in).camelMessage)
ce.setOut(grab(in).camelMessage(streamContext.camelContext))
ac.done(false)
pull(in)
if (!consuming && isAvailable(out)) consumeAsync()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -212,7 +212,7 @@ package object scaladsl {
Flow[A].map(StreamMessage(_)).via(sendRequest[A, B](uri, parallelism)).map(_.body)

private def consumeInOnly[A](uri: String)(implicit streamContext: StreamContext, tag: ClassTag[A]): Source[StreamMessage[A], NotUsed] =
Source.actorPublisher[StreamMessage[A]](EndpointConsumer.props[A](uri)).mapMaterializedValue(_ => NotUsed)
Source.fromGraph(new EndpointConsumer[A](uri))

private def consumeInOut[A, B](uri: String, capacity: Int)(implicit streamContext: StreamContext, tag: ClassTag[B]): Flow[StreamMessage[A], StreamMessage[B], NotUsed] =
Flow.fromGraph(new EndpointConsumerReplier[A, B](uri, capacity))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
1 change: 1 addition & 0 deletions streamz-camel-akka/src/test/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
akka.test.single-expect-default = 10s
akka.log-dead-letters-during-shutdown = off
akka.stream.materializer.debug.fuzzing-mode = on
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 the original author or authors.
* Copyright 2014 - 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 24e6747

Please sign in to comment.