Try to fix failing CI packaging jobs
The validation job rarely fails... but the packaging one is the real tough nut here. I've switched it to wgetting the dataset and then reading it from disk – maybe this will help.

Issue: RiverBench/RiverBench#116
Ostrzyciel committed Aug 29, 2024
1 parent 49f95f9 commit 62d9b62
Showing 3 changed files with 21 additions and 15 deletions.
9 changes: 4 additions & 5 deletions src/main/scala/commands/PackageCommand.scala
@@ -40,16 +40,15 @@ object PackageCommand extends Command:
override def name: String = "package"

override def description = "Packages a dataset.\n" +
"Args: <repo-dir> <source-rel-info-file> <output-dir>"
"Args: <repo-dir> <source archive file> <output-dir>"

override def validateArgs(args: Array[String]) = args.length == 4

override def run(args: Array[String]): Future[Unit] = Future {
val repoDir = FileSystems.getDefault.getPath(args(1))
val sourceRelInfoFile = FileSystems.getDefault.getPath(args(2))
val sourceArchiveFile = FileSystems.getDefault.getPath(args(2))
val outDir = FileSystems.getDefault.getPath(args(3))

val dataFileUrl = ReleaseInfoParser.getDatasetUrl(sourceRelInfoFile)

val metadata = MetadataReader.read(repoDir)
val stats = new StatCounterSuite(metadata.elementCount)
val packages = Constants.packageSizes
@@ -71,7 +70,7 @@ object PackageCommand extends Command:
{ implicit builder =>
(sStats, sStreamPackage, sFlatPackage, sJellyPackage, sSampleStream, sChecks) =>
import GraphDSL.Implicits.*
val in = FileHelper.readArchive(dataFileUrl)
val in = FileHelper.readArchiveFromFile(sourceArchiveFile)
.map((name, bytes) => (name, bytes.utf8String))
.async
.buffer(16, OverflowStrategy.backpressure)
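For reference, a minimal sketch of how the updated command might be driven once the dataset archive has already been fetched to disk (for example with wget in the CI job, as the commit message describes). The command name in args(0), the package name `commands`, and the concrete paths are assumptions for illustration, not taken from this commit:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*
import commands.PackageCommand // package name assumed from the file path

// Hypothetical driver for the updated PackageCommand. The archive is assumed to have
// been downloaded beforehand, e.g. `wget -O source-data.tar.gz <dataset-url>` in CI.
// args(0) is assumed to be the command name; args(1..3) follow the layout shown above.
@main def packageLocally(): Unit =
  val done: Future[Unit] =
    PackageCommand.run(Array("package", "./repo", "./source-data.tar.gz", "./out"))
  Await.result(done, 1.hour)
```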
2 changes: 1 addition & 1 deletion src/main/scala/commands/ValidateRepoCommand.scala
@@ -161,7 +161,7 @@ object ValidateRepoCommand extends Command:

private def validatePackage(relInfoFile: Path, metadataInfo: MetadataInfo): Future[Seq[String]] =
val dataFileUrl = ReleaseInfoParser.getDatasetUrl(relInfoFile)
val filesFuture = FileHelper.readArchive(dataFileUrl)
val filesFuture = FileHelper.readArchiveFromUrl(dataFileUrl)
.map((tarMeta, _) => {
tarMeta.filePath.split('/').last
})
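A short usage sketch of the URL-based entry point that the validation path keeps using: it lists the file names inside a remote archive, mirroring `validatePackage` above. The URL, the `util.io` package name, and the surrounding main are placeholders and assumptions:

```scala
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.stream.scaladsl.Sink
import scala.concurrent.Future
import util.io.FileHelper // package name assumed from the file path

@main def listArchiveEntries(): Unit =
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "list-entries")
  import system.executionContext
  // Placeholder URL: the archive is downloaded (following redirects), gunzipped, and
  // un-tarred as a stream; only the entry names are kept here.
  val names: Future[Seq[String]] = FileHelper
    .readArchiveFromUrl("https://example.org/dataset.tar.gz")
    .map((tarMeta, _) => tarMeta.filePath.split('/').last)
    .runWith(Sink.seq)
  names.foreach(println)
```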
25 changes: 16 additions & 9 deletions src/main/scala/util/io/FileHelper.scala
@@ -5,9 +5,6 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.io.IOUtils
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.*
import org.apache.pekko.http.scaladsl.model.headers.Location
import org.apache.pekko.stream.*
import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
import org.apache.pekko.stream.connectors.file.scaladsl.Archive
@@ -21,19 +18,29 @@ import scala.concurrent.{ExecutionContext, Future}

object FileHelper:

def readArchiveFromUrl(url: String)(implicit as: ActorSystem[_]):
Source[(TarArchiveMetadata, ByteString), NotUsed] =
val response = HttpHelper.getWithFollowRedirects(url)
given ExecutionContext = as.executionContext
readArchive(
Source.futureSource(response.map(r => r.entity.dataBytes))
)

def readArchiveFromFile(path: Path)(implicit as: ActorSystem[_]):
Source[(TarArchiveMetadata, ByteString), NotUsed] =
readArchive(FileIO.fromPath(path))

/**
* Reads a source data archive.
* @param url the URL from which to download the archive
* Reads a source data archive from a byte source.
* @param source the source of the archive data
* @return a source of the archive entries
*/
def readArchive(url: String)(implicit as: ActorSystem[_]):
def readArchive(source: Source[ByteString, _])(implicit as: ActorSystem[_]):
Source[(TarArchiveMetadata, ByteString), NotUsed] =
given ExecutionContext = as.executionContext
val response = HttpHelper.getWithFollowRedirects(url)

// Unfortunately, Pekko untar stage is glitchy with large archives, so we have to
// use the non-reactive Apache Commons implementation instead.
val tarIs = Source.futureSource(response.map(r => r.entity.dataBytes))
val tarIs = source
.via(Compression.gunzip())
.toMat(StreamConverters.asInputStream())(Keep.right)
.mapMaterializedValue(bytesIs => TarArchiveInputStream(bytesIs))
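To make the workaround above concrete: a minimal sketch of how tar entries could be pulled out of the blocking TarArchiveInputStream and re-exposed as a stream of (metadata, bytes) pairs. This is an illustration of the Apache Commons approach, not the actual continuation of readArchive (which is cut off above); the TarArchiveMetadata factory shape is an assumption:

```scala
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.io.IOUtils
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.connectors.file.TarArchiveMetadata
import org.apache.pekko.stream.scaladsl.Source
import org.apache.pekko.util.ByteString

// Sketch: drain a blocking TarArchiveInputStream entry by entry. Reads on the underlying
// stream stop at the current entry's boundary, so IOUtils.toByteArray returns exactly one
// entry's content. unfoldResource closes the stream once getNextTarEntry returns null.
def tarEntries(tarIs: TarArchiveInputStream): Source[(TarArchiveMetadata, ByteString), NotUsed] =
  Source.unfoldResource[(TarArchiveMetadata, ByteString), TarArchiveInputStream](
    () => tarIs,
    is => Option(is.getNextTarEntry).map { entry =>
      // TarArchiveMetadata(filePath, size) is assumed here for illustration.
      (TarArchiveMetadata(entry.getName, entry.getSize), ByteString(IOUtils.toByteArray(is)))
    },
    is => is.close()
  )
```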
