Skip to content

Commit

Permalink
feature(223) overlay plugin (#225)
Browse files Browse the repository at this point in the history
Co-authored-by: Dr. Ernie Prabhakar <[email protected]>
  • Loading branch information
drernie and drernie authored Aug 29, 2024
1 parent 08a8fcc commit 6f0aecf
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 44 deletions.
15 changes: 12 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,20 @@ jobs:
env:
LOG4J_DEBUG: true

- name: Archive production artifacts
- name: Archive production artifacts (Windows only)
uses: actions/upload-artifact@v4
if: ${{ always() }}
if: ${{ always() && matrix.os == 'windows-latest' }}
with:
name: nf-quilt-test-reports-${{ matrix.os }}-${{ matrix.java_version }}
path: |
./nf-quilt/plugins/nf-quilt/build/reports/tests/test/
D:\a\nf-quilt\nf-quilt\plugins\nf-quilt\build\reports\tests\test\
overwrite: true
- name: Archive production artifacts (Linux and MacOS)
uses: actions/upload-artifact@v4
if: ${{ always() && matrix.os != 'windows-latest' }}
with:
name: nf-quilt-test-reports-${{ matrix.os }}-${{ matrix.java_version }}
path: |
${{ github.workspace }}/nf-quilt/plugins/nf-quilt/build/reports/tests/test/
overwrite: true

3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pkg-test: compile #-all
echo "$(TEST_URI)"
$(NF_BIN) run ./main.nf -profile standard -plugins $(PROJECT) --outdir "$(TEST_URI)"

s3-overlay: compile
$(NF_BIN) run ./main.nf --plugins $(PROJECT) --outdir "$(S3_BASE)/s3-overlay" --input "$(S3_BASE)/s3-in"

s3-test: compile
$(NF_BIN) run ./main.nf --outdir "$(S3_BASE)/s3-test" --input "$(S3_BASE)/s3-in"

Expand Down
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ Nextflow plugin for reading and writing Quilt packages as a FileSystem
developed by [Quilt Data](https://quiltdata.com/) that enables you read and write directly
to Quilt packages using `quilt+s3` URIs wherever your Nextflow pipeline currently use `s3` URIs.

In v0.8.0, the plugin can even be used with "native" URIs, and it will automatically register a Quilt package at the root of the bucket.

Inspired by the original [`nf-quilt`](https://github.com/nextflow-io/nf-quilt) plugin (v0.2.0) developed by Seqera labs.

## I. Using the nf-quilt plugin in Production
Expand Down Expand Up @@ -35,6 +37,8 @@ You can alternatively specify the plugin as part of the command-line, .e.g.:
nextflow run ./main.nf -profile standard -plugins nf-quilt --outdir 'quilt+s3://bucket#package=prefix/suffix'
```

## II. Working With Quilt+ URIs (optional for output)

1. Obtain a Quilt+ URI for each package

Each Quilt+ package URI you read or write from has the form:
Expand Down Expand Up @@ -82,7 +86,7 @@ nextflow run main.nf -plugins [email protected]

For Tower, you can use the "Pre-run script" to set the environment variables.

## II. Advanced URI Options
## III. Advanced URI Options

There are a number of additional parameters you can add to Quilt+ URIs,
in order to customize the behavior of the plugin:
Expand Down Expand Up @@ -148,7 +152,7 @@ To quickly run `nf-quilt` from this GitHub repository:
```bash
git clone https://github.com/quiltdata/nf-quilt.git
cd nf-quilt
make test-all # runs unit tests and installs depdencies
make test-all # runs unit tests and installs dependencies
export WRITE_BUCKET=bucket-with-write-access
make pkg-test # create "test/hurdat" package on s3://$WRITE_BUCKET
./launch.sh run nf-core/sarek -profile test,docker -plugins nf-quilt \
Expand Down Expand Up @@ -195,9 +199,9 @@ type:
./launch.sh run ./main.nf -profile standard -plugins $(PROJECT) --pub "quilt+s3://bucket#package=test/hurdat"
```

Replace "bucket" with an S3 bucket those credentials can write to.
Replace "bucket" with an S3 bucket that those credentials can write to.

### Running a Pipeine Locally
### Running a Pipeline Locally

From inside the `nf-quilt` directory, call `./launch.sh` with a path to your pipeline.

Expand Down
66 changes: 57 additions & 9 deletions plugins/nf-quilt/src/main/nextflow/quilt/QuiltObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
*/
package nextflow.quilt

import nextflow.Session
import nextflow.quilt.jep.QuiltParser
import nextflow.quilt.nio.QuiltPath
import nextflow.quilt.nio.QuiltPathFactory
import nextflow.trace.TraceObserver

import java.nio.file.Path
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.trace.TraceObserver

/**
* Plugin observer of workflow events
Expand All @@ -39,6 +41,10 @@ class QuiltObserver implements TraceObserver {
final private Map<String,String> uniqueURIs = [:]
final private Map<String,String> publishedURIs = [:]

// Is this overkill? Do we only ever have one output package per run?
final private Map<String, Map<String, Path>> packageOverlays = [:]
final private Lock lock = new ReentrantLock() // Need this because of threads

static QuiltPath asQuiltPath(Path path) {
if (path in QuiltPath) {
return (QuiltPath) path
Expand Down Expand Up @@ -73,6 +79,44 @@ class QuiltObserver implements TraceObserver {
return uniqueURIs[key]
}

String extractPackageURI(Path nonQuiltPath) {
String pathString = nonQuiltPath.toUri()
// println("extractPackageURI.pathString[${nonQuiltPath}] -> $pathString")
String[] partsArray = pathString.split('/')
List<String> parts = new ArrayList(partsArray.toList())
// parts.eachWithIndex { p, i -> println("extractPackageURI.parts[$i]: $p") }

if (parts.size() < 3) {
throw new IllegalArgumentException("Invalid pathString: $pathString ($nonQuiltPath)")
}
parts = parts.drop(3)
if (parts[0].endsWith(':')) {
parts = parts.drop(1)
}
String bucket = parts.remove(0)
String file_path = parts.remove(parts.size() - 1)
String prefix = parts.size() > 0 ? parts.remove(0) : 'default_prefix'
String suffix = parts.size() > 0 ? parts.remove(0) : 'default_suffix'
if (parts.size() > 0) {
String folder_path = parts.join('/')
file_path = folder_path + '/' + file_path
}

// TODO: should overlay packages always force to new versions?
String base = "quilt+s3://${bucket}#package=${prefix}%2f${suffix}"
String uri = "${base}&path=${file_path}"

String key = pkgKey(QuiltPathFactory.parse(uri))
Map<String, Path> current = packageOverlays.get(key, [:]) as Map<String, Path>
current[file_path] = nonQuiltPath
lock.withLock {
uniqueURIs[key] = base
publishedURIs[key] = base
packageOverlays[key] = current
}
return uri
}

void checkParams(Map params) {
log.debug("checkParams[$params]")
params.each { k, value ->
Expand All @@ -94,23 +138,27 @@ class QuiltObserver implements TraceObserver {

// NOTE: TraceFileObserver calls onFilePublish _before_ onFlowCreate
@Override
void onFilePublish(Path path) { //, Path source
log.debug("onFilePublish.Path[$path]") //.Source[$source]
QuiltPath qPath = asQuiltPath(path)
void onFilePublish(Path destination, Path source) {
// Path source may be null, won't work with older versions of Nextflow
log.debug("onFilePublish.Path[$destination] <- $source")
QuiltPath qPath = asQuiltPath(destination)
if (qPath) {
checkPath(qPath, true)
} else {
log.warn("onFilePublish.not.QuiltPath: $path")
String uri = extractPackageURI(destination)
log.debug("onFilePublish.NonQuiltPath[$destination]: $uri")
}
}

@Override
void onFlowComplete() {
log.debug("`onFlowComplete` ${publishedURIs}")
log.debug("onFlowComplete.publishedURIs[${publishedURIs.size()}]: $publishedURIs")
// create QuiltProduct for each unique package URI
publishedURIs.each { k, uri ->
publishedURIs.each { key, uri ->
QuiltPath path = QuiltPathFactory.parse(uri)
new QuiltProduct(path, session)
Map<String, Path> overlays = packageOverlays.get(key, [:]) as Map<String, Path>
// log.debug("onFlowComplete.pkg: $path overlays[${overlays?.size()}]: $overlays")
new QuiltProduct(path, session, overlays)
}
}

Expand Down
32 changes: 21 additions & 11 deletions plugins/nf-quilt/src/main/nextflow/quilt/QuiltProduct.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ ${nextflow}
}
}

static void writeString(String text, QuiltPackage pkg, String filename) {
static void writeString(String text, QuiltPackage pkg, String filepath) {
String dir = pkg.packageDest()
Path path = Paths.get(dir, filename.split('/') as String[])
Path path = Paths.get(dir, filepath.split('/') as String[])
try {
// ensure directories exist first
path.getParent().toFile().mkdirs()
path.getParent().toFile().mkdirs() // ensure directories exist first
Files.write(path, text.bytes)
}
catch (Exception e) {
Expand All @@ -113,31 +112,42 @@ ${nextflow}
}

private final QuiltPath path
private final List<Path> overlays
private final QuiltPackage pkg
private final Session session
private String msg
private Map meta

QuiltProduct(QuiltPath path, Session session) {
QuiltProduct(QuiltPath path, Session session, Map<String, Path> overlays = [:]) {
this.path = path
this.pkg = path.pkg()
this.msg = pkg.toString()
this.meta = [pkg: msg, time_start: now()]
this.session = session

if (session.isSuccess() || pkg.is_force()) {
if (overlays) {
log.info("publishing overlays: ${overlays.size()}")
publishOverlays(overlays)
} else {
log.info('No overlays to publish.')
}
publish()
} else {
log.info("not publishing: ${pkg} [unsuccessful session]")
}
}

void publishOverlays(Map<String, Path> overlays) {
overlays.each { key, overlay ->
log.info("publishing overlay[$key]: ${overlay}")
writeString(overlay.text, pkg, key)
}
}

void publish() {
log.debug("publish($msg)")
meta = setupMeta()
String text = setupReadme()
log.debug("setupReadme: $text")
List<Map> quilt_summarize = setupSummarize()
log.debug("setupSummarize: $quilt_summarize")
try {
log.info("publish.pushing: ${pkg}")
def m = pkg.push(msg, meta)
Expand Down Expand Up @@ -173,7 +183,7 @@ ${nextflow}

String writeNextflowMetadata(Map map, String suffix) {
String filename = "nf-quilt/${suffix}.json"
log.debug("writeNextflowMetadata[$suffix]: ${filename}")
// log.debug("writeNextflowMetadata[$suffix]: ${filename}")
writeString(QuiltPackage.toJson(map), pkg, filename)
return filename
}
Expand Down Expand Up @@ -255,7 +265,7 @@ ${nextflow}
now: now(),
pkg: pkg.packageName,
])
log.debug("readme.template: ${template}")
// log.debug("readme.template: ${template}")
return template
}

Expand Down
25 changes: 18 additions & 7 deletions plugins/nf-quilt/src/main/nextflow/quilt/jep/QuiltPackage.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class QuiltPackage {
List<String> entries = dict.collect { key, value ->
String prefix = JsonOutput.toJson(key)
String suffix = "toJson.error: ${value}"
log.debug("QuiltPackage.toJson: ${prefix} [${suffix.length()}]")
// log.debug("QuiltPackage.toJson: ${prefix} [${suffix.length()}]")
try {
suffix = JsonOutput.toJson(value)
}
Expand Down Expand Up @@ -172,7 +172,10 @@ class QuiltPackage {
void setup() {
Files.createDirectories(this.folder)
this.installed = false
install() // FIXME: only needed for nextflow < 23.12?
if (!this.is_force()) {
log.debug("QuiltPackage.setup.install.options: $parsed.options")
install(true) // FIXME: only needed for nextflow < 23.12?
}
}

boolean is_force() {
Expand All @@ -191,15 +194,16 @@ class QuiltPackage {
return folder
}

Path install() {
Path install(boolean implicit=false) {
if (isNull()) {
log.debug('null bucket: no need to install')
return null
}
Path dest = packageDest()
String implicitStr = implicit ? 'implicitly ' : ''

try {
log.info("installing $packageName from $bucket...")
log.info("${implicitStr}installing $packageName from $bucket...")
S3PhysicalKey registryPath = new S3PhysicalKey(bucket, '', null)
Registry registry = new Registry(registryPath)
Namespace namespace = registry.getNamespace(packageName)
Expand All @@ -210,10 +214,17 @@ class QuiltPackage {
Manifest manifest = namespace.getManifest(resolvedHash)

manifest.install(dest)
log.debug("done: installed into $dest)")
println("Children: ${relativeChildren('')}")
log.info("install: ${implicitStr}installed into $dest)")
println("QuiltPackage.install.Children: ${relativeChildren('')}")
} catch (IOException e) {
log.error("failed to install $packageName")
if (!implicit) {
log.error("failed to install $packageName", e)
print("INSTALL FAILED: ${this.parsed}\n")
e.printStackTrace()
/* groovylint-disable-next-line ThrowRuntimeException */
throw new RuntimeException(e)
}
log.warn("failed to (implicitly) install $packageName")
// this is non-fatal error, so we don't want to stop the pipeline
/* groovylint-disable-next-line ReturnNullFromCatchBlock */
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class QuiltParser {
this.catalogName = options.get(P_CAT)
this.options = options
this.metadata = metadata
log.debug("QuiltParser[${bucket}] for ${packageName} in ${path}")
// log.debug("QuiltParser[${bucket}] for ${packageName} in ${path}")
}

String parsePkg(String pkg) {
Expand Down Expand Up @@ -292,7 +292,7 @@ class QuiltParser {
if (tag) { pkg += ":$tag" }
str += "#package=${pkg.replace('/', '%2f')}"
}
log.debug("toPackageString: ${str}")
// log.debug("toPackageString: ${str}")
return str
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ class QuiltFileSystemProvider extends FileSystemProvider implements FileSystemTr
*/

boolean canUpload(Path source, Path target) {
// log.debug "QuiltFileSystemProvider.canUpload: ${source} -> ${target}"
log.debug "QuiltFileSystemProvider.canUpload: ${source} -> ${target}"
return FileSystems.getDefault().equals(source.getFileSystem()) && target instanceof QuiltPath
}

boolean canDownload(Path source, Path target) {
// log.debug "QuiltFileSystemProvider.canDownload: ${source} -> ${target}"
log.debug "QuiltFileSystemProvider.canDownload: ${source} -> ${target}"
return source instanceof QuiltPath && FileSystems.getDefault().equals(target.getFileSystem())
}

Expand Down Expand Up @@ -143,7 +143,7 @@ class QuiltFileSystemProvider extends FileSystemProvider implements FileSystemTr
}

void upload(Path localFile, Path remoteDestination, CopyOption... options) throws IOException {
// log.debug "QuiltFileSystemProvider.upload: ${localFile} -> ${remoteDestination}"
log.debug "QuiltFileSystemProvider.upload: ${localFile} -> ${remoteDestination}"
final CopyOptions opts = CopyOptions.parse(options)
// delete target if it exists and REPLACE_EXISTING is specified
if (opts.replaceExisting()) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/nf-quilt/src/resources/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Manifest-Version: 1.0
Plugin-Class: nextflow.quilt.QuiltPlugin
Plugin-Id: nf-quilt
Plugin-Version: 0.7.17
Plugin-Version: 0.8.0
Plugin-Provider: Quilt Data
Plugin-Requires: >=22.10.6
Loading

0 comments on commit 6f0aecf

Please sign in to comment.