Skip to content

Commit

Permalink
Upgraded Camel version to 3.7.0 (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
taraktikos authored Dec 18, 2020
1 parent 084f9e9 commit 60d68c7
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 77 deletions.
12 changes: 6 additions & 6 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
:dependencies [[org.clojure/clojure "1.10.1"]
[org.clojure/tools.logging "1.0.0"]

[org.apache.camel/camel-core "3.6.0"]
[org.apache.camel/camel-sql "3.6.0"]
[org.apache.camel/camel-jcache "3.6.0"]
[org.apache.camel/camel-management "3.6.0"]
[org.apache.camel/camel-core "3.7.0"]
[org.apache.camel/camel-sql "3.7.0"]
[org.apache.camel/camel-jcache "3.7.0"]
[org.apache.camel/camel-management "3.7.0"]
[malabarba/lazy-map "1.3"]

[camel-snake-kebab "0.4.1"]
Expand All @@ -27,6 +27,6 @@

:profiles {:test {:dependencies [[com.rpl/specter "1.1.3"]
[org.ehcache/ehcache "3.8.1"]
[org.apache.camel/camel-http "3.6.0"]
[org.apache.camel/camel-jsonpath "3.6.0"]]}
[org.apache.camel/camel-http "3.7.0"]
[org.apache.camel/camel-jsonpath "3.7.0"]]}
:uberjar {:aot :all}})
51 changes: 34 additions & 17 deletions src/main/clj/clj_camel/core.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
(ns clj-camel.core
(:refer-clojure :rename {memoize core-memoize
filter core-filter
when core-when})
(:require [clojure.tools.logging :as log]
[clojure.string :as string]
[clojure.walk :as w]
[lazy-map.core :refer [lazy-map]]
[clj-camel.headers :refer [dict]]
[clj-camel.camel-map-wrapper :refer [camel-map]])
(:import (org.apache.camel.model RouteDefinition ProcessorDefinition ChoiceDefinition SplitDefinition)
(org.apache.camel Exchange Processor Predicate Expression CamelContext NamedNode AggregationStrategy TypeConverter ProducerTemplate)
(org.apache.camel Exchange Processor Predicate Expression CamelContext NamedNode AggregationStrategy TypeConverter ProducerTemplate LoggingLevel)
(org.apache.camel.impl DefaultCamelContext)
(org.apache.camel.builder DeadLetterChannelBuilder RouteBuilder SimpleBuilder Builder ValueBuilder AggregationStrategies)
(clojure.lang ExceptionInfo)
Expand Down Expand Up @@ -80,9 +83,9 @@
:properties (camel-map
(-> ex .getProperties))
:body (-> ex .getIn .getBody)}))]
(when headers
(core-when headers
(-> ex .getIn (.setHeaders (.m headers))))
(when body
(core-when body
(-> ex .getIn (.setBody body)))))))

(defn process
Expand Down Expand Up @@ -206,6 +209,20 @@
(.id (.log pd msg) id)
(.log pd msg)))

(defn log-warn
"Creates a log message to be logged at WARN level."
[^ProcessorDefinition pd & [^String msg {:keys [id]}]]
(if id
(.id (.log pd LoggingLevel/WARN msg) id)
(.log pd LoggingLevel/WARN msg)))

(defn log-error
"Creates a log message to be logged at ERROR level."
[^ProcessorDefinition pd & [^String msg {:keys [id]}]]
(if id
(.id (.log pd LoggingLevel/ERROR msg) id)
(.log pd LoggingLevel/ERROR msg)))

