feat: Add GCS backend (#182)

mi-char authored Jun 5, 2023
1 parent 20fe175 commit 4b2fd68

Showing 9 changed files with 384 additions and 2 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -3,7 +3,8 @@
Finally-tagless implementation of a client for miscellaneous storages, represented by backends. Supports backend fallbacks.

Currently supported backends:
1. [HCP](hcp/README.md)
1. [HCP (Hitachi Content Platform)](hcp/README.md)
2. [GCS (Google Cloud Storage)](gcs/README.md)

## Dependency

1 change: 1 addition & 0 deletions build.gradle
@@ -13,6 +13,7 @@ plugins {
ext {
    metricsVersion = "2.10.4"
    http4sVersion = "0.22.12"
    gcsVersion = "2.22.2"
    monixVersion = "3.4.1" // Used only in tests.
}

36 changes: 36 additions & 0 deletions gcs/README.md
@@ -0,0 +1,36 @@
# GCS (Google Cloud Storage) backend

## Dependency

```groovy
compile "com.avast.clients.storage:storage-client-gcs_2.13:x.x.x"
```
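
If you build with sbt instead of Gradle, the same artifact can presumably be added like this (a sketch; the group and artifact IDs are taken from the Gradle coordinates above, and `x.x.x` stands for the released version):

```scala
// build.sbt — hypothetical sbt equivalent of the Gradle dependency above
libraryDependencies += "com.avast.clients.storage" % "storage-client-gcs_2.13" % "x.x.x"
```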

## Usage

Configuration:

```hocon
projectId = "my-project-id"
bucketName = "bucket-name"
```

Client initialization, shown here for `monix.eval.Task`:

```scala
import better.files.File
import cats.effect.Blocker
import com.avast.clients.storage.gcs.GcsStorageBackend
import com.avast.scala.hashes.Sha256
import com.typesafe.config.Config
import monix.eval.Task
import monix.execution.Scheduler

implicit val scheduler: Scheduler = ???
val blocker: Blocker = ???
val config: Config = ???
val sha256: Sha256 = ???
val destinationFile: File = ???

GcsStorageBackend.fromConfig[Task](config, blocker).map { resource =>
  resource.use { client =>
    client.get(sha256, destinationFile)
  }
}
```
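
The same initialization works for any effect with `Sync` and `ContextShift` instances. A minimal sketch for cats-effect 2's `IO` (the config path `gcsBackend` and the error handling are illustrative assumptions, not part of the library):

```scala
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import com.avast.clients.storage.gcs.GcsStorageBackend
import com.typesafe.config.ConfigFactory

object GcsExample extends IOApp {
  override def run(args: List[String]): IO[ExitCode] =
    Blocker[IO].use { blocker =>
      // "gcsBackend" is a hypothetical path in application.conf holding projectId/bucketName.
      val config = ConfigFactory.load().getConfig("gcsBackend")

      GcsStorageBackend.fromConfig[IO](config, blocker).value.flatMap {
        case Left(err) =>
          IO.raiseError[ExitCode](new RuntimeException(s"Could not initialize GCS backend: $err"))
        case Right(resource) =>
          resource.use { client =>
            client.head(???).as(ExitCode.Success) // replace ??? with a real Sha256
          }
      }
    }
}
```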
9 changes: 9 additions & 0 deletions gcs/build.gradle
@@ -0,0 +1,9 @@
archivesBaseName = "storage-client-gcs_2.13"

dependencies {
    api project(":core")

    implementation "com.google.cloud:google-cloud-storage:$gcsVersion"

    testImplementation "io.monix:monix_2.13:$monixVersion"
}
5 changes: 5 additions & 0 deletions gcs/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
gcsBackendDefaults {
  //projectId = "" // REQUIRED
  //bucketName = "" // REQUIRED
  //jsonKeyPath = "" // REQUIRED if using service account authentication (see https://github.com/googleapis/google-cloud-java#using-a-service-account-recommended)
}
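
The backend merges the config passed to `fromConfig` with these defaults via `withFallback`, so the required keys can also be supplied programmatically. A small sketch (the key path is a hypothetical value):

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical application config; jsonKeyPath is only needed for service-account authentication.
val gcsConfig: Config = ConfigFactory.parseString(
  """
    |projectId = "my-project-id"
    |bucketName = "bucket-name"
    |jsonKeyPath = "/etc/gcs/service-account.json"
    |""".stripMargin
)
```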
@@ -0,0 +1,216 @@
package com.avast.clients.storage.gcs

import better.files.File
import cats.data.EitherT
import cats.effect.implicits.catsEffectSyntaxBracket
import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.syntax.all._
import com.avast.clients.storage.gcs.GcsStorageBackend.composeBlobPath
import com.avast.clients.storage.{ConfigurationException, GetResult, HeadResult, StorageBackend, StorageException}
import com.avast.scala.hashes.Sha256
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.ServiceOptions
import com.google.cloud.storage.{Blob, Bucket, Storage, StorageOptions, StorageException => GcStorageException}
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.StrictLogging
import pureconfig.error.ConfigReaderException
import pureconfig.generic.ProductHint
import pureconfig.generic.auto._
import pureconfig.{CamelCase, ConfigFieldMapping}

import java.io.FileInputStream
import java.nio.file.StandardOpenOption
import java.security.{DigestOutputStream, MessageDigest}

class GcsStorageBackend[F[_]: Sync: ContextShift](bucket: Bucket)(blocker: Blocker) extends StorageBackend[F] with StrictLogging {
  private val FileStreamOpenOptions = Seq(StandardOpenOption.WRITE, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)

  override def head(sha256: Sha256): F[Either[StorageException, HeadResult]] = {
    {
      for {
        _ <- Sync[F].delay(logger.debug(s"Checking presence of file $sha256 in GCS"))
        blob <- getBlob(sha256)
        result = blob match {
          case Some(blob) =>
            HeadResult.Exists(blob.getSize)
          case None =>
            HeadResult.NotFound
        }
      } yield Either.right[StorageException, HeadResult](result)
    }.recover {
      case e: GcStorageException =>
        logger.error(s"Error while checking presence of file $sha256 in GCS", e)
        Either.left[StorageException, HeadResult] {
          StorageException.InvalidResponseException(e.getCode, e.getMessage, e.getReason)
        }
    }
  }

  override def get(sha256: Sha256, dest: File): F[Either[StorageException, GetResult]] = {
    {
      for {
        _ <- Sync[F].delay(logger.debug(s"Downloading file $sha256 from GCS"))
        blob <- getBlob(sha256)
        result <- blob match {
          case Some(blob) =>
            receiveStreamedFile(blob, dest, sha256)
          case None =>
            Sync[F].pure[Either[StorageException, GetResult]] {
              Right(GetResult.NotFound)
            }
        }
      } yield result
    }.recover {
      case e: GcStorageException =>
        logger.error(s"Error while downloading file $sha256 from GCS", e)
        Either.left[StorageException, GetResult] {
          StorageException.InvalidResponseException(e.getCode, e.getMessage, e.getReason)
        }
    }
  }

  private def getBlob(sha256: Sha256): F[Option[Blob]] = {
    for {
      objectPath <- Sync[F].delay(composeBlobPath(sha256))
      result <- blocker.delay {
        Option(bucket.get(objectPath))
      }
    } yield result
  }

  private def receiveStreamedFile(blob: Blob, destination: File, expectedHash: Sha256): F[Either[StorageException, GetResult]] = {
    Sync[F].delay(logger.debug(s"Downloading streamed data to $destination")) >>
      blocker
        .delay(destination.newOutputStream(FileStreamOpenOptions))
        .bracket { fileStream =>
          Sync[F]
            .delay(new DigestOutputStream(fileStream, MessageDigest.getInstance("SHA-256")))
            .bracket { stream =>
              blocker.delay(blob.downloadTo(stream)).flatMap { _ =>
                Sync[F].delay {
                  (blob.getSize, Sha256(stream.getMessageDigest.digest))
                }
              }
            }(stream => blocker.delay(stream.close()))
        }(fileStream => blocker.delay(fileStream.close()))
        .map[Either[StorageException, GetResult]] {
          case (size, hash) =>
            if (expectedHash != hash) {
              Left {
                StorageException.InvalidDataException(200, "-stream-", s"Expected SHA256 $expectedHash but got $hash")
              }
            } else {
              Right {
                GetResult.Downloaded(destination, size)
              }
            }
        }
  }

  override def close(): Unit = {
    ()
  }
}

object GcsStorageBackend {
  private val DefaultConfig = ConfigFactory.defaultReference().getConfig("gcsBackendDefaults")

  def fromConfig[F[_]: Sync: ContextShift](config: Config,
                                           blocker: Blocker): EitherT[F, ConfigurationException, Resource[F, GcsStorageBackend[F]]] = {

    def composeConfig: EitherT[F, ConfigurationException, GcsBackendConfiguration] = EitherT {
      Sync[F].delay {
        pureconfig.ConfigSource
          .fromConfig(config.withFallback(DefaultConfig))
          .load[GcsBackendConfiguration]
          .leftMap { failures =>
            ConfigurationException("Could not load config", new ConfigReaderException[GcsBackendConfiguration](failures))
          }
      }
    }

    {
      for {
        conf <- composeConfig
        storageClient <- prepareStorageClient(conf, blocker)
        bucket <- getBucket(conf, storageClient, blocker)
      } yield (storageClient, bucket)
    }.map {
      case (storage, bucket) =>
        Resource
          .fromAutoCloseable {
            Sync[F].pure(storage)
          }
          .map { _ =>
            new GcsStorageBackend[F](bucket)(blocker)
          }
    }
  }

  private[gcs] def composeBlobPath(sha256: Sha256): String = {
    val sha256Hex = sha256.toHexString
    String.join("/", sha256Hex.substring(0, 2), sha256Hex.substring(2, 4), sha256Hex.substring(4, 6), sha256Hex)
  }

  def prepareStorageClient[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
                                                     blocker: Blocker): EitherT[F, ConfigurationException, Storage] = {
    EitherT {
      blocker.delay {
        Either
          .catchNonFatal {
            val builder = conf.jsonKeyPath match {
              case Some(jsonKeyPath) =>
                StorageOptions.newBuilder
                  .setCredentials(ServiceAccountCredentials.fromStream(new FileInputStream(jsonKeyPath)))
              case None =>
                StorageOptions.getDefaultInstance.toBuilder
            }

            builder
              .setProjectId(conf.projectId)
              .setRetrySettings(ServiceOptions.getNoRetrySettings)

            builder.build.getService
          }
          .leftMap { e =>
            ConfigurationException("Could not create GCS client", e)
          }
      }
    }
  }

  def getBucket[F[_]: Sync: ContextShift](conf: GcsBackendConfiguration,
                                          storageClient: Storage,
                                          blocker: Blocker): EitherT[F, ConfigurationException, Bucket] = {
    EitherT {
      blocker
        .delay {
          Either
            .catchNonFatal {
              Option(storageClient.get(conf.bucketName, Storage.BucketGetOption.userProject(conf.projectId)))
            }
        }
        .map {
          _.leftMap { e =>
            ConfigurationException(s"Attempt to get bucket ${conf.bucketName} failed", e)
          }.flatMap {
            case Some(bucket) =>
              Right(bucket)
            case None =>
              Left {
                ConfigurationException(s"Bucket ${conf.bucketName} does not exist")
              }
          }
        }
    }
  }
}

case class GcsBackendConfiguration(projectId: String, bucketName: String, jsonKeyPath: Option[String] = None)

object GcsBackendConfiguration {
  // configure pureconfig:
  implicit val productHint: ProductHint[GcsBackendConfiguration] = ProductHint[GcsBackendConfiguration](
    fieldMapping = ConfigFieldMapping(CamelCase, CamelCase)
  )
}
@@ -0,0 +1,96 @@
package com.avast.clients.storage.gcs

import better.files.File
import cats.effect.Blocker
import com.avast.clients.storage.gcs.TestImplicits.{randomString, StringOps}
import com.avast.clients.storage.{GetResult, HeadResult}
import com.avast.scala.hashes.Sha256
import com.google.cloud.storage.{Blob, Bucket}
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.junit.runner.RunWith
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import org.scalatest.FunSuite
import org.scalatest.concurrent.ScalaFutures
import org.scalatestplus.junit.JUnitRunner
import org.scalatestplus.mockito.MockitoSugar

import java.io.OutputStream
import scala.concurrent.duration._

@RunWith(classOf[JUnitRunner])
class GcsStorageBackendTest extends FunSuite with ScalaFutures with MockitoSugar {
  test("head") {
    val fileSize = 1001100
    val content = randomString(fileSize)
    val sha = content.sha256
    val shaStr = sha.toString()

    val blob = mock[Blob]
    when(blob.getSize).thenReturn(fileSize.toLong)

    val bucket = mock[Bucket]
    when(bucket.get(any[String]())).thenAnswer { call =>
      val blobPath = call.getArgument[String](0)
      assertResult {
        List(
          shaStr.substring(0, 2),
          shaStr.substring(2, 4),
          shaStr.substring(4, 6),
          shaStr,
        )
      }(blobPath.split("/").toList)
      blob
    }

    val result = composeTestBackend(bucket).head(sha).runSyncUnsafe(10.seconds)

    assertResult(Right(HeadResult.Exists(fileSize)))(result)
  }

  test("get") {
    val fileSize = 1001200
    val content = randomString(fileSize)
    val sha = content.sha256
    val shaStr = sha.toString()

    val blob = mock[Blob]
    when(blob.getSize).thenReturn(fileSize.toLong)
    when(blob.downloadTo(any[OutputStream]())).thenAnswer { call =>
      val outputStream = call.getArgument[OutputStream](0)
      outputStream.write(content.getBytes())
    }

    val bucket = mock[Bucket]
    when(bucket.get(any[String]())).thenAnswer { call =>
      val blobPath = call.getArgument[String](0)
      assertResult {
        List(
          shaStr.substring(0, 2),
          shaStr.substring(2, 4),
          shaStr.substring(4, 6),
          shaStr,
        )
      }(blobPath.split("/").toList)
      blob
    }

    File.usingTemporaryFile() { file =>
      val result = composeTestBackend(bucket).get(sha, file).runSyncUnsafe(10.seconds)
      assertResult(Right(GetResult.Downloaded(file, fileSize)))(result)
      assertResult(sha.toString.toLowerCase)(file.sha256.toLowerCase)
      assertResult(fileSize)(file.size)
    }
  }

  test("composeObjectPath") {
    val sha = Sha256("d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")
    assertResult("d0/5a/f9/d05af9a8494696906e8eec79843ca1e4bf408c280616a121ed92f9e92e2de831")(GcsStorageBackend.composeBlobPath(sha))
  }

  private def composeTestBackend(bucket: Bucket): GcsStorageBackend[Task] = {
    val blocker = Blocker.liftExecutionContext(monix.execution.Scheduler.io())
    new GcsStorageBackend[Task](bucket)(blocker)
  }
}
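
The test constructs the backend directly from a (mocked) `Bucket` via the public constructor, bypassing `fromConfig`. The same constructor can be used when a `Bucket` is already at hand; a minimal sketch, assuming default application credentials and an existing bucket named `bucket-name`:

```scala
import cats.effect.Blocker
import com.avast.clients.storage.gcs.GcsStorageBackend
import com.google.cloud.storage.{Bucket, Storage, StorageOptions}
import monix.eval.Task
import monix.execution.Scheduler

implicit val scheduler: Scheduler = Scheduler.io()
val blocker: Blocker = Blocker.liftExecutionContext(Scheduler.io())

// Uses default application credentials. Storage.get returns null when the bucket does not exist
// (getBucket in the backend turns that into a ConfigurationException), so check for null here too.
val storage: Storage = StorageOptions.getDefaultInstance.getService
val bucket: Bucket = storage.get("bucket-name")

val backend = new GcsStorageBackend[Task](bucket)(blocker)
```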