From 4b2fd6832fdfbd7c0bd83aef95b3585ab4d7c127 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Charv=C3=A1t?= <20499772+mi-char@users.noreply.github.com> Date: Mon, 5 Jun 2023 15:14:36 +0200 Subject: [PATCH] feat: Add GCS backend (#182) feat: Add GCS backend --- README.md | 3 +- build.gradle | 1 + gcs/README.md | 36 +++ gcs/build.gradle | 9 + gcs/src/main/resources/reference.conf | 5 + .../storage/gcs/GcsStorageBackend.scala | 216 ++++++++++++++++++ .../storage/gcs/GcsStorageBackendTest.scala | 96 ++++++++ .../clients/storage/gcs/TestImplicits.scala | 18 ++ settings.gradle | 2 +- 9 files changed, 384 insertions(+), 2 deletions(-) create mode 100644 gcs/README.md create mode 100644 gcs/build.gradle create mode 100644 gcs/src/main/resources/reference.conf create mode 100644 gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala create mode 100644 gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala create mode 100644 gcs/src/test/scala/com/avast/clients/storage/gcs/TestImplicits.scala diff --git a/README.md b/README.md index ab9f6cf..3be1ba8 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,8 @@ Finally-tagless implementation of client for misc. storages represented by backends. Supports backends fallbacks. Currently supported backends: -1. [HCP](hcp/README.md) +1. [HCP (Hitachi Content Platform)](hcp/README.md) +2. [GCS (Google Cloud Storage)](gcs/README.md) ## Dependency diff --git a/build.gradle b/build.gradle index 7e07844..2966070 100644 --- a/build.gradle +++ b/build.gradle @@ -13,6 +13,7 @@ plugins { ext { metricsVersion = "2.10.4" http4sVersion = "0.22.12" + gcsVersion = "2.22.2" monixVersion = "3.4.1" // Used only in tests. } diff --git a/gcs/README.md b/gcs/README.md new file mode 100644 index 0000000..3e67701 --- /dev/null +++ b/gcs/README.md @@ -0,0 +1,36 @@ +# GCS (Google Cloud Storage) backend + +## Dependency + +```groovy +compile "com.avast.clients.storage:storage-client-gcs_2.13:x.x.x" +``` + +## Usage + +Configuration: + +```hocon +projectId = "my-project-id" +bucketName = "bucket-name" +``` + +Client init, example for `monix.eval.Task`: + +```scala +import com.avast.clients.storage.gcs.GcsStorageBackend +import com.typesafe.config.Config +import monix.eval.Task +import monix.execution.Scheduler +import cats.effect.Blocker + +implicit val scheduler: Scheduler = ??? +val blocker: Blocker = ??? +val config: Config = ??? + +GcsStorageBackend.fromConfig[Task](config, blocker).map{ resource => + resource.use { client => + client.get(sha256, destinationFile) + } +} +``` \ No newline at end of file diff --git a/gcs/build.gradle b/gcs/build.gradle new file mode 100644 index 0000000..beb00a6 --- /dev/null +++ b/gcs/build.gradle @@ -0,0 +1,9 @@ +archivesBaseName = "storage-client-gcs_2.13" + +dependencies { + api project(":core") + + implementation "com.google.cloud:google-cloud-storage:$gcsVersion" + + testImplementation "io.monix:monix_2.13:$monixVersion" +} \ No newline at end of file diff --git a/gcs/src/main/resources/reference.conf b/gcs/src/main/resources/reference.conf new file mode 100644 index 0000000..7b52f38 --- /dev/null +++ b/gcs/src/main/resources/reference.conf @@ -0,0 +1,5 @@ +gcsBackendDefaults { + //projectId = "" // REQUIRED + //bucketName = "" // REQUIRED + //jsonKeyPath = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended) +} \ No newline at end of file diff --git a/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala b/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala new file mode 100644 index 0000000..8b25ed6 --- /dev/null +++ b/gcs/src/main/scala/com/avast/clients/storage/gcs/GcsStorageBackend.scala @@ -0,0 +1,216 @@ +package com.avast.clients.storage.gcs + +import better.files.File +import cats.data.EitherT +import cats.effect.implicits.catsEffectSyntaxBracket +import cats.effect.{Blocker, ContextShift, Resource, Sync} +import cats.syntax.all._ +import com.avast.clients.storage.gcs.GcsStorageBackend.composeBlobPath +import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, StorageBackend, StorageException} +import com.avast.scala.hashes.Sha256 +import com.google.auth.oauth2.ServiceAccountCredentials +import com.google.cloud.ServiceOptions +import com.google.cloud.storage.{Blob, Bucket, Storage, StorageOptions, StorageException => GcStorageException} +import com.typesafe.config.{Config, ConfigFactory} +import com.typesafe.scalalogging.StrictLogging +import pureconfig.error.ConfigReaderException +import pureconfig.generic.ProductHint +import pureconfig.generic.auto._ +import pureconfig.{CamelCase, ConfigFieldMapping} + +import java.io.FileInputStream +import java.nio.file.StandardOpenOption +import java.security.{DigestOutputStream, MessageDigest} + +class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Blocker) extends StorageBackend[F] with StrictLogging { + private val FileStreamOpenOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING) + + override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = { + { + for { + _ <- Sync[F].delay(logger.debug(s"Checking presence of file $sha256 in GCS")) + blob <- getBlob(sha256) + result = blob match { + case Some(blob) => + HeadResult.Exists(blob.getSize) + case None => + HeadResult.NotFound + } + } yield Either.right[StorageException, HeadResult](result) + }.recover { + case e: GcStorageException => + logger.error(s"Error while checking presence of file $sha256 in GCS", e) + Either.left[StorageException, HeadResult] { + StorageException.InvalidResponseException(e.getCode, e.getMessage, e.getReason) + } + } + } + + override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = { + { + for { + _ <- Sync[F].delay(logger.debug(s"Downloading file $sha256 from GCS")) + blob <- getBlob(sha256) + result <- blob match { + case Some(blob) => + receiveStreamedFile(blob, dest, sha256) + case None => + Sync[F].pure[Either[StorageException, GetResult]] { + Right(GetResult.NotFound) + } + } + } yield result + }.recover { + case e: GcStorageException => + logger.error(s"Error while downloading file $sha256 from GCS", e) + Either.left[StorageException, GetResult] { + StorageException.InvalidResponseException(e.getCode, e.getMessage, e.getReason) + } + } + } + + private def getBlob(sha256: Sha256): F[Option[Blob]] = { + for { + objectPath <- Sync[F].delay(composeBlobPath(sha256)) + result <- blocker.delay { + Option(bucket.get(objectPath)) + } + } yield result + } + + private def receiveStreamedFile(blob: Blob, destination: File, expectedHash: Sha256): F[Either[StorageException, GetResult]] = { + Sync[F].delay(logger.debug(s"Downloading streamed data to $destination")) >> + blocker + .delay(destination.newOutputStream(FileStreamOpenOptions)) + .bracket { fileStream => + Sync[F] + .delay(new DigestOutputStream(fileStream, MessageDigest.getInstance("SHA-256"))) + .bracket { stream => + blocker.delay(blob.downloadTo(stream)).flatMap { _ => + Sync[F].delay { + (blob.getSize, Sha256(stream.getMessageDigest.digest)) + } + } + }(stream => blocker.delay(stream.close())) + }(fileStream => blocker.delay(fileStream.close())) + .map[Either[StorageException, GetResult]] { + case (size, hash) => + if (expectedHash != hash) { + Left { + StorageException.InvalidDataException(200, "-stream-", s"Expected SHA256 $expectedHash but got $hash") + } + } else { + Right { + GetResult.Downloaded(destination, size) + } + } + } + } + + override def close(): Unit = { + () + } +} + +object GcsStorageBackend { + private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults") + + def fromConfig[F[_]: Sync: ContextShift](config: Config, + blocker: Blocker): EitherT[F, ConfigurationException, Resource[F, GcsStorageBackend[F]]] = { + + def composeConfig: EitherT[F, ConfigurationException, GcsBackendConfiguration] = EitherT { + Sync[F].delay { + pureconfig.ConfigSource + .fromConfig(config.withFallback(DefaultConfig)) + .load[GcsBackendConfiguration] + .leftMap { failures => + ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures)) + } + } + } + + { + for { + conf <- composeConfig + storageClient <- prepareStorageClient(conf, blocker) + bucket <- getBucket(conf, storageClient, blocker) + } yield (storageClient, bucket) + }.map { + case (storage, bucket) => + Resource + .fromAutoCloseable { + Sync[F].pure(storage) + } + .map { _ => + new GcsStorageBackend[F](bucket)(blocker) + } + } + } + + private[gcs] def composeBlobPath(sha256: Sha256): String = { + val sha256Hex = sha256.toHexString + String.join("/", sha256Hex.substring(0, 2), sha256Hex.substring(2, 4), sha256Hex.substring(4, 6), sha256Hex) + } + + def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration, + blocker: Blocker): EitherT[F, ConfigurationException, Storage] = { + EitherT { + blocker.delay { + Either + .catchNonFatal { + val builder = conf.jsonKeyPath match { + case Some(jsonKeyPath) => + StorageOptions.newBuilder + .setCredentials(ServiceAccountCredentials.fromStream(new FileInputStream(jsonKeyPath))) + case None => + StorageOptions.getDefaultInstance.toBuilder + } + + builder + .setProjectId(conf.projectId) + .setRetrySettings(ServiceOptions.getNoRetrySettings) + + builder.build.getService + } + .leftMap { e => + ConfigurationException("Could not create GCS client", e) + } + } + } + } + + def getBucket[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration, + storageClient: Storage, + blocker: Blocker): EitherT[F, ConfigurationException, Bucket] = { + EitherT { + blocker + .delay { + Either + .catchNonFatal { + Option(storageClient.get(conf.bucketName, Storage.BucketGetOption.userProject(conf.projectId))) + } + } + .map { + _.leftMap { e => + ConfigurationException(s"Attempt to get bucket ${conf.bucketName} failed", e) + }.flatMap { + case Some(bucket) => + Right(bucket) + case None => + Left { + ConfigurationException(s"Bucket ${conf.bucketName} does not exist") + } + } + } + } + } +} + +case class GcsBackendConfiguration(projectId: String, bucketName: String, jsonKeyPath: Option[String] = None) + +object GcsBackendConfiguration { + // configure pureconfig: + implicit val productHint: ProductHint[GcsBackendConfiguration] = ProductHint[GcsBackendConfiguration]( + fieldMapping = ConfigFieldMapping(CamelCase, CamelCase) + ) +} diff --git a/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala b/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala new file mode 100644 index 0000000..70cc8d2 --- /dev/null +++ b/gcs/src/test/scala/com/avast/clients/storage/gcs/GcsStorageBackendTest.scala @@ -0,0 +1,96 @@ +package com.avast.clients.storage.gcs + +import better.files.File +import cats.effect.Blocker +import com.avast.clients.storage.gcs.TestImplicits.{randomString, StringOps} +import com.avast.clients.storage.{GetResult, HeadResult} +import com.avast.scala.hashes.Sha256 +import com.google.cloud.storage.{Blob, Bucket} +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import org.junit.runner.RunWith +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.when +import org.scalatest.FunSuite +import org.scalatest.concurrent.ScalaFutures +import org.scalatestplus.junit.JUnitRunner +import org.scalatestplus.mockito.MockitoSugar + +import java.io.OutputStream +import scala.concurrent.duration._ + +@RunWith(classOf[JUnitRunner]) +class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar { + test("head") { + val fileSize = 1001100 + val content = randomString(fileSize) + val sha = content.sha256 + val shaStr = sha.toString() + + val blob = mock[Blob] + when(blob.getSize).thenReturn(fileSize.toLong) + + val bucket = mock[Bucket] + when(bucket.get(any[String]())).thenAnswer { call => + val blobPath = call.getArgument[String](0) + assertResult { + List( + shaStr.substring(0, 2), + shaStr.substring(2, 4), + shaStr.substring(4, 6), + shaStr, + ) + }(blobPath.split("/").toList) + blob + } + + val result = composeTestBackend(bucket).head(sha).runSyncUnsafe(10.seconds) + + assertResult(Right(HeadResult.Exists(fileSize)))(result) + } + + test("get") { + val fileSize = 1001200 + val content = randomString(fileSize) + val sha = content.sha256 + val shaStr = sha.toString() + + val blob = mock[Blob] + when(blob.getSize).thenReturn(fileSize.toLong) + when(blob.downloadTo(any[OutputStream]())).thenAnswer { call => + val outputStream = call.getArgument[OutputStream](0) + outputStream.write(content.getBytes()) + } + + val bucket = mock[Bucket] + when(bucket.get(any[String]())).thenAnswer { call => + val blobPath = call.getArgument[String](0) + assertResult { + List( + shaStr.substring(0, 2), + shaStr.substring(2, 4), + shaStr.substring(4, 6), + shaStr, + ) + }(blobPath.split("/").toList) + blob + } + + File.usingTemporaryFile() { file => + val result = composeTestBackend(bucket).get(sha, file).runSyncUnsafe(10.seconds) + assertResult(Right(GetResult.Downloaded(file, fileSize)))(result) + assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase) + assertResult(fileSize)(file.size) + } + } + + test("composeObjectPath") { + val sha = Sha256("d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831") + assertResult("d0/5a/f9/d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")(GcsStorageBackend.composeBlobPath(sha)) + } + + private def composeTestBackend(bucket: Bucket): GcsStorageBackend[Task] = { + val blocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io()) + new GcsStorageBackend[Task](bucket)(blocker) + } +} diff --git a/gcs/src/test/scala/com/avast/clients/storage/gcs/TestImplicits.scala b/gcs/src/test/scala/com/avast/clients/storage/gcs/TestImplicits.scala new file mode 100644 index 0000000..0c1579b --- /dev/null +++ b/gcs/src/test/scala/com/avast/clients/storage/gcs/TestImplicits.scala @@ -0,0 +1,18 @@ +package com.avast.clients.storage.gcs + +import com.avast.scala.hashes.Sha256 + +import java.security.MessageDigest +import scala.util.Random + +/* Utils for testing. */ +object TestImplicits { + def randomString(length: Int = 100): String = Random.alphanumeric.take(length).mkString + + implicit class StringOps(val s: String) extends AnyVal { + def sha256: Sha256 = { + val digest = MessageDigest.getInstance("SHA-256") + Sha256(digest.digest(s.getBytes)) + } + } +} diff --git a/settings.gradle b/settings.gradle index 3de701b..30cced1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,3 +1,3 @@ rootProject.name = 'storage-client' -include "core", "hcp" \ No newline at end of file +include "core", "hcp", "gcs" \ No newline at end of file