diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE index f47f60b5696..726d7a94ac4 100644 --- a/.github/PULL_REQUEST_TEMPLATE +++ b/.github/PULL_REQUEST_TEMPLATE @@ -12,5 +12,5 @@ _Briefly describe your proposed changes:_ - [ ] Rebased/mergeable - [ ] A test has been added if appropriate - [ ] `mvn test` completes successfully -- [ ] Commit messages are in [semantic format](https://seesparkbox.com/foundry/semantic_commit_messages) +- [ ] Commit messages are [conventional](https://www.conventionalcommits.org/en/v1.0.0/) - [ ] Sign [CLA](https://www.influxdata.com/legal/cla/) (if not already signed) diff --git a/CHANGELOG.md b/CHANGELOG.md index e302cd1006a..7d397854868 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Features 1. [#337](https://github.com/influxdata/influxdb-client-java/pull/337): Supports `columns` function [FluxDSL] +1. [#347](https://github.com/influxdata/influxdb-client-java/pull/347): Add `Scala` WriteApi ### Bug Fixes 1. [#339](https://github.com/influxdata/influxdb-client-java/pull/339): Evaluation of connection string diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala b/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala index 33af1e0414d..6df0f8f6d75 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/InfluxDBClientScala.scala @@ -39,6 +39,13 @@ trait InfluxDBClientScala { */ @Nonnull def getQueryScalaApi(): QueryScalaApi + /** + * Create a new WriteApi client. + * + * @return the new client instance for the Write API + */ + @Nonnull def getWriteScalaApi: WriteScalaApi + /** * Get the health of an instance. * diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala new file mode 100644 index 00000000000..daebec45853 --- /dev/null +++ b/client-scala/src/main/scala/com/influxdb/client/scala/WriteScalaApi.scala @@ -0,0 +1,150 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala + +import akka.Done +import akka.stream.scaladsl.Sink +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.write.{Point, WriteParameters} + +import javax.annotation.Nonnull +import scala.concurrent.Future + +/** + * The Scala API to write time-series data into InfluxDB 2.x. + * + * @author Jakub Bednar (bednar@github) (05/09/2022 09:48) + */ +trait WriteScalaApi { + /** + * Write Line Protocol record into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit. + */ + def writeRecord(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[String, Future[Done]] + + /** + * Write Line Protocol records into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. + */ + def writeRecords(precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[String], Future[Done]] + + /** + * Write Line Protocol records into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. + */ + def writeRecords(@Nonnull parameters: WriteParameters): Sink[Seq[String], Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `point` is considered as one batch unit. + */ + def writePoint(bucket: Option[String] = None, org: Option[String] = None): Sink[Point, Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + def writePoints(bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[Point], Future[Done]] + + /** + * Write Data points into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + def writePoints(@Nonnull parameters: WriteParameters): Sink[Seq[Point], Future[Done]] + + /** + * Write Measurement into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurement. The `measurement` is considered as one batch unit. + */ + def writeMeasurement[M](precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[M, Future[Done]] + + /** + * Write Measurements into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + def writeMeasurements[M](precision: Option[WritePrecision] = None, bucket: Option[String] = None, org: Option[String] = None): Sink[Seq[M], Future[Done]] + + /** + * Write Measurements into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + def writeMeasurements[M](@Nonnull parameters: WriteParameters): Sink[Seq[M], Future[Done]] +} diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala index 73f59a20e57..4cba7937e1c 100644 --- a/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/InfluxDBClientScalaImpl.scala @@ -25,8 +25,8 @@ import com.influxdb.LogLevel import com.influxdb.client.InfluxDBClientOptions import com.influxdb.client.domain.HealthCheck import com.influxdb.client.internal.AbstractInfluxDBClient -import com.influxdb.client.scala.{InfluxDBClientScala, QueryScalaApi} -import com.influxdb.client.service.QueryService +import com.influxdb.client.scala.{InfluxDBClientScala, QueryScalaApi, WriteScalaApi} +import com.influxdb.client.service.{QueryService, WriteService} import javax.annotation.Nonnull @@ -41,6 +41,14 @@ class InfluxDBClientScalaImpl(@Nonnull options: InfluxDBClientOptions) extends A */ override def getQueryScalaApi(): QueryScalaApi = new QueryScalaApiImpl(retrofit.create(classOf[QueryService]), options) + + /** + * Create a new WriteApi client. + * + * @return the new client instance for the Write API + */ + override def getWriteScalaApi: WriteScalaApi = new WriteScalaApiImpl(retrofit.create(classOf[WriteService]), options) + /** * Get the health of an instance. * diff --git a/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala new file mode 100644 index 00000000000..cd21bfb434b --- /dev/null +++ b/client-scala/src/main/scala/com/influxdb/client/scala/internal/WriteScalaApiImpl.scala @@ -0,0 +1,216 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala.internal + +import akka.Done +import akka.stream.scaladsl.{Flow, Keep, Sink} +import com.influxdb.client.InfluxDBClientOptions +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.internal.{AbstractWriteBlockingClient, AbstractWriteClient} +import com.influxdb.client.scala.WriteScalaApi +import com.influxdb.client.service.WriteService +import com.influxdb.client.write.{Point, WriteParameters} + +import javax.annotation.Nonnull +import scala.collection.immutable.ListMap +import scala.concurrent.Future +import scala.jdk.CollectionConverters._ + +/** + * @author Jakub Bednar (bednar@github) (05/09/2022 09:48) + */ +class WriteScalaApiImpl(@Nonnull service: WriteService, @Nonnull options: InfluxDBClientOptions) + + extends AbstractWriteBlockingClient(service, options) with WriteScalaApi { + + /** + * Write Line Protocol record into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the record specified in InfluxDB Line Protocol. The `record` is considered as one batch unit. + */ + override def writeRecord(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[String, Future[Done]] = { + Flow[String] + .map(record => Seq(new AbstractWriteClient.BatchWriteDataRecord(record))) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Line Protocol records into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the records specified in InfluxDB Line Protocol. + */ + override def writeRecords(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[String], Future[Done]] = { + writeRecords(toWriteParameters(precision, bucket, org)) + } + + /** + * Write Line Protocol records into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the records specified in InfluxDB Line Protocol. The `records` are considered as one batch unit. + */ + override def writeRecords(parameters: WriteParameters): Sink[Seq[String], Future[Done]] = { + Flow[Seq[String]] + .map(records => records.map(record => new AbstractWriteClient.BatchWriteDataRecord(record))) + .map(batch => writeHttp(parameters, batch)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Data points into specified bucket. + * + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `point` is considered as one batch unit. + */ + override def writePoint(bucket: Option[String], org: Option[String]): Sink[Point, Future[Done]] = { + Flow[Point] + .map(point => (point.getPrecision, Seq(new AbstractWriteClient.BatchWriteDataPoint(point, options)))) + .map(batch => writeHttp(Some(batch._1), bucket, org, batch._2)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Data points into specified bucket. + * + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + override def writePoints(bucket: Option[String], org: Option[String]): Sink[Seq[Point], Future[Done]] = { + writePoints(new WriteParameters(bucket.orNull, org.orNull, null, null)) + } + + /** + * Write Data points into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @return the sink that accept the Data points. The `points` are considered as one batch unit. + */ + override def writePoints(parameters: WriteParameters): Sink[Seq[Point], Future[Done]] = { + Flow[Seq[Point]] + // create ordered Map + .map(records => records.foldRight(ListMap.empty[WritePrecision, Seq[Point]]) { + case (point, map) => map.updated(point.getPrecision, point +: map.getOrElse(point.getPrecision, Seq())) + }.toList.reverse) + .map(grouped => grouped.map(group => (group._1, group._2.map(point => new AbstractWriteClient.BatchWriteDataPoint(point, options))))) + .map(batches => batches.foreach(batch => writeHttp(parameters.copy(batch._1, options), batch._2))) + .map(_ => Done.done()) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Measurement into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurement. The `measurement` is considered as one batch unit. + */ + override def writeMeasurement[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[M, Future[Done]] = { + Flow[M] + .map(measurement => { + val parameters = toWriteParameters(precision, bucket, org) + Seq(toMeasurementBatch(measurement, parameters.precisionSafe(options))) + }) + .map(batch => writeHttp(precision, bucket, org, batch)) + .toMat(Sink.head)(Keep.right) + } + + /** + * Write Measurements into specified bucket. + * + * @param precision Precision for the unix timestamps within the body line-protocol. + * The [[com.influxdb.client.domain.WritePrecision.NS]] will be used as the precision if not specified. + * @param bucket Specifies the destination bucket for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getBucket]] will be used as the destination + * `bucket` if the `bucket` is not specified. + * @param org Specifies the destination organization for writes. + * The [[com.influxdb.client.InfluxDBClientOptions#getOrg]] will be used as the destination `organization` + * if the `org` is not specified. + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + override def writeMeasurements[M](precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): Sink[Seq[M], Future[Done]] = { + writeMeasurements(toWriteParameters(precision, bucket, org)) + } + + /** + * Write Measurements into specified bucket. + * + * @param parameters specify InfluxDB Write endpoint parameters + * @tparam M the type of the measurement (POJO) + * @return the sink that accept the measurements. The `measurements` are considered as one batch unit. + */ + override def writeMeasurements[M](parameters: WriteParameters): Sink[Seq[M], Future[Done]] = { + Flow[Seq[M]] + .map(records => records.map(record => toMeasurementBatch(record, parameters.precisionSafe(options)))) + .map(batch => writeHttp(parameters, batch)) + .toMat(Sink.head)(Keep.right) + } + + private def writeHttp(precision: Option[WritePrecision], bucket: Option[String], org: Option[String], batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + writeHttp(toWriteParameters(precision, bucket, org), batch) + } + + private def writeHttp(parameters: WriteParameters, batch: Seq[AbstractWriteClient.BatchWriteData]): Done = { + write(parameters, batch.toList.asJava.stream()) + Done.done() + } + + private def toWriteParameters(precision: Option[WritePrecision], bucket: Option[String], org: Option[String]): WriteParameters = { + val parameters = new WriteParameters(bucket.orNull, org.orNull, precision.orNull, null) + parameters.check(options) + parameters + } +} diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala index f5dfc2400a1..54723b0318c 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/ITQueryScalaApiQuery.scala @@ -22,7 +22,7 @@ package com.influxdb.client.scala import akka.actor.ActorSystem -import akka.stream.scaladsl.FileIO +import akka.stream.scaladsl.{FileIO, Keep, Source} import akka.stream.testkit.scaladsl.TestSink import akka.util.ByteString import com.influxdb.annotations.Column @@ -96,15 +96,15 @@ class ITQueryScalaApiQuery extends AbstractITQueryScalaApi with Matchers { "cpu,host=A,region=west usage_system=35i,user_usage=45i 10000000000", "cpu,host=A,region=west usage_system=38i,user_usage=49i 20000000000", "cpu,host=A,hyper-threading=true,region=west usage_system=55i,user_usage=65i 20000000000") - .mkString("\n") - - val writeApi = client.getWriteApiBlocking - writeApi.writeRecord(bucket.getName, organization.getId, WritePrecision.NS, records) client.close() influxDBClient.close() - influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, token.toCharArray) + influxDBClient = InfluxDBClientScalaFactory.create(influxDBUtils.getUrl, token.toCharArray, organization.getId, bucket.getName) + val sink = influxDBClient.getWriteScalaApi.writeRecords() + val future = Source.single(records).toMat(sink)(Keep.right) + Await.ready(future.run(), Duration.Inf) + queryScalaApi = influxDBClient.getQueryScalaApi() } diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala index 369becbfb17..7192924a6d3 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBClientScalaTest.scala @@ -24,17 +24,17 @@ package com.influxdb.client.scala import akka.actor.ActorSystem import akka.stream.testkit.scaladsl.TestSink import com.influxdb.query.FluxRecord +import org.scalatest.BeforeAndAfter import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers -import org.scalatest.BeforeAndAfter /** * @author Jakub Bednar (09/06/2020 07:19) */ -class InfluxDBClientScalaTest extends AnyFunSuite with Matchers with BeforeAndAfter{ +class InfluxDBClientScalaTest extends AnyFunSuite with Matchers with BeforeAndAfter { implicit val system: ActorSystem = ActorSystem("unit-tests") - + var utils: InfluxDBUtils = _ before { diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala index 1fe743f3b2b..bda83e4d0b7 100644 --- a/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala +++ b/client-scala/src/test/scala/com/influxdb/client/scala/InfluxDBUtils.scala @@ -37,8 +37,12 @@ class InfluxDBUtils extends AbstractMockServerTest { def serverMockResponse(): Unit = super.enqueuedResponse() + def serverMockErrorResponse(influxError: String): Unit = mockServer.enqueue(super.createErrorResponse(influxError)) + def serverTakeRequest(): RecordedRequest = super.takeRequest() + def getRequestCount: Int = mockServer.getRequestCount + override def generateName(prefix: String): String = super.generateName(prefix) override def getDeclaredField[V](obj: Any, field: String, `type`: Class[_]): V = super.getDeclaredField(obj, field, `type`) diff --git a/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala new file mode 100644 index 00000000000..2b91b914095 --- /dev/null +++ b/client-scala/src/test/scala/com/influxdb/client/scala/WriteScalaApiTest.scala @@ -0,0 +1,261 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.client.scala + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Source} +import com.influxdb.annotations.{Column, Measurement} +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.write.{Point, WriteParameters} +import com.influxdb.exceptions.InternalServerErrorException +import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.funsuite.AnyFunSuite +import org.scalatest.matchers.should.Matchers + +import java.time.Instant +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.language.postfixOps + +class WriteScalaApiTest extends AnyFunSuite with Matchers with BeforeAndAfter with ScalaFutures { + + implicit val system: ActorSystem = ActorSystem("unit-tests") + + var utils: InfluxDBUtils = _ + var client: InfluxDBClientScala = _ + + before { + utils = new InfluxDBUtils {} + client = InfluxDBClientScalaFactory.create(utils.serverStart, "my-token".toCharArray, "my-org", "my-bucket") + } + + after { + utils.serverStop() + } + + test("write record") { + + utils.serverMockResponse() + + val source = Source.single("m2m,tag=a value=1i") + val sink = client.getWriteScalaApi.writeRecord() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("m2m,tag=a value=1i") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") + } + + test("write records") { + + utils.serverMockResponse() + + val source = Source.single(Seq("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2")) + val sink = client.getWriteScalaApi.writeRecords() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("m2m,tag=a value=1i 1\nm2m,tag=a value=2i 2") + } + + test("write records custom params") { + + utils.serverMockResponse() + + val source = Source.single("m2m,tag=a value=1i 1").map(it => Seq(it)) + val sink = client.getWriteScalaApi.writeRecords(new WriteParameters("my-bucket-2", null, null, null)) + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + request.getBody.readUtf8() should be("m2m,tag=a value=1i 1") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket-2") + } + + test("write records error propagation") { + + utils.serverMockErrorResponse("line protocol poorly formed and no points were written") + + val source = Source.single(Seq("m2m,tag=a value=1i 1", "m2m,tag=a value=2i 2")) + val sink = client.getWriteScalaApi.writeRecords() + val materialized = source.toMat(sink)(Keep.right) + + whenReady(materialized.run().failed) { exc => { + exc.getMessage should be("line protocol poorly formed and no points were written") + exc.getClass should be(classOf[InternalServerErrorException]) + } + } + } + + test("write point") { + + utils.serverMockResponse() + + val point = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val source = Source.single(point) + val sink = client.getWriteScalaApi.writePoint() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("h2o,location=europe level=1i 1") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") + } + + test("write points") { + + utils.serverMockResponse() + + val point1 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val point2 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .time(2L, WritePrecision.NS) + + val source = Source.single(Seq(point1, point2)) + val sink = client.getWriteScalaApi.writePoints() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=1i 1\nh2o,location=europe level=2i 2") + } + + test("write points different precision") { + + utils.serverMockResponse() + utils.serverMockResponse() + + val point1 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 1) + .time(1L, WritePrecision.NS) + + val point2 = Point + .measurement("h2o") + .addTag("location", "europe") + .addField("level", 2) + .time(2L, WritePrecision.S) + + val source = Source.single(Seq(point1, point2)) + val sink = client.getWriteScalaApi.writePoints() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(2) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=1i 1") + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=europe level=2i 2") + } + + test("write measurement") { + + utils.serverMockResponse() + + val measurement = new H2O() + measurement.location = "coyote_creek" + measurement.level = 2.927 + measurement.description = "below 3 feet" + measurement.time = Instant.ofEpochMilli(1440046800L) + + val source = Source.single(measurement) + val sink = client.getWriteScalaApi.writeMeasurement() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + val request = utils.serverTakeRequest() + // check request + request.getBody.readUtf8() should be("h2o,location=coyote_creek level\\ description=\"below 3 feet\",water_level=2.927 1440046800000000") + request.getRequestUrl.queryParameter("bucket") should be("my-bucket") + request.getRequestUrl.queryParameter("org") should be("my-org") + request.getRequestUrl.queryParameter("precision") should be("ns") + } + + test("write measurements") { + + utils.serverMockResponse() + + val measurement1 = new H2O() + measurement1.location = "coyote_creek" + measurement1.level = 2.927 + measurement1.description = "below 3 feet" + measurement1.time = Instant.ofEpochMilli(1440046800L) + + val measurement2 = new H2O() + measurement2.location = "europe" + measurement2.level = 10 + measurement2.description = "below 3 feet" + measurement2.time = Instant.ofEpochMilli(1440046800L) + + val source = Source.single(Seq(measurement1, measurement2)) + val sink = client.getWriteScalaApi.writeMeasurements() + val materialized = source.toMat(sink)(Keep.right) + + Await.ready(materialized.run(), Duration.Inf) + + utils.getRequestCount should be(1) + utils.serverTakeRequest().getBody.readUtf8() should be("h2o,location=coyote_creek level\\ description=\"below 3 feet\",water_level=2.927 1440046800000000\nh2o,location=europe level\\ description=\"below 3 feet\",water_level=10.0 1440046800000000") + } + + @Measurement(name = "h2o") + class H2O() { + @Column(name = "location", tag = true) + var location: String = _ + @Column(name = "water_level") + var level: Double = _ + @Column(name = "level description") + var description: String = _ + @Column(name = "time", timestamp = true) + var time: Instant = _ + } +} diff --git a/examples/README.md b/examples/README.md index 59de800d58a..04cbbd91672 100644 --- a/examples/README.md +++ b/examples/README.md @@ -20,4 +20,13 @@ This directory contains Java, Kotlin and Scala examples. ### Writes - [KotlinWriteApi.kt](src/main/java/example/KotlinWriteApi.kt) - How to ingest data by `DataPoint`, `LineProtocol` or `Data class` - [KotlinWriteBatchingByFlow.kt](src/main/java/example/KotlinWriteBatchingByFlow.kt) - How to use [Flow](https://kotlinlang.org/docs/flow.html) operators to prepare batches for synchronous write into InfluxDB - \ No newline at end of file + +## Scala + +### Query +- [ScalaQuery.scala](src/main/java/example/ScalaQuery.scala) - How to query data into a stream of `FluxRecord` and filter them by `Flow` operators +- [ScalaQueryRaw.scala](src/main/java/example/ScalaQueryRaw.scala) - How to query data into a stream of `String` +- [ScalaQueryDSL.scala](src/main/java/example/ScalaQueryDSL.scala) - How to use the [FluxDSL](../flux-dsl) to query data + +### Writes +- [ScalaWriteApi.scala](src/main/java/example/ScalaWriteApi.scala) - How to ingest data by `DataPoint`, `LineProtocol` or `POJO` \ No newline at end of file diff --git a/examples/src/main/java/example/InfluxDB2ScalaExample.scala b/examples/src/main/java/example/ScalaQuery.scala similarity index 96% rename from examples/src/main/java/example/InfluxDB2ScalaExample.scala rename to examples/src/main/java/example/ScalaQuery.scala index 079d56cb2b8..db25594a648 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExample.scala +++ b/examples/src/main/java/example/ScalaQuery.scala @@ -29,9 +29,9 @@ import com.influxdb.query.FluxRecord import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExample { +object ScalaQuery { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") def main(args: Array[String]): Unit = { diff --git a/examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala b/examples/src/main/java/example/ScalaQueryDSL.scala similarity index 94% rename from examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala rename to examples/src/main/java/example/ScalaQueryDSL.scala index c744002ef71..7f1dcfa54ca 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExampleDSL.scala +++ b/examples/src/main/java/example/ScalaQueryDSL.scala @@ -33,11 +33,11 @@ import com.influxdb.query.dsl.functions.restriction.Restrictions import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExampleDSL { +object ScalaQueryDSL { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") - def main(args: Array[String]) { + def main(args: Array[String]): Unit = { val influxDBClient = InfluxDBClientScalaFactory .create("http://localhost:8086", "my-token".toCharArray, "my-org") diff --git a/examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala b/examples/src/main/java/example/ScalaQueryRaw.scala similarity index 95% rename from examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala rename to examples/src/main/java/example/ScalaQueryRaw.scala index db6edd61971..c13ab87bde9 100644 --- a/examples/src/main/java/example/InfluxDB2ScalaExampleRaw.scala +++ b/examples/src/main/java/example/ScalaQueryRaw.scala @@ -28,9 +28,9 @@ import com.influxdb.client.scala.InfluxDBClientScalaFactory import scala.concurrent.Await import scala.concurrent.duration.Duration -object InfluxDB2ScalaExampleRaw { +object ScalaQueryRaw { - implicit val system: ActorSystem = ActorSystem("it-tests") + implicit val system: ActorSystem = ActorSystem("examples") def main(args: Array[String]): Unit = { val influxDBClient = InfluxDBClientScalaFactory diff --git a/examples/src/main/java/example/ScalaWriteApi.scala b/examples/src/main/java/example/ScalaWriteApi.scala new file mode 100644 index 00000000000..7b8bbda0e55 --- /dev/null +++ b/examples/src/main/java/example/ScalaWriteApi.scala @@ -0,0 +1,94 @@ +/** + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package example + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Keep, Source} +import com.influxdb.annotations.{Column, Measurement} +import com.influxdb.client.domain.WritePrecision +import com.influxdb.client.scala.InfluxDBClientScalaFactory +import com.influxdb.client.write.Point + +import java.time.Instant +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +object ScalaWriteApi { + + implicit val system: ActorSystem = ActorSystem("examples") + + def main(args: Array[String]): Unit = { + + val client = InfluxDBClientScalaFactory.create( + "http://localhost:8086", "my-token".toCharArray, "my-org", "my-bucket") + + // + // Use InfluxDB Line Protocol to write data + // + val record = "mem,host=host1 used_percent=23.43234543" + + val source = Source.single(record) + val sink = client.getWriteScalaApi.writeRecord() + val materialized = source.toMat(sink)(Keep.right) + Await.result(materialized.run(), Duration.Inf) + + // + // Use a Data Point to write data + // + val point = Point + .measurement("mem") + .addTag("host", "host1") + .addField("used_percent", 23.43234543) + .time(Instant.now(), WritePrecision.NS) + + val sourcePoint = Source.single(point) + val sinkPoint = client.getWriteScalaApi.writePoint() + val materializedPoint = sourcePoint.toMat(sinkPoint)(Keep.right) + Await.result(materializedPoint.run(), Duration.Inf) + + // + // Use POJO and corresponding class to write data + // + val mem = new Mem() + mem.host = "host1" + mem.used_percent = 23.43234543 + mem.time = Instant.now + + val sourcePOJO = Source.single(mem) + val sinkPOJO = client.getWriteScalaApi.writeMeasurement() + val materializedPOJO = sourcePOJO.toMat(sinkPOJO)(Keep.right) + Await.result(materializedPOJO.run(), Duration.Inf) + + client.close() + system.terminate() + } + + @Measurement(name = "mem") + class Mem() { + @Column(tag = true) + var host: String = _ + @Column + var used_percent: Double = _ + @Column(timestamp = true) + var time: Instant = _ + } +}