diff --git a/.travis.yml b/.travis.yml index 4d145d1..323785e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,7 +2,7 @@ language: java scala: - 2.12.4 jdk: -- oraclejdk8 +- oraclejdk9 cache: directories: - $HOME/.gradle/wrapper/ diff --git a/README.md b/README.md index a5adaa0..267c5bc 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,6 @@ Finally-tagless implementation of client for misc. storages represented by backends. Supports backends fallbacks. Currently supported backends: -1. [Stor](stor/README.md) 1. [HCP](hcp/README.md) ## Dependency @@ -66,21 +65,3 @@ val backend: StorageBackend[Task] = ??? val client = StorageClient(backend) ``` - -### Using strict types (Future etc.) - -The library supports only `F[_]: Effect` by default which makes some common (strict) types like `scala.concurrent.Future` impossible to use. -There exists a workaround however: - -```scala -import com.avast.clients.storage.{StorageBackend, StorageClient} -import monix.eval.Task -import monix.execution.Scheduler - -implicit val scheduler: Scheduler = ??? -implicit val taskToFuture: Task ~> Future = ??? - -val backend: StorageBackend[Task] = ??? - -val client: StorageClient[Future] = StorageClient[Task](backend).mapK[Future] -``` diff --git a/build.gradle b/build.gradle index 431fd3c..0228be0 100644 --- a/build.gradle +++ b/build.gradle @@ -7,14 +7,15 @@ buildscript { dependencies { classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.7.3' classpath 'com.github.maiflai:gradle-scalatest:0.19' + classpath "com.github.ben-manes:gradle-versions-plugin:0.27.0" } } ext { - circeVersion = "0.9.3" - metricsVersion = "2.4.4" - http4sVersion = "0.18.15" - monixVersion = "3.0.0-RC1" + circeVersion = "0.13.0" + metricsVersion = "2.6.6" + http4sVersion = "0.21.1" + monixVersion = "3.1.0" // Used only in tests. } subprojects { @@ -24,6 +25,8 @@ subprojects { apply plugin: 'maven' apply plugin: 'maven-publish' apply plugin: 'com.github.maiflai.scalatest' + apply plugin: "com.github.ben-manes.versions" + sourceCompatibility = '1.8' targetCompatibility = '1.8' @@ -45,14 +48,14 @@ subprojects { dependencies { compile 'org.slf4j:jul-to-slf4j:1.7.23' compile 'org.slf4j:jcl-over-slf4j:1.7.23' - compile 'com.typesafe.scala-logging:scala-logging_2.12:3.9.0' + compile 'com.typesafe.scala-logging:scala-logging_2.12:3.9.2' testCompile "org.http4s:http4s-blaze-server_2.12:$http4sVersion" - testCompile "ch.qos.logback:logback-classic:1.1.8" - testCompile 'junit:junit:4.12' + testCompile "ch.qos.logback:logback-classic:1.2.3" + testCompile 'junit:junit:4.13' testCompile "org.scalatest:scalatest_2.12:3.0.5" - testCompile 'org.mockito:mockito-core:2.21.0' + testCompile 'org.mockito:mockito-core:3.3.1' testCompile "org.pegdown:pegdown:1.6.0" } diff --git a/core/build.gradle b/core/build.gradle index 40718c3..a374421 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -1,20 +1,17 @@ archivesBaseName = "storage-client-core_2.12" dependencies { - compile 'com.avast.hashes:scala-hashes_2.12:1.0.1' + compile 'com.avast.hashes:scala-hashes_2.12:1.1.1' - compile 'com.github.pathikrit:better-files_2.12:3.4.0' + compile 'com.github.pathikrit:better-files_2.12:3.8.0' - compile "io.monix:monix_2.12:$monixVersion" - compile "org.typelevel:cats-core_2.12:1.2.0" - compile "com.kailuowang:mainecoon-core_2.12:0.6.2" + compile "org.typelevel:cats-core_2.12:2.1.1" - compile 'commons-codec:commons-codec:1.11' - compile 'commons-io:commons-io:2.6' - - compile 'com.typesafe:config:1.3.1' - compile "com.github.pureconfig:pureconfig_2.12:0.9.1" - compile "com.github.pureconfig:pureconfig-http4s_2.12:0.9.1" + compile 'com.typesafe:config:1.4.0' + compile "com.github.pureconfig:pureconfig_2.12:0.12.3" + compile "com.github.pureconfig:pureconfig-http4s_2.12:0.12.3" compile "com.avast.metrics:metrics-scala_2.12:$metricsVersion" + + testCompile "io.monix:monix_2.12:$monixVersion" } \ No newline at end of file diff --git a/core/src/main/scala/com/avast/clients/storage/FileCopier.scala b/core/src/main/scala/com/avast/clients/storage/FileCopier.scala deleted file mode 100644 index 868c950..0000000 --- a/core/src/main/scala/com/avast/clients/storage/FileCopier.scala +++ /dev/null @@ -1,71 +0,0 @@ -package com.avast.clients.storage - -import java.io.{InputStream, OutputStream} -import java.security.MessageDigest - -import com.avast.scala.hashes.Sha256 -import org.apache.commons.io.IOUtils - -class FileCopier { - - private val lock = new Object - - private val digest: MessageDigest = MessageDigest.getInstance("SHA-256") - - private var finalHash: Option[Sha256] = None // scalastyle:ignore - - def copy(is: InputStream, os: OutputStream): Long = lock.synchronized { - val fis = new ProxyInputStream(is)(digest.update(_)) - - IOUtils.copyLarge(fis, os) - } - - def finalSha256: Sha256 = lock.synchronized { - finalHash match { - case Some(hash) => hash - case None => - val hash = Sha256(digest.digest()) - finalHash = Some(hash) - hash - } - } -} - -private class ProxyInputStream(is: InputStream)(handle: Array[Byte] => Unit) extends InputStream { - - override def read(): Int = { - val b = is.read() - - if (b != -1) handle(Array(b.toByte)) - - b - } - - override def read(b: Array[Byte]): Int = { - val bytes = is.read(b) - - handle(b.slice(0, bytes)) - - bytes - } - - override def read(b: Array[Byte], off: Int, len: Int): Int = { - val bytes = is.read(b, off, len) - - handle(b.slice(off, off + bytes)) - - bytes - } - - override def markSupported(): Boolean = is.markSupported() - - override def available(): Int = is.available() - - override def skip(n: Long): Long = is.skip(n) - - override def reset(): Unit = is.reset() - - override def close(): Unit = is.close() - - override def mark(readlimit: Int): Unit = is.mark(readlimit) -} diff --git a/core/src/main/scala/com/avast/clients/storage/StorageBackend.scala b/core/src/main/scala/com/avast/clients/storage/StorageBackend.scala index 374a1f1..5b7770c 100644 --- a/core/src/main/scala/com/avast/clients/storage/StorageBackend.scala +++ b/core/src/main/scala/com/avast/clients/storage/StorageBackend.scala @@ -9,5 +9,4 @@ trait StorageBackend[F[_]] extends AutoCloseable { def head(sha256: Sha256): F[Either[StorageException, HeadResult]] def get(sha256: Sha256, dest: File = File.newTemporaryFile(prefix = "stor")): F[Either[StorageException, GetResult]] - } diff --git a/core/src/main/scala/com/avast/clients/storage/StorageException.scala b/core/src/main/scala/com/avast/clients/storage/StorageException.scala index 64ee18e..a373b59 100644 --- a/core/src/main/scala/com/avast/clients/storage/StorageException.scala +++ b/core/src/main/scala/com/avast/clients/storage/StorageException.scala @@ -3,11 +3,9 @@ package com.avast.clients.storage sealed abstract class StorageException(msg: String, cause: Throwable = null) extends Exception(msg, cause) object StorageException { - case class InvalidResponseException(status: Int, body: String, desc: String, cause: Throwable = null) extends StorageException(s"Invalid response with status $status: $desc", cause) case class InvalidDataException(status: Int, body: String, desc: String, cause: Throwable = null) extends StorageException(s"Invalid response data [ with status $status ]: $desc", cause) - } diff --git a/core/src/main/scala/com/avast/clients/storage/results.scala b/core/src/main/scala/com/avast/clients/storage/results.scala index 820401f..4d88f50 100644 --- a/core/src/main/scala/com/avast/clients/storage/results.scala +++ b/core/src/main/scala/com/avast/clients/storage/results.scala @@ -5,19 +5,15 @@ import better.files.File sealed trait HeadResult object HeadResult { - case class Exists(length: Long) extends HeadResult case object NotFound extends HeadResult - } sealed trait GetResult object GetResult { - case class Downloaded(file: File, fileSize: Long) extends GetResult case object NotFound extends GetResult - } diff --git a/core/src/main/scala/com/avast/clients/storage/storage.scala b/core/src/main/scala/com/avast/clients/storage/storage.scala index 9101a7d..c0f7872 100644 --- a/core/src/main/scala/com/avast/clients/storage/storage.scala +++ b/core/src/main/scala/com/avast/clients/storage/storage.scala @@ -1,21 +1,14 @@ package com.avast.clients import better.files.File +import cats.Monad import cats.data.EitherT -import cats.{~>, Monad} import com.avast.scala.hashes.Sha256 -import mainecoon.FunctorK import scala.language.higherKinds package object storage { - implicit class StorageClientOps[F[_]](val client: StorageClient[F]) extends AnyVal { - def mapK[G[_]](implicit fToG: F ~> G): StorageClient[G] = { - storageClientFunctorK.mapK(client)(fToG) - } - } - implicit class StorageBackendOps[F[_]: Monad](val backend: StorageBackend[F]) { def withFallbackIfError(fallback: StorageBackend[F]): StorageBackend[F] = new StorageBackend[F] { override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { @@ -38,17 +31,21 @@ package object storage { def withFallbackIfNotFound(fallback: StorageBackend[F]): StorageBackend[F] = new StorageBackend[F] { override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { - EitherT(backend.head(sha256)).flatMap[StorageException, HeadResult] { - case r: HeadResult.Exists => EitherT.rightT(r) - case HeadResult.NotFound => EitherT(fallback.head(sha256)) - }.value + EitherT(backend.head(sha256)) + .flatMap[StorageException, HeadResult] { + case r: HeadResult.Exists => EitherT.rightT(r) + case HeadResult.NotFound => EitherT(fallback.head(sha256)) + } + .value } override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = { - EitherT(backend.get(sha256, dest)).flatMap[StorageException, GetResult] { - case r: GetResult.Downloaded => EitherT.rightT(r) - case GetResult.NotFound => EitherT(fallback.get(sha256, dest)) - }.value + EitherT(backend.get(sha256, dest)) + .flatMap[StorageException, GetResult] { + case r: GetResult.Downloaded => EitherT.rightT(r) + case GetResult.NotFound => EitherT(fallback.get(sha256, dest)) + } + .value } override def close(): Unit = { @@ -57,32 +54,4 @@ package object storage { } } } - - implicit val storageBackendFunctorK: FunctorK[StorageBackend] = new FunctorK[StorageBackend] { - override def mapK[F[_], G[_]](backend: StorageBackend[F])(fToG: F ~> G): StorageBackend[G] = new StorageBackend[G] { - override def head(sha256: Sha256): G[Either[StorageException, HeadResult]] = fToG { - backend.head(sha256) - } - - override def get(sha256: Sha256, dest: File): G[Either[StorageException, GetResult]] = fToG { - backend.get(sha256, dest) - } - - override def close(): Unit = backend.close() - } - } - - implicit val storageClientFunctorK: FunctorK[StorageClient] = new FunctorK[StorageClient] { - override def mapK[F[_], G[_]](client: StorageClient[F])(fToG: F ~> G): StorageClient[G] = new StorageClient[G] { - override def head(sha256: Sha256): G[Either[StorageException, HeadResult]] = fToG { - client.head(sha256) - } - - override def get(sha256: Sha256, dest: File): G[Either[StorageException, GetResult]] = fToG { - client.get(sha256, dest) - } - - override def close(): Unit = client.close() - } - } } diff --git a/core/src/test/scala/com/avast/clients/storage/FileCopierTest.scala b/core/src/test/scala/com/avast/clients/storage/FileCopierTest.scala deleted file mode 100644 index acc1e36..0000000 --- a/core/src/test/scala/com/avast/clients/storage/FileCopierTest.scala +++ /dev/null @@ -1,31 +0,0 @@ -package com.avast.clients.storage - -import java.io.{ByteArrayInputStream, ByteArrayOutputStream} - -import org.scalatest.FunSuite - -class FileCopierTest extends FunSuite { - test("basic") { - val bis = new ByteArrayInputStream("helloworld".getBytes) - val bos = new ByteArrayOutputStream() - - val copier = new FileCopier - - copier.copy(bis, bos) - - assertResult("helloworld")(bos.toString) - assertResult("936a185caaa266bb9cbe981e9e05cb78cd732b0b3280eb944412bb6f8f8f07af")(copier.finalSha256.toString()) - } - - test("basic2") { - val bis = new ByteArrayInputStream("936a185caaa266bb9cbe981e9e05cb78cd732b0b3280eb944412bb6f8f8f07af".getBytes) - val bos = new ByteArrayOutputStream() - - val copier = new FileCopier - - copier.copy(bis, bos) - - assertResult("936a185caaa266bb9cbe981e9e05cb78cd732b0b3280eb944412bb6f8f8f07af")(bos.toString) - assertResult("cda6fa38611c153780f3a234cd986c51eb8de001aeab5fb5ca1c66506f0c83b4")(copier.finalSha256.toString) - } -} diff --git a/core/src/test/scala/com/avast/clients/storage/StorageBackendOpsTest.scala b/core/src/test/scala/com/avast/clients/storage/StorageBackendOpsTest.scala index dcd8a86..843c1a9 100644 --- a/core/src/test/scala/com/avast/clients/storage/StorageBackendOpsTest.scala +++ b/core/src/test/scala/com/avast/clients/storage/StorageBackendOpsTest.scala @@ -30,9 +30,9 @@ class StorageBackendOpsTest extends FunSuite with ScalaFutures { val merged = first.withFallbackIfError(second) - assertResult(Right(HeadResult.Exists(42)))(merged.head(randomSha).runAsync.futureValue) + assertResult(Right(HeadResult.Exists(42)))(merged.head(randomSha).runToFuture.futureValue) val dest = File.newTemporaryFile() - assertResult(Right(GetResult.Downloaded(dest, 42)))(merged.get(randomSha, dest).runAsync.futureValue) + assertResult(Right(GetResult.Downloaded(dest, 42)))(merged.get(randomSha, dest).runToFuture.futureValue) } test("withFallbackIfError - fallback used") { @@ -56,8 +56,8 @@ class StorageBackendOpsTest extends FunSuite with ScalaFutures { val merged = first.withFallbackIfError(second) - assertResult(Right(HeadResult.Exists(42)))(merged.head(randomSha).runAsync.futureValue) + assertResult(Right(HeadResult.Exists(42)))(merged.head(randomSha).runToFuture.futureValue) val dest = File.newTemporaryFile() - assertResult(Right(GetResult.Downloaded(dest, 42)))(merged.get(randomSha, dest).runAsync.futureValue) + assertResult(Right(GetResult.Downloaded(dest, 42)))(merged.get(randomSha, dest).runToFuture.futureValue) } } diff --git a/core/src/test/scala/com/avast/clients/storage/StorageClientOpsTest.scala b/core/src/test/scala/com/avast/clients/storage/StorageClientOpsTest.scala deleted file mode 100644 index 1e8b299..0000000 --- a/core/src/test/scala/com/avast/clients/storage/StorageClientOpsTest.scala +++ /dev/null @@ -1,33 +0,0 @@ -package com.avast.clients.storage - -import better.files.File -import com.avast.scala.hashes.Sha256 -import monix.eval.Task -import org.scalatest.FunSuite -import monix.execution.Scheduler.Implicits.global - -import scala.concurrent.Future -import TestImplicits._ -import org.scalatest.concurrent.ScalaFutures - -class StorageClientOpsTest extends FunSuite with ScalaFutures { - test("mapK") { - - val backend = new StorageBackend[Task] { - override def head(sha256: Sha256): Task[Either[StorageException, HeadResult]] = Task.now(Right(HeadResult.Exists(42))) - - override def get(sha256: Sha256, dest: File): Task[Either[StorageException, GetResult]] = { - Task.now(Right(GetResult.Downloaded(dest, 42))) - } - - override def close(): Unit = () - } - - val client: StorageClient[Task] = new DefaultStorageClient[Task](backend) - val futureClient: StorageClient[Future] = client.mapK[Future] - - val sha = randomSha - - assertResult(client.head(sha).runAsync.futureValue)(futureClient.head(sha).futureValue) - } -} diff --git a/core/src/test/scala/com/avast/clients/storage/TestImplicits.scala b/core/src/test/scala/com/avast/clients/storage/TestImplicits.scala index 86e130a..366ccff 100644 --- a/core/src/test/scala/com/avast/clients/storage/TestImplicits.scala +++ b/core/src/test/scala/com/avast/clients/storage/TestImplicits.scala @@ -26,9 +26,4 @@ object TestImplicits { def newInputStream: InputStream = new ByteArrayInputStream(s.getBytes) } - - implicit def fkTaskToFuture(implicit ec: ExecutionContext): Task ~> Future = new FunctionK[Task, Future] { - override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec)) - } - } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 83e857f..ea46bbb 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.6-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.3-all.zip diff --git a/hcp/build.gradle b/hcp/build.gradle index 53f90c2..ef4162c 100644 --- a/hcp/build.gradle +++ b/hcp/build.gradle @@ -10,4 +10,6 @@ dependencies { compile "io.circe:circe-parser_2.12:$circeVersion" compile "io.circe:circe-generic_2.12:$circeVersion" compile "io.circe:circe-generic-extras_2.12:$circeVersion" + + testCompile "io.monix:monix_2.12:$monixVersion" } \ No newline at end of file diff --git a/hcp/src/main/resources/reference.conf b/hcp/src/main/resources/reference.conf index c235f02..7486151 100644 --- a/hcp/src/main/resources/reference.conf +++ b/hcp/src/main/resources/reference.conf @@ -1,12 +1,18 @@ hcpRestBackendDefaults { + protocol = "http" //namespace = "" // REQUIRED //tenant = "" // REQUIRED //repository = "" // REQUIRED - protocol = "http" - requestTimeout = 10 minutes - socketTimeout = 30 seconds - responseHeaderTimeout = 10 seconds + //username = "" // REQUIRED + //password = "" // REQUIRED + + responseHeaderTimeout = 10 seconds // duration between the submission of a request and the completion of the response header. Does not include time to read the response body + requestTimeout = 10 minutes // maximum duration from the submission of a request through reading the body before a timeout + idleTimeout = 11 minutes // duration that a connection can wait without traffic being read or written before timeout + + maxWainQueueLimit = 100 // maximum number of requests waiting for a connection at any specific time - maxConnections = 10 + maxConnections = 10 // maximum connections the client will have at any specific time + maxConnectionsPerNode = 10 // maximum connections the client will have at any specific time to one specific HCP node/IP address } diff --git a/hcp/src/main/scala/com/avast/clients/storage/hcp/HcpRestStorageBackend.scala b/hcp/src/main/scala/com/avast/clients/storage/hcp/HcpRestStorageBackend.scala index fa4cfc4..fe9dc60 100644 --- a/hcp/src/main/scala/com/avast/clients/storage/hcp/HcpRestStorageBackend.scala +++ b/hcp/src/main/scala/com/avast/clients/storage/hcp/HcpRestStorageBackend.scala @@ -1,37 +1,51 @@ package com.avast.clients.storage.hcp -import java.io.ByteArrayInputStream +import java.net.{Inet4Address, Inet6Address, InetAddress} +import java.nio.charset.StandardCharsets +import java.nio.file.StandardOpenOption +import java.security.{DigestOutputStream, MessageDigest} +import java.util.Base64 +import java.util.concurrent.ThreadLocalRandom import better.files.File import cats.data.NonEmptyList -import cats.effect.Effect +import cats.effect.implicits._ +import cats.effect.{Async, Blocker, ConcurrentEffect, ContextShift, Resource, Sync} import cats.syntax.all._ import com.avast.clients.storage.hcp.HcpRestStorageBackend._ -import com.avast.clients.storage.{ConfigurationException, FileCopier, GetResult, HeadResult, StorageBackend, StorageException} +import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, StorageBackend, StorageException} +import com.avast.scala.hashes import com.avast.scala.hashes.Sha256 import com.typesafe.config.{Config, ConfigFactory} import com.typesafe.scalalogging.StrictLogging +import org.http4s.Uri.{Authority, Ipv4Address, Ipv6Address} import org.http4s.client.Client -import org.http4s.client.blaze.{BlazeClientConfig, Http1Client} -import org.http4s.headers.`Content-Length` +import org.http4s.client.blaze.BlazeClientBuilder +import org.http4s.headers.{`Content-Length`, `User-Agent`, AgentProduct} import org.http4s.{Method, Request, Response, Status, _} import pureconfig._ import pureconfig.error.ConfigReaderException +import pureconfig.generic.ProductHint +import pureconfig.generic.auto._ import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.language.higherKinds import scala.util.control.NonFatal -class HcpRestStorageBackend[F[_]](rootUri: Uri, httpClient: Client[F])(implicit F: Effect[F]) extends StorageBackend[F] with StrictLogging { +class HcpRestStorageBackend[F[_]: Sync: ContextShift](baseUrl: Uri, username: String, password: String, httpClient: Client[F])( + blocker: Blocker)(implicit F: Async[F]) + extends StorageBackend[F] + with StrictLogging { + + private val baseUrlAuthority = baseUrl.authority.getOrElse(throw ConfigurationException(s"BaseUrl $baseUrl is missing path part")) + private val authenticationHeader = composeAuthenticationHeader(username, password) + override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { logger.debug(s"Checking presence of file $sha256 in HCP") - val finalUri: Uri = splitFileName(sha256).foldLeft(rootUri)(_ / _) + val relativeUrl: Uri = composeFileUrl(sha256) try { - val request = Request[F]( - Method.HEAD, - finalUri - ) + val request = prepareRequest(Method.HEAD, relativeUrl) httpClient.fetch(request) { resp => resp.status match { @@ -39,19 +53,20 @@ class HcpRestStorageBackend[F[_]](rootUri: Uri, httpClient: Client[F])(implicit `Content-Length`.from(resp.headers) match { case Some(`Content-Length`(length)) => F.pure(Right(HeadResult.Exists(length))) case None => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Missing Content-Length header")) + resp.bodyAsText.compile.toList.map { body => + Left(StorageException.InvalidResponseException(resp.status.code, body.mkString, "Missing Content-Length header")) } } case Status.NotFound => F.pure(Right(HeadResult.NotFound)) case _ => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Unexpected status")) + resp.bodyAsText.compile.toList.map { body => + Left(StorageException.InvalidResponseException(resp.status.code, body.mkString, "Unexpected status")) } } } + } catch { case NonFatal(e) => F.raiseError(e) } @@ -60,138 +75,196 @@ class HcpRestStorageBackend[F[_]](rootUri: Uri, httpClient: Client[F])(implicit override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = { logger.debug(s"Getting file $sha256 from HCP") - val finalUri: Uri = splitFileName(sha256).foldLeft(rootUri)(_ / _) + val relativeUrl: Uri = composeFileUrl(sha256) try { - val request = Request[F]( - Method.GET, - finalUri - ) + val request = prepareRequest(Method.GET, relativeUrl) - httpClient.fetch(request) { resp => - resp.status match { - case Status.Ok => receiveStreamedFile(sha256, dest, resp) - case Status.NotFound => F.pure(Right(GetResult.NotFound)) + httpClient + .fetch(request) { resp => + resp.status match { + case Status.Ok => receiveStreamedFile(resp, dest, sha256) + case Status.NotFound => F.pure(Right(GetResult.NotFound)) - case _ => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Unexpected status")) - } + case _ => + resp.bodyAsText.compile.toList.map { body => + Left(StorageException.InvalidResponseException(resp.status.code, body.mkString, "Unexpected status")) + } + } } - - } } catch { case NonFatal(e) => F.raiseError(e) } } - private def receiveStreamedFile(sha256: Sha256, dest: File, resp: Response[F]): F[Either[StorageException, GetResult]] = { - `Content-Length`.from(resp.headers) match { - case Some(clh) => - val fileCopier = new FileCopier - val fileOs = dest.newOutputStream - - resp.body.chunks - .map { bytes => - val bis = new ByteArrayInputStream(bytes.toArray) - try { - fileCopier.copy(bis, fileOs) - } finally { - bis.close() - } - } - .compile - .toVector - .map { chunksSizes => - val transferred = chunksSizes.sum + override def close(): Unit = () - fileOs.close() // all data has been transferred + private def composeFileUrl(sha256: Sha256): Uri = { + splitFileName(sha256).foldLeft(RelativeUrlBase)(_ / _) + } - if (clh.length != transferred) { - Left(StorageException.InvalidDataException(resp.status.code, "-stream-", s"Expected ${clh.length} B but got $transferred B")) - } else { - val transferredSha = fileCopier.finalSha256 + private def prepareRequest(method: Method, relative: Uri): Request[F] = { + val hostName = baseUrlAuthority.host.value + // Using JVM DNS cache, see: https://javaeesupportpatterns.blogspot.com/2011/03/java-dns-cache-reference-guide.html + val addresses = InetAddress.getAllByName(hostName) + val address = addresses(ThreadLocalRandom.current().nextInt(addresses.size)) + val ipHost = address match { + case ipv4: Inet4Address => Ipv4Address.unsafeFromString(ipv4.getHostAddress) + case ipv6: Inet6Address => Ipv6Address.unsafeFromString(ipv6.getHostAddress) + } - if (transferredSha != sha256) { - Left { - StorageException.InvalidDataException(resp.status.code, "-stream-", s"Expected SHA256 $sha256 but got $transferredSha") - } - } else { - Right(GetResult.Downloaded(dest, transferred)) - } - } - } + val url = baseUrl.copy(authority = Some(Authority(host = ipHost, port = baseUrlAuthority.port)), path = relative.path) - case None => F.pure(Left(StorageException.InvalidResponseException(resp.status.code, "-stream-", "Missing Content-Length header"))) - } + Request[F](method, url, headers = Headers.of(Header("Connection", "close"), Header("Host", hostName), authenticationHeader)) } - override def close(): Unit = httpClient.shutdownNow() + private def receiveStreamedFile(response: Response[F], + destination: File, + expectedHash: Sha256): F[Either[StorageException, GetResult]] = { + logger.debug(s"Downloading streamed data to $destination") + + val openOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING) + + blocker + .blockOn { + F.delay(destination.newOutputStream(openOptions)) + .bracket { fileStream => + F.delay(new DigestOutputStream(fileStream, MessageDigest.getInstance("SHA-256"))) + .bracket { stream => + response.body.chunks + .map { ch => + stream.write(ch.toArray) + ch.size + } + .compile + .toList + .map { transferred => + val totalSize = transferred.map(_.toLong).sum + (totalSize, Sha256(stream.getMessageDigest.digest)) + } + }(stream => F.delay(stream.close())) + }(fileStream => F.delay(fileStream.close())) + } + .map { + case (transferred, sha256) => + verifyResult(destination, transferred, response.contentLength, sha256, expectedHash, response.status.code) + } + } + + private def verifyResult(file: File, + transferred: Long, + expectedSize: Option[Long], + hash: Sha256, + expectedHash: Sha256, + statusCode: Int): Either[StorageException, GetResult] = { + + expectedSize match { + case Some(contentLength) => + if (contentLength != transferred) { + Left { + StorageException.InvalidDataException(statusCode, "-stream-", s"Expected $contentLength B but got $transferred B") + } + } else if (expectedHash != hash) { + Left { + StorageException.InvalidDataException(statusCode, "-stream-", s"Expected SHA256 $expectedHash but got $hash") + } + } else { + Right { + GetResult.Downloaded(file, transferred) + } + } + case None => + Left { + StorageException.InvalidResponseException(statusCode, "-stream-", "Missing Content-Length header") + } + } + } } object HcpRestStorageBackend { - private val RootConfigKey = "hcpRestBackendDefaults" - private val DefaultConfig = ConfigFactory.defaultReference().getConfig(RootConfigKey) + private val RelativeUrlBase = Uri(path = "/rest") - // configure pureconfig: - private implicit val ph: ProductHint[HcpRestBackendConfiguration] = ProductHint[HcpRestBackendConfiguration]( - fieldMapping = ConfigFieldMapping(CamelCase, CamelCase) - ) + def fromConfig[F[_]: ConcurrentEffect: ContextShift](config: Config, blocker: Blocker)( + implicit ec: ExecutionContext): Either[ConfigurationException, Resource[F, HcpRestStorageBackend[F]]] = { - def fromConfig[F[_]: Effect](config: Config)( - implicit ec: ExecutionContext): Either[ConfigurationException, F[HcpRestStorageBackend[F]]] = { - - def assembleUri(repository: String, namespace: String, tenant: String, protocol: String): Either[ConfigurationException, Uri] = { + def assembleUri(protocol: String, namespace: String, tenant: String, repository: String): Either[ConfigurationException, Uri] = { Uri - .fromString(s"$protocol://$namespace.$tenant.$repository/rest") + .fromString(s"$protocol://$namespace.$tenant.$repository") .leftMap(ConfigurationException("Could not assemble final URI", _)) - } - def loadConfig: Either[ConfigurationException, HcpRestBackendConfiguration] = { - pureconfig - .loadConfig[HcpRestBackendConfiguration](config.withFallback(DefaultConfig)) + def composeConfig: Either[ConfigurationException, HcpRestBackendConfiguration] = { + val DefaultConfig = ConfigFactory.defaultReference().getConfig("hcpRestBackendDefaults") + pureconfig.ConfigSource + .fromConfig(config.withFallback(DefaultConfig)) + .load[HcpRestBackendConfiguration] .leftMap { failures => ConfigurationException("Could not load config", new ConfigReaderException[HcpRestBackendConfiguration](failures)) } } for { - conf <- loadConfig - uri <- { + conf <- composeConfig + baseUri <- { import conf._ - assembleUri(repository, namespace, tenant, protocol) + assembleUri(protocol = protocol, namespace = namespace, tenant = tenant, repository = repository) } + clientBuilder = conf.toBlazeClientBuilder[F](ec) } yield { - Http1Client[F](conf.toBlazeConfig.copy(executionContext = ec)) - .map(new HcpRestStorageBackend[F](uri, _)) + clientBuilder.resource.map { httpClient => + new HcpRestStorageBackend(baseUri, username = conf.username, password = conf.password, httpClient)(blocker) + } } } - private[storage] def splitFileName(sha256: Sha256): NonEmptyList[String] = { + private[hcp] def splitFileName(sha256: Sha256): NonEmptyList[String] = { val str = sha256.toString() - NonEmptyList.of(str.substring(0, 2), str.substring(2, 4), str.substring(4, 6), str) } -} -private case class HcpRestBackendConfiguration(repository: String, - namespace: String, - tenant: String, - protocol: String, - requestTimeout: Duration, - socketTimeout: Duration, - responseHeaderTimeout: Duration, - maxConnections: Int, - userAgent: Option[String]) { - def toBlazeConfig: BlazeClientConfig = BlazeClientConfig.defaultConfig.copy( - requestTimeout = requestTimeout, - maxTotalConnections = maxConnections, - maxConnectionsPerRequestKey = _ => maxConnections, - responseHeaderTimeout = responseHeaderTimeout, - idleTimeout = socketTimeout, - userAgent = userAgent.map { - org.http4s.headers.`User-Agent`.parse(_).getOrElse(throw new IllegalArgumentException("Unsupported format of user-agent provided")) + private def composeAuthenticationHeader(username: String, password: String): Header.Raw = { + val encodedUserName = Base64.getEncoder.encodeToString(username.getBytes) + val encodedPassword = { + val stringBytes = password.getBytes(StandardCharsets.UTF_8) + hashes.bytes2hex { + MessageDigest.getInstance("MD5").digest(stringBytes) + } } + + Header("Authorization", s"HCP $encodedUserName:$encodedPassword") + } +} + +case class HcpRestBackendConfiguration(protocol: String, + namespace: String, + tenant: String, + repository: String, + username: String, + password: String, + responseHeaderTimeout: Duration, + requestTimeout: Duration, + idleTimeout: Duration, + maxConnections: Int, + maxConnectionsPerNode: Int, + maxWainQueueLimit: Int, + userAgent: Option[String]) { + + def toBlazeClientBuilder[F[_]: ConcurrentEffect](executionContext: ExecutionContext): BlazeClientBuilder[F] = { + BlazeClientBuilder + .apply[F](executionContext) + .withResponseHeaderTimeout(responseHeaderTimeout) + .withRequestTimeout(requestTimeout) + .withIdleTimeout(idleTimeout) + .withMaxTotalConnections(maxConnections) + .withMaxConnectionsPerRequestKey(_ => maxConnectionsPerNode) + .withUserAgentOption(userAgent.map(v => `User-Agent`(AgentProduct(v)))) + .withMaxWaitQueueLimit(maxWainQueueLimit) + } +} + +object HcpRestBackendConfiguration { + // configure pureconfig: + implicit val productHint: ProductHint[HcpRestBackendConfiguration] = ProductHint[HcpRestBackendConfiguration]( + fieldMapping = ConfigFieldMapping(CamelCase, CamelCase) ) } diff --git a/hcp/src/test/scala/com/avast/clients/storage/hcp/HcpRestStorageBackendTest.scala b/hcp/src/test/scala/com/avast/clients/storage/hcp/HcpRestStorageBackendTest.scala index 180099c..96a0f36 100644 --- a/hcp/src/test/scala/com/avast/clients/storage/hcp/HcpRestStorageBackendTest.scala +++ b/hcp/src/test/scala/com/avast/clients/storage/hcp/HcpRestStorageBackendTest.scala @@ -1,22 +1,25 @@ package com.avast.clients.storage.hcp import cats.data.NonEmptyList -import cats.effect.IO +import cats.effect.{Blocker, Resource} import com.avast.clients.storage.hcp.TestImplicits._ import com.avast.clients.storage.{GetResult, HeadResult} import com.avast.scala.hashes.Sha256 import monix.eval.Task import monix.execution.Scheduler.Implicits.global -import org.http4s.client.blaze.Http1Client -import org.http4s.dsl.io._ +import org.http4s.client.blaze.BlazeClientBuilder +import org.http4s.dsl.Http4sDsl import org.http4s.headers.`Content-Length` -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.{HttpService, Uri} +import org.http4s.implicits._ +import org.http4s.server.blaze.BlazeServerBuilder +import org.http4s.{HttpRoutes, Uri} import org.scalatest.FunSuite import org.scalatest.concurrent.ScalaFutures import org.scalatest.mockito.MockitoSugar import org.scalatest.time.{Seconds, Span} -class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar { +import scala.concurrent.duration._ + +class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar with Http4sDsl[Task] { implicit val p: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds)) test("head") { @@ -25,7 +28,7 @@ class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoS val sha = content.sha256 val shaStr = sha.toString() - val service = HttpService[IO] { + val service = HttpRoutes.of[Task] { case request @ HEAD -> urlPath => request .as[String] @@ -44,19 +47,13 @@ class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoS Ok().map(_.putHeaders(`Content-Length`.unsafeFromLong(fileSize))) } - } - val server = BlazeBuilder[IO].bindHttp(port = 0).mountService(service).start.unsafeRunSync() - - val httpClient = Http1Client[Task]().runAsync.futureValue - - val client = new HcpRestStorageBackend( - Uri.fromString(s"http://localhost:${server.address.getPort}/rest").getOrElse(fail()), - httpClient - ) - - val Right(HeadResult.Exists(size)) = client.head(sha).runAsync.futureValue + val Right(HeadResult.Exists(size)) = composeTestEnv(service) + .use { target => + target.head(sha) + } + .runSyncUnsafe(10.seconds) assertResult(fileSize)(size) } @@ -67,7 +64,7 @@ class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoS val sha = content.sha256 val shaStr = sha.toString() - val service = HttpService[IO] { + val service = HttpRoutes.of[Task] { case request @ GET -> urlPath => request .as[String] @@ -89,16 +86,11 @@ class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoS } - val server = BlazeBuilder[IO].bindHttp(port = 0).mountService(service).start.unsafeRunSync() - - val httpClient = Http1Client[Task]().runAsync.futureValue - - val client = new HcpRestStorageBackend( - Uri.fromString(s"http://localhost:${server.address.getPort}/rest").getOrElse(fail()), - httpClient - ) - - val Right(GetResult.Downloaded(file, size)) = client.get(sha).runAsync.futureValue + val Right(GetResult.Downloaded(file, size)) = composeTestEnv(service) + .use { target => + target.get(sha) + } + .runSyncUnsafe(10.seconds) assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase) assertResult(fileSize)(size) @@ -112,4 +104,18 @@ class HcpRestStorageBackendTest extends FunSuite with ScalaFutures with MockitoS NonEmptyList("d0", List("5a", "f9", "d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")) }(HcpRestStorageBackend.splitFileName(sha)) } + + private def composeTestEnv(service: HttpRoutes[Task]): Resource[Task, HcpRestStorageBackend[Task]] = { + for { + server <- BlazeServerBuilder[Task].bindHttp(port = 0).withHttpApp(service.orNotFound).resource + httpClient <- BlazeClientBuilder[Task](monix.execution.Scheduler.Implicits.global).resource + blocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io()) + storageBackend = new HcpRestStorageBackend( + Uri.unsafeFromString(s"http://localhost:${server.address.getPort}"), + "N/A", + "N/A", + httpClient + )(blocker) + } yield storageBackend + } } diff --git a/hcp/src/test/scala/com/avast/clients/storage/hcp/TestImplicits.scala b/hcp/src/test/scala/com/avast/clients/storage/hcp/TestImplicits.scala index 78ca44c..02ffbc8 100644 --- a/hcp/src/test/scala/com/avast/clients/storage/hcp/TestImplicits.scala +++ b/hcp/src/test/scala/com/avast/clients/storage/hcp/TestImplicits.scala @@ -3,13 +3,8 @@ package com.avast.clients.storage.hcp import java.io.{ByteArrayInputStream, InputStream} import java.security.MessageDigest -import cats.arrow.FunctionK -import cats.~> import com.avast.scala.hashes.Sha256 -import monix.eval.Task -import monix.execution.Scheduler -import scala.concurrent.{ExecutionContext, Future} import scala.util.Random /* Utils for testing. */ @@ -26,9 +21,4 @@ object TestImplicits { def newInputStream: InputStream = new ByteArrayInputStream(s.getBytes) } - - implicit def fkTaskToFuture(implicit ec: ExecutionContext): Task ~> Future = new FunctionK[Task, Future] { - override def apply[A](fa: Task[A]): Future[A] = fa.runAsync(Scheduler(ec)) - } - } diff --git a/settings.gradle b/settings.gradle index e948f82..3de701b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,3 +1,3 @@ rootProject.name = 'storage-client' -include "core", "stor", "hcp" \ No newline at end of file +include "core", "hcp" \ No newline at end of file diff --git a/stor/README.md b/stor/README.md deleted file mode 100644 index 94267d5..0000000 --- a/stor/README.md +++ /dev/null @@ -1,36 +0,0 @@ -# Stor backend - -Backend for querying [Stor](https://github.com/avast/stor). - -## Dependency - -```groovy -compile "com.avast.clients.storage:storage-client-stor_2.12:x.x.x" -``` - -## Usage - -Configuration: - -```hocon -uri = "my-stor.uri.com" -``` - -Client init, example for `monix.eval.Task`: - -```scala -import com.avast.clients.storage.stor.StorBackend -import com.typesafe.config.Config -import monix.eval.Task -import monix.execution.Scheduler - -implicit val scheduler: Scheduler = ??? -val config: Config = ??? - -def initClient: Task[StorBackend[Task]] = { - StorBackend.fromConfig[Task](config) match { - case Right(cl) => cl - case Left(err) => Task.raiseError(err) - } -} -``` \ No newline at end of file diff --git a/stor/build.gradle b/stor/build.gradle deleted file mode 100644 index 14beadc..0000000 --- a/stor/build.gradle +++ /dev/null @@ -1,13 +0,0 @@ -archivesBaseName = "storage-client-stor_2.12" - -dependencies { - compile project(":core") - - compile "org.http4s:http4s-dsl_2.12:$http4sVersion" - compile "org.http4s:http4s-blaze-client_2.12:$http4sVersion" - - compile "io.circe:circe-core_2.12:$circeVersion" - compile "io.circe:circe-parser_2.12:$circeVersion" - compile "io.circe:circe-generic_2.12:$circeVersion" - compile "io.circe:circe-generic-extras_2.12:$circeVersion" -} \ No newline at end of file diff --git a/stor/src/main/resources/reference.conf b/stor/src/main/resources/reference.conf deleted file mode 100644 index 3ed5cd4..0000000 --- a/stor/src/main/resources/reference.conf +++ /dev/null @@ -1,9 +0,0 @@ -storBackendDefaults { -// uri = "" // REQUIRED - - requestTimeout = 10 minutes - socketTimeout = 30 seconds - responseHeaderTimeout = 10 seconds - - maxConnections = 10 -} diff --git a/stor/src/main/scala/com/avast/clients/storage/stor/StorBackend.scala b/stor/src/main/scala/com/avast/clients/storage/stor/StorBackend.scala deleted file mode 100644 index 9d6561a..0000000 --- a/stor/src/main/scala/com/avast/clients/storage/stor/StorBackend.scala +++ /dev/null @@ -1,171 +0,0 @@ -package com.avast.clients.storage.stor - -import java.io.ByteArrayInputStream - -import better.files.File -import cats.effect.Effect -import cats.syntax.all._ -import com.avast.clients.storage._ -import com.avast.scala.hashes.Sha256 -import com.typesafe.config.{Config, ConfigFactory} -import com.typesafe.scalalogging.StrictLogging -import org.http4s._ -import org.http4s.client.Client -import org.http4s.client.blaze.{BlazeClientConfig, Http1Client} -import org.http4s.headers.`Content-Length` -import pureconfig.error.ConfigReaderException -import pureconfig.modules.http4s.uriReader -import pureconfig._ - -import scala.concurrent.ExecutionContext -import scala.concurrent.duration.Duration -import scala.language.higherKinds -import scala.util.control.NonFatal - -class StorBackend[F[_]](rootUri: Uri, httpClient: Client[F])(implicit F: Effect[F]) extends StorageBackend[F] with StrictLogging { - - override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { - logger.debug(s"Checking presence of file $sha256 in Stor") - - try { - val request = Request[F]( - Method.HEAD, - rootUri / sha256.toString - ) - - httpClient.fetch(request) { resp => - resp.status match { - case Status.Ok => - `Content-Length`.from(resp.headers) match { - case Some(`Content-Length`(length)) => F.pure(Right(HeadResult.Exists(length))) - case None => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Missing Content-Length header")) - } - } - case Status.NotFound => - F.pure(Right(HeadResult.NotFound)) - - case _ => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Unexpected status")) - } - } - } - } catch { - case NonFatal(e) => F.raiseError(e) - } - } - - override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = { - logger.debug(s"Getting file $sha256 from Stor") - - try { - val request = Request[F]( - Method.GET, - rootUri / sha256.toString - ) - - httpClient.fetch(request) { resp => - resp.status match { - case Status.Ok => receiveStreamedFile(sha256, dest, resp) - case Status.NotFound => F.pure(Right(GetResult.NotFound)) - - case _ => - resp.bodyAsText.compile.toList.map(_.mkString).map { body => - Left(StorageException.InvalidResponseException(resp.status.code, body.toString, "Unexpected status")) - } - } - - } - } catch { - case NonFatal(e) => F.raiseError(e) - } - } - - override def close(): Unit = httpClient.shutdownNow() - - private def receiveStreamedFile(sha256: Sha256, dest: File, resp: Response[F]): F[Either[StorageException, GetResult]] = { - `Content-Length`.from(resp.headers) match { - case Some(clh) => - val fileCopier = new FileCopier - val fileOs = dest.newOutputStream - - resp.body.chunks - .map { bytes => - val bis = new ByteArrayInputStream(bytes.toArray) - try { - fileCopier.copy(bis, fileOs) - } finally { - bis.close() - } - } - .compile - .toVector - .onError { - case NonFatal(_) => F.delay(fileOs.close()) - } - .map { chunksSizes => - val transferred = chunksSizes.sum - - fileOs.close() // all data has been transferred - - if (clh.length != transferred) { - Left(StorageException.InvalidDataException(resp.status.code, "-stream-", s"Expected ${clh.length} B but got $transferred B")) - } else { - val transferredSha = fileCopier.finalSha256 - - if (transferredSha != sha256) { - Left { - StorageException.InvalidDataException(resp.status.code, "-stream-", s"Expected SHA256 $sha256 but got $transferredSha") - } - } else { - Right(GetResult.Downloaded(dest, transferred)) - } - } - } - - case None => F.pure(Left(StorageException.InvalidResponseException(resp.status.code, "-stream-", "Missing Content-Length header"))) - } - } -} - -object StorBackend { - private val RootConfigKey = "storBackendDefaults" - private val DefaultConfig = ConfigFactory.defaultReference().getConfig(RootConfigKey) - - // configure pureconfig: - private implicit val ph: ProductHint[StorBackendConfiguration] = ProductHint[StorBackendConfiguration]( - fieldMapping = ConfigFieldMapping(CamelCase, CamelCase) - ) - - def fromConfig[F[_]: Effect](config: Config)(implicit ec: ExecutionContext): Either[ConfigurationException, F[StorBackend[F]]] = { - pureconfig - .loadConfig[StorBackendConfiguration](config.withFallback(DefaultConfig)) - .map { conf => - Http1Client[F](conf.toBlazeConfig.copy(executionContext = ec)) - .map(new StorBackend[F](conf.uri, _)) - } - .leftMap { failures => - ConfigurationException("Could not load config", new ConfigReaderException[StorBackendConfiguration](failures)) - } - } -} - -private case class StorBackendConfiguration(uri: Uri, - requestTimeout: Duration, - socketTimeout: Duration, - responseHeaderTimeout: Duration, - maxConnections: Int, - userAgent: Option[String]) { - def toBlazeConfig: BlazeClientConfig = BlazeClientConfig.defaultConfig.copy( - requestTimeout = requestTimeout, - maxTotalConnections = maxConnections, - maxConnectionsPerRequestKey = _ => maxConnections, - responseHeaderTimeout = responseHeaderTimeout, - idleTimeout = socketTimeout, - userAgent = userAgent.map { - org.http4s.headers.`User-Agent`.parse(_).getOrElse(throw new IllegalArgumentException("Unsupported format of user-agent provided")) - } - ) -} diff --git a/stor/src/main/scala/com/avast/clients/storage/stor/stor.scala b/stor/src/main/scala/com/avast/clients/storage/stor/stor.scala deleted file mode 100644 index 2c48b30..0000000 --- a/stor/src/main/scala/com/avast/clients/storage/stor/stor.scala +++ /dev/null @@ -1,5 +0,0 @@ -package com.avast.clients.storage - -import scala.language.higherKinds - -package object stor {} diff --git a/stor/src/test/scala/com/avast/clients/storage/stor/StorBackendTest.scala b/stor/src/test/scala/com/avast/clients/storage/stor/StorBackendTest.scala deleted file mode 100644 index 63543a9..0000000 --- a/stor/src/test/scala/com/avast/clients/storage/stor/StorBackendTest.scala +++ /dev/null @@ -1,88 +0,0 @@ -package com.avast.clients.storage.stor - -import cats.effect.IO -import com.avast.clients.storage.stor.TestImplicits._ -import com.avast.clients.storage.{GetResult, HeadResult} -import com.avast.scala.hashes.Sha256 -import monix.eval.Task -import monix.execution.Scheduler.Implicits.global -import org.http4s.client.blaze.Http1Client -import org.http4s.dsl.io._ -import org.http4s.headers.`Content-Length` -import org.http4s.server.blaze.BlazeBuilder -import org.http4s.{HttpService, Uri} -import org.scalatest.FunSuite -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.mockito.MockitoSugar -import org.scalatest.time.{Seconds, Span} - -class StorBackendTest extends FunSuite with ScalaFutures with MockitoSugar { - implicit val p: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds)) - - test("head") { - val fileSize = 1000000 - val content = randomString(fileSize) - val sha = content.sha256 - - val service = HttpService[IO] { - case request @ HEAD -> fileSha => - request - .as[String] - .flatMap { body => - assertResult(sha)(Sha256(fileSha.toList.head)) - assertResult(0)(body.length) - - Ok().map(_.putHeaders(`Content-Length`.unsafeFromLong(fileSize))) - } - - } - - val server = BlazeBuilder[IO].bindHttp(port = 0).mountService(service).start.unsafeRunSync() - - val httpClient = Http1Client[Task]().runAsync.futureValue - - val client = new StorBackend( - Uri.fromString(s"http://localhost:${server.address.getPort}").getOrElse(fail()), - httpClient - ) - - val Right(HeadResult.Exists(size)) = client.head(sha).runAsync.futureValue - - assertResult(fileSize)(size) - } - - test("get") { - val fileSize = 1000000 - val content = randomString(fileSize) - val sha = content.sha256 - - val service = HttpService[IO] { - case request @ GET -> fileSha => - request - .as[String] - .flatMap { body => - assertResult(sha)(Sha256(fileSha.toList.head)) - assertResult(0)(body.length) - - Ok(content) - } - - } - - val server = BlazeBuilder[IO].bindHttp(port = 0).mountService(service).start.unsafeRunSync() - - val httpClient = Http1Client[Task]().runAsync.futureValue - - val client = new StorBackend( - Uri.fromString(s"http://localhost:${server.address.getPort}").getOrElse(fail()), - httpClient - ) - - val Right(GetResult.Downloaded(file, size)) = client.get(sha).runAsync.futureValue - - assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase) - assertResult(fileSize)(size) - assertResult(fileSize)(file.size) - } - -} diff --git a/stor/src/test/scala/com/avast/clients/storage/stor/TestImplicits.scala b/stor/src/test/scala/com/avast/clients/storage/stor/TestImplicits.scala deleted file mode 100644 index 03d05e7..0000000 --- a/stor/src/test/scala/com/avast/clients/storage/stor/TestImplicits.scala +++ /dev/null @@ -1,25 +0,0 @@ -package com.avast.clients.storage.stor - -import java.io.{ByteArrayInputStream, InputStream} -import java.security.MessageDigest - -import com.avast.scala.hashes.Sha256 - -import scala.util.Random - -/* Utils for testing. */ -object TestImplicits { - def randomSha: Sha256 = Sha256(Stream.continually(Random.nextInt(9)).take(64).mkString) - - def randomString(length: Int = 100): String = Random.alphanumeric.take(length).mkString - - implicit class StringOps(val s: String) extends AnyVal { - def sha256: Sha256 = { - val digest = MessageDigest.getInstance("SHA-256") - Sha256(digest.digest(s.getBytes)) - } - - def newInputStream: InputStream = new ByteArrayInputStream(s.getBytes) - } - -}