diff --git a/project.clj b/project.clj index ba78d2d..1492d18 100644 --- a/project.clj +++ b/project.clj @@ -6,10 +6,10 @@ :dependencies [[org.clojure/clojure "1.10.1"] [org.clojure/tools.logging "1.0.0"] - [org.apache.camel/camel-core "3.6.0"] - [org.apache.camel/camel-sql "3.6.0"] - [org.apache.camel/camel-jcache "3.6.0"] - [org.apache.camel/camel-management "3.6.0"] + [org.apache.camel/camel-core "3.7.0"] + [org.apache.camel/camel-sql "3.7.0"] + [org.apache.camel/camel-jcache "3.7.0"] + [org.apache.camel/camel-management "3.7.0"] [malabarba/lazy-map "1.3"] [camel-snake-kebab "0.4.1"] @@ -27,6 +27,6 @@ :profiles {:test {:dependencies [[com.rpl/specter "1.1.3"] [org.ehcache/ehcache "3.8.1"] - [org.apache.camel/camel-http "3.6.0"] - [org.apache.camel/camel-jsonpath "3.6.0"]]} + [org.apache.camel/camel-http "3.7.0"] + [org.apache.camel/camel-jsonpath "3.7.0"]]} :uberjar {:aot :all}}) diff --git a/src/main/clj/clj_camel/core.clj b/src/main/clj/clj_camel/core.clj index 56c9a66..4803137 100644 --- a/src/main/clj/clj_camel/core.clj +++ b/src/main/clj/clj_camel/core.clj @@ -1,4 +1,7 @@ (ns clj-camel.core + (:refer-clojure :rename {memoize core-memoize + filter core-filter + when core-when}) (:require [clojure.tools.logging :as log] [clojure.string :as string] [clojure.walk :as w] @@ -6,7 +9,7 @@ [clj-camel.headers :refer [dict]] [clj-camel.camel-map-wrapper :refer [camel-map]]) (:import (org.apache.camel.model RouteDefinition ProcessorDefinition ChoiceDefinition SplitDefinition) - (org.apache.camel Exchange Processor Predicate Expression CamelContext NamedNode AggregationStrategy TypeConverter ProducerTemplate) + (org.apache.camel Exchange Processor Predicate Expression CamelContext NamedNode AggregationStrategy TypeConverter ProducerTemplate LoggingLevel) (org.apache.camel.impl DefaultCamelContext) (org.apache.camel.builder DeadLetterChannelBuilder RouteBuilder SimpleBuilder Builder ValueBuilder AggregationStrategies) (clojure.lang ExceptionInfo) @@ -80,9 +83,9 @@ :properties (camel-map (-> ex .getProperties)) :body (-> ex .getIn .getBody)}))] - (when headers + (core-when headers (-> ex .getIn (.setHeaders (.m headers)))) - (when body + (core-when body (-> ex .getIn (.setBody body))))))) (defn process @@ -206,6 +209,20 @@ (.id (.log pd msg) id) (.log pd msg))) +(defn log-warn + "Creates a log message to be logged at WARN level." + [^ProcessorDefinition pd & [^String msg {:keys [id]}]] + (if id + (.id (.log pd LoggingLevel/WARN msg) id) + (.log pd LoggingLevel/WARN msg))) + +(defn log-error + "Creates a log message to be logged at ERROR level." + [^ProcessorDefinition pd & [^String msg {:keys [id]}]] + (if id + (.id (.log pd LoggingLevel/ERROR msg) id) + (.log pd LoggingLevel/ERROR msg))) + (defn copy-body-to-header "Copies current body to header with specific key eg. (c/process (c/copy-body-to-header :body-data))" @@ -246,10 +263,10 @@ `(.split ~pd ~expr)) ~@(when-let [id# (:id opts)] `((.id ~id#))) - ~@(when (:streaming opts) - `((.streaming))) - ~@(when (:parallel-processing opts) - `((.parallelProcessing))) + ~@(core-when (:streaming opts) + `((.streaming))) + ~@(core-when (:parallel-processing opts) + `((.parallelProcessing))) ~@(concat body `((.end))))) (defmacro filter @@ -307,12 +324,12 @@ correlation-expression caller-runs-when-rejected]}]] `(-> (.throttle ~pd ~requests-number) - ~@(when (some? async-delayed) `((.asyncDelayed ~async-delayed))) - ~@(when (some? reject-execution) `((.rejectExecution ~reject-execution))) - ~@(when (some? time-period-millis) `((.timePeriodMillis ~time-period-millis))) - ~@(when (some? executor-service-ref) `((.executorServiceRef ~executor-service-ref))) - ~@(when (some? correlation-expression) `((.correlationExpression ~correlation-expression))) - ~@(when (some? caller-runs-when-rejected) `((.callerRunsWhenRejected ~caller-runs-when-rejected))))) + ~@(core-when (some? async-delayed) `((.asyncDelayed ~async-delayed))) + ~@(core-when (some? reject-execution) `((.rejectExecution ~reject-execution))) + ~@(core-when (some? time-period-millis) `((.timePeriodMillis ~time-period-millis))) + ~@(core-when (some? executor-service-ref) `((.executorServiceRef ~executor-service-ref))) + ~@(core-when (some? correlation-expression) `((.correlationExpression ~correlation-expression))) + ~@(core-when (some? caller-runs-when-rejected) `((.callerRunsWhenRejected ~caller-runs-when-rejected))))) (defmacro aggregate "The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message. @@ -331,10 +348,10 @@ parallel-processing completion-predicate]}]] `(-> (.aggregate ~pd ~expression ~strategy) - ~@(when (some? completion-size) `((.completionSize ~completion-size))) - ~@(when (some? completion-timeout) `((.completionTimeout ~completion-timeout))) - ~@(when (some? parallel-processing) `((.parallelProcessing ~parallel-processing))) - ~@(when (some? completion-predicate) `((.completionPredicate ~completion-predicate))))) + ~@(core-when (some? completion-size) `((.completionSize ~completion-size))) + ~@(core-when (some? completion-timeout) `((.completionTimeout ~completion-timeout))) + ~@(core-when (some? parallel-processing) `((.parallelProcessing ~parallel-processing))) + ~@(core-when (some? completion-predicate) `((.completionPredicate ~completion-predicate))))) (defn create-jdbc-idempotent-repository "Creates a new jdbc based repository" diff --git a/src/main/clj/clj_camel/headers.clj b/src/main/clj/clj_camel/headers.clj index fe4ea79..d99c3b4 100644 --- a/src/main/clj/clj_camel/headers.clj +++ b/src/main/clj/clj_camel/headers.clj @@ -1,36 +1,42 @@ (ns clj-camel.headers) -(def dict {:camel-http-uri "CamelHttpUri" - :camel-http-query "CamelHttpQuery" - :camel-http-method "CamelHttpMethod" - :camel-http-response-code "CamelHttpResponseCode" - :camel-http-response-text "CamelHttpResponseText" - :camel-filter-matched "CamelFilterMatched" - - :camel-ssh-stderr "CamelSshStderr" - :camel-ssh-exit-value "CamelSshExitValue" - - :camel-content-type "Content-Type" - - :fired-time "firedTime" - - :camel-split-complete "CamelSplitComplete" - :camel-split-size "CamelSplitSize" - - :camel-file-absolute "CamelFileAbsolute" - :camel-file-absolute-path "CamelFileAbsolutePath" - :camel-file-host "CamelFileHost" - :camel-file-last-modified "CamelFileLastModified" - :camel-file-length "CamelFileLength" - :camel-file-name "CamelFileName" - :camel-file-name-consumed "CamelFileNameConsumed" - :camel-file-name-only "CamelFileNameOnly" - :camel-file-name-produced "CamelFileNameProduced" - :camel-file-parent "CamelFileParent" - :camel-file-path "CamelFilePath" - :camel-file-relative-path "CamelFileRelativePath" - :camel-ftp-reply-code "CamelFtpReplyCode" - :camel-ftp-reply-string "CamelFtpReplyString" - - :x-token "x-token" - :x-service-origin "x-service-origin"}) +(def dict {:camel-http-uri "CamelHttpUri" + :camel-http-query "CamelHttpQuery" + :camel-http-method "CamelHttpMethod" + :camel-http-response-code "CamelHttpResponseCode" + :camel-http-response-text "CamelHttpResponseText" + :camel-filter-matched "CamelFilterMatched" + + :camel-ssh-stderr "CamelSshStderr" + :camel-ssh-exit-value "CamelSshExitValue" + + :camel-content-type "Content-Type" + + :fired-time "firedTime" + + :camel-split-complete "CamelSplitComplete" + :camel-split-size "CamelSplitSize" + + :camel-file-absolute "CamelFileAbsolute" + :camel-file-absolute-path "CamelFileAbsolutePath" + :camel-file-host "CamelFileHost" + :camel-file-last-modified "CamelFileLastModified" + :camel-file-length "CamelFileLength" + :camel-file-name "CamelFileName" + :camel-file-name-consumed "CamelFileNameConsumed" + :camel-file-name-only "CamelFileNameOnly" + :camel-file-name-produced "CamelFileNameProduced" + :camel-file-parent "CamelFileParent" + :camel-file-path "CamelFilePath" + :camel-file-relative-path "CamelFileRelativePath" + :camel-ftp-reply-code "CamelFtpReplyCode" + :camel-ftp-reply-string "CamelFtpReplyString" + + :camel-google-pubsub-message-id "CamelGooglePubsub.MessageId" + :camel-google-pubsub-msg-ack-id "CamelGooglePubsub.MsgAckId" + :camel-google-pubsub-attributes "CamelGooglePubsub.Attributes" + :camel-google-pubsub-publish-time "CamelGooglePubsub.PublishTime" + + :x-token "x-token" + :x-service-origin "x-service-origin" + :x-correlation-id "x-correlation-id"}) diff --git a/src/main/clj/clj_camel/util.clj b/src/main/clj/clj_camel/util.clj index 2694767..65c4cfb 100644 --- a/src/main/clj/clj_camel/util.clj +++ b/src/main/clj/clj_camel/util.clj @@ -1,6 +1,6 @@ (ns clj-camel.util - (:require [clj-camel.data.json :as json] - [clj-camel.core :refer :all] + (:require [clj-camel.core :as c] + [clj-camel.data.json :as json] [camel-snake-kebab.core :refer [->kebab-case ->kebab-case-keyword]] [camel-snake-kebab.extras :refer [transform-keys]] [clojure.java.io :as io]) @@ -42,20 +42,20 @@ (defn transform-json-to-clojure-map-with-kebabified-keys [named-node] (-> named-node - (process parse-json) - (process kebabify-keys))) + (c/process parse-json) + (c/process kebabify-keys))) (def map-to-input-stream-converter - (type-converter + (c/type-converter (-> value (pr-str) (.getBytes) (io/input-stream)))) (def exchange->map - (type-converter + (c/type-converter (-> value - (get-in-body)))) + (c/get-in-body)))) (defn set-start-time [{:keys [headers]}] {:headers (assoc headers :start-time (. System (nanoTime)))}) @@ -68,29 +68,29 @@ (defn debug-route [{:keys [ctx headers body]} route] (let [res (atom nil) - ^DefaultCamelContext ctx (or ctx (camel-context)) + ^DefaultCamelContext ctx (or ctx (c/camel-context)) ^ManagedCamelContext managed-ctx (.getExtension ctx ManagedCamelContext) pd (.createProducerTemplate ctx)] - (add-routes ctx + (c/add-routes ctx route - (route-builder (from "direct:result") - (route-id "debug-result-route") - (process (fn [msg] (reset! res msg))) - (to "mock:mock"))) + (c/route-builder (c/from "direct:result") + (c/route-id "debug-result-route") + (c/process (fn [msg] (reset! res msg))) + (c/to "mock:mock"))) (.start ctx) (spit "routes.xml" (.dumpRoutesAsXml (.getManagedCamelContext managed-ctx))) - (send-body-and-headers pd "direct:test" body headers) - (remove-route ctx "test-route") - (remove-route ctx "debug-result-route") + (c/send-body-and-headers pd "direct:test" body headers) + (c/remove-route ctx "test-route") + (c/remove-route ctx "debug-result-route") (.shutdown ctx) (Thread/sleep 100) @res)) (defn dump-route-to-xml [route] - (let [^DefaultCamelContext ctx (camel-context) + (let [^DefaultCamelContext ctx (c/camel-context) ^ManagedCamelContext managed-ctx (.getExtension ctx ManagedCamelContext)] - (add-routes ctx route) + (c/add-routes ctx route) (.start ctx) (let [xml (.dumpRoutesAsXml (.getManagedCamelContext managed-ctx))] (.shutdown ctx) - xml))) \ No newline at end of file + xml))) diff --git a/src/test/resources/throttle.xml b/src/test/resources/throttle.xml index f52f195..4fa0db6 100644 --- a/src/test/resources/throttle.xml +++ b/src/test/resources/throttle.xml @@ -5,7 +5,7 @@ test - + 20