(defn copy-body-to-header
"Copies current body to header with specific key
eg. (c/process (c/copy-body-to-header :body-data))"
Expand Down Expand Up @@ -246,10 +263,10 @@
`(.split ~pd ~expr))
~@(when-let [id# (:id opts)]
`((.id ~id#)))
~@(when (:streaming opts)
`((.streaming)))
~@(when (:parallel-processing opts)
`((.parallelProcessing)))
~@(core-when (:streaming opts)
`((.streaming)))
~@(core-when (:parallel-processing opts)
`((.parallelProcessing)))
~@(concat body `((.end)))))

(defmacro filter
Expand Down Expand Up @@ -307,12 +324,12 @@
correlation-expression
caller-runs-when-rejected]}]]
`(-> (.throttle ~pd ~requests-number)
~@(when (some? async-delayed) `((.asyncDelayed ~async-delayed)))
~@(when (some? reject-execution) `((.rejectExecution ~reject-execution)))
~@(when (some? time-period-millis) `((.timePeriodMillis ~time-period-millis)))
~@(when (some? executor-service-ref) `((.executorServiceRef ~executor-service-ref)))
~@(when (some? correlation-expression) `((.correlationExpression ~correlation-expression)))
~@(when (some? caller-runs-when-rejected) `((.callerRunsWhenRejected ~caller-runs-when-rejected)))))
~@(core-when (some? async-delayed) `((.asyncDelayed ~async-delayed)))
~@(core-when (some? reject-execution) `((.rejectExecution ~reject-execution)))
~@(core-when (some? time-period-millis) `((.timePeriodMillis ~time-period-millis)))
~@(core-when (some? executor-service-ref) `((.executorServiceRef ~executor-service-ref)))
~@(core-when (some? correlation-expression) `((.correlationExpression ~correlation-expression)))
~@(core-when (some? caller-runs-when-rejected) `((.callerRunsWhenRejected ~caller-runs-when-rejected)))))

(defmacro aggregate
"The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.
Expand All @@ -331,10 +348,10 @@
parallel-processing
completion-predicate]}]]
`(-> (.aggregate ~pd ~expression ~strategy)
~@(when (some? completion-size) `((.completionSize ~completion-size)))
~@(when (some? completion-timeout) `((.completionTimeout ~completion-timeout)))
~@(when (some? parallel-processing) `((.parallelProcessing ~parallel-processing)))
~@(when (some? completion-predicate) `((.completionPredicate ~completion-predicate)))))
~@(core-when (some? completion-size) `((.completionSize ~completion-size)))
~@(core-when (some? completion-timeout) `((.completionTimeout ~completion-timeout)))
~@(core-when (some? parallel-processing) `((.parallelProcessing ~parallel-processing)))
~@(core-when (some? completion-predicate) `((.completionPredicate ~completion-predicate)))))

(defn create-jdbc-idempotent-repository
"Creates a new jdbc based repository"
Expand Down
74 changes: 40 additions & 34 deletions src/main/clj/clj_camel/headers.clj
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
(ns clj-camel.headers)

(def dict {:camel-http-uri "CamelHttpUri"
:camel-http-query "CamelHttpQuery"
:camel-http-method "CamelHttpMethod"
:camel-http-response-code "CamelHttpResponseCode"
:camel-http-response-text "CamelHttpResponseText"
:camel-filter-matched "CamelFilterMatched"

:camel-ssh-stderr "CamelSshStderr"
:camel-ssh-exit-value "CamelSshExitValue"

:camel-content-type "Content-Type"

:fired-time "firedTime"

:camel-split-complete "CamelSplitComplete"
:camel-split-size "CamelSplitSize"

:camel-file-absolute "CamelFileAbsolute"
:camel-file-absolute-path "CamelFileAbsolutePath"
:camel-file-host "CamelFileHost"
:camel-file-last-modified "CamelFileLastModified"
:camel-file-length "CamelFileLength"
:camel-file-name "CamelFileName"
:camel-file-name-consumed "CamelFileNameConsumed"
:camel-file-name-only "CamelFileNameOnly"
:camel-file-name-produced "CamelFileNameProduced"
:camel-file-parent "CamelFileParent"
:camel-file-path "CamelFilePath"
:camel-file-relative-path "CamelFileRelativePath"
:camel-ftp-reply-code "CamelFtpReplyCode"
:camel-ftp-reply-string "CamelFtpReplyString"

:x-token "x-token"
:x-service-origin "x-service-origin"})
(def dict {:camel-http-uri "CamelHttpUri"
:camel-http-query "CamelHttpQuery"
:camel-http-method "CamelHttpMethod"
:camel-http-response-code "CamelHttpResponseCode"
:camel-http-response-text "CamelHttpResponseText"
:camel-filter-matched "CamelFilterMatched"

:camel-ssh-stderr "CamelSshStderr"
:camel-ssh-exit-value "CamelSshExitValue"

:camel-content-type "Content-Type"

:fired-time "firedTime"

:camel-split-complete "CamelSplitComplete"
:camel-split-size "CamelSplitSize"

:camel-file-absolute "CamelFileAbsolute"
:camel-file-absolute-path "CamelFileAbsolutePath"
:camel-file-host "CamelFileHost"
:camel-file-last-modified "CamelFileLastModified"
:camel-file-length "CamelFileLength"
:camel-file-name "CamelFileName"
:camel-file-name-consumed "CamelFileNameConsumed"
:camel-file-name-only "CamelFileNameOnly"
:camel-file-name-produced "CamelFileNameProduced"
:camel-file-parent "CamelFileParent"
:camel-file-path "CamelFilePath"
:camel-file-relative-path "CamelFileRelativePath"
:camel-ftp-reply-code "CamelFtpReplyCode"
:camel-ftp-reply-string "CamelFtpReplyString"

:camel-google-pubsub-message-id "CamelGooglePubsub.MessageId"
:camel-google-pubsub-msg-ack-id "CamelGooglePubsub.MsgAckId"
:camel-google-pubsub-attributes "CamelGooglePubsub.Attributes"
:camel-google-pubsub-publish-time "CamelGooglePubsub.PublishTime"

:x-token "x-token"
:x-service-origin "x-service-origin"
:x-correlation-id "x-correlation-id"})
38 changes: 19 additions & 19 deletions src/main/clj/clj_camel/util.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns clj-camel.util
(:require [clj-camel.data.json :as json]
[clj-camel.core :refer :all]
(:require [clj-camel.core :as c]
[clj-camel.data.json :as json]
[camel-snake-kebab.core :refer [->kebab-case ->kebab-case-keyword]]
[camel-snake-kebab.extras :refer [transform-keys]]
[clojure.java.io :as io])
Expand Down Expand Up @@ -42,20 +42,20 @@

(defn transform-json-to-clojure-map-with-kebabified-keys [named-node]
(-> named-node
(process parse-json)
(process kebabify-keys)))
(c/process parse-json)
(c/process kebabify-keys)))

(def map-to-input-stream-converter
(type-converter
(c/type-converter
(-> value
(pr-str)
(.getBytes)
(io/input-stream))))

(def exchange->map
(type-converter
(c/type-converter
(-> value
(get-in-body))))
(c/get-in-body))))

(defn set-start-time [{:keys [headers]}]
{:headers (assoc headers :start-time (. System (nanoTime)))})
Expand All @@ -68,29 +68,29 @@

(defn debug-route [{:keys [ctx headers body]} route]
(let [res (atom nil)
^DefaultCamelContext ctx (or ctx (camel-context))
^DefaultCamelContext ctx (or ctx (c/camel-context))
^ManagedCamelContext managed-ctx (.getExtension ctx ManagedCamelContext)
pd (.createProducerTemplate ctx)]
(add-routes ctx
(c/add-routes ctx
route
(route-builder (from "direct:result")
(route-id "debug-result-route")
(process (fn [msg] (reset! res msg)))
(to "mock:mock")))
(c/route-builder (c/from "direct:result")
(c/route-id "debug-result-route")
(c/process (fn [msg] (reset! res msg)))
(c/to "mock:mock")))
(.start ctx)
(spit "routes.xml" (.dumpRoutesAsXml (.getManagedCamelContext managed-ctx)))
(send-body-and-headers pd "direct:test" body headers)
(remove-route ctx "test-route")
(remove-route ctx "debug-result-route")
(c/send-body-and-headers pd "direct:test" body headers)
(c/remove-route ctx "test-route")
(c/remove-route ctx "debug-result-route")
(.shutdown ctx)
(Thread/sleep 100)
@res))

(defn dump-route-to-xml [route]
(let [^DefaultCamelContext ctx (camel-context)
(let [^DefaultCamelContext ctx (c/camel-context)
^ManagedCamelContext managed-ctx (.getExtension ctx ManagedCamelContext)]
(add-routes ctx route)
(c/add-routes ctx route)
(.start ctx)
(let [xml (.dumpRoutesAsXml (.getManagedCamelContext managed-ctx))]
(.shutdown ctx)
xml)))
xml)))
2 changes: 1 addition & 1 deletion src/test/resources/throttle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<setBody id="1">
<constant>test</constant>
</setBody>
<throttle asyncDelayed="false" id="1" timePeriodMillis="10000" rejectExecution="false">
<throttle customId="true" asyncDelayed="false" id="1" timePeriodMillis="10000" rejectExecution="false">
<expressionDefinition>20</expressionDefinition>
</throttle>
<log id="1" message="after throttling"/>
Expand Down

0 comments on commit 60d68c7

Please sign in to comment.