Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/bg-22 Adding ClinVar Vep table #200

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions datalake-spark3/src/main/resources/reference_kf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ datalake {
"valid_to_column"="valid_to"
}
},
# Source entry "raw_clinvar_vep": a VCF file located at
# /raw/landing/clinvar_vep/clinvar.vep.vcf.gz in the "public_database" storage,
# declared with the OverWrite load type and no keys or partitioning.
{
format=VCF
id="raw_clinvar_vep"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/raw/landing/clinvar_vep/clinvar.vep.vcf.gz"
# Options handed to the VCF reader (presumably Glow reader options - TODO confirm).
readoptions {
flattenInfoFields="true"
"split_multiallelics"="true"
}
storageid="public_database"
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_dbsnp"
Expand Down Expand Up @@ -539,6 +559,35 @@ datalake {
"valid_to_column"="valid_to"
}
},
# Source entry "normalized_clinvar_vep": a DELTA dataset stored at /public/clinvar_vep
# in the "public_database" storage, declared as table variant.clinvar_vep and
# view variant_live.clinvar_vep, with the OverWrite load type.
{
format=DELTA
id="normalized_clinvar_vep"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/public/clinvar_vep"
readoptions {}
# Repartition setting: Coalesce with n=1.
repartition {
kind=Coalesce
n=1
}
storageid="public_database"
table {
database=variant
name=clinvar_vep
}
view {
database="variant_live"
name=clinvar_vep
}
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=DELTA
id="normalized_cosmic_gene_set"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ object ImportPublicTable {
/** Runs the ClinVar import by delegating to `Clinvar.run` with the given context. */
@main
def clinvar(rc: RuntimeETLContext): Unit = {
  Clinvar.run(rc)
}

/** Runs the ClinVar VEP import by delegating to `ClinvarVep.run` with the given context. */
@main
def clinvar_vep(rc: RuntimeETLContext): Unit = {
  ClinvarVep.run(rc)
}

/** Runs the COSMIC gene set import by delegating to `CosmicGeneSet.run` with the given context. */
@main
def cosmic_gene_set(rc: RuntimeETLContext): Unit = {
  CosmicGeneSet.run(rc)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ case class PublicDatasets(alias: String, tableDatabase: Option[String], viewData
val sources: List[DatasetConf] = List(
//raw
DatasetConf("raw_clinvar" , alias, "/raw/landing/clinvar/clinvar.vcf.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_clinvar_vep" , alias, "/raw/landing/clinvar_vep/clinvar.vep.vcf.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_dbsnp" , alias, "/raw/landing/dbsnp/GCF_000001405.40.gz" , VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")),
DatasetConf("raw_gnomad_genomes_v3" , alias, "/release/3.1/vcf/genomes/gnomad.genomes.v3.1.sites.chr[^M]*.vcf.bgz", VCF , OverWrite , readoptions = Map("flattenInfoFields" -> "true", "split_multiallelics" -> "true")).copy(storageid = gnomadStorageId),
DatasetConf("raw_gnomad_constraint_v2_1_1" , alias, "/raw/landing/gnomad_v2_1_1/gnomad.v2.1.1.lof_metrics.by_gene.txt.gz", CSV , OverWrite , readoptions = Map("header" -> "true", "sep" -> "\t")),
Expand Down Expand Up @@ -38,6 +39,7 @@ case class PublicDatasets(alias: String, tableDatabase: Option[String], viewData
DatasetConf("normalized_1000_genomes" , alias, "/public/1000_genomes" , DELTA, OverWrite , partitionby = List() , table = table("1000_genomes") , view = view("variant_live")),
DatasetConf("normalized_cancer_hotspots" , alias, "/public/cancer_hotspots" , DELTA, OverWrite , partitionby = List() , table = table("cancer_hotspots") , view = view("cancer_hotspots")),
DatasetConf("normalized_clinvar" , alias, "/public/clinvar" , DELTA, OverWrite , partitionby = List() , repartition=Some(Coalesce()), table = table("clinvar") , view = view("clinvar")),
DatasetConf("normalized_clinvar_vep" , alias, "/public/clinvar_vep" , DELTA, OverWrite , partitionby = List() , repartition=Some(Coalesce()), table = table("clinvar_vep") , view = view("clinvar_vep")),
DatasetConf("normalized_cosmic_gene_set" , alias, "/public/cosmic_gene_set" , DELTA, OverWrite , partitionby = List() , table = table("cosmic_gene_set") , view = view("cosmic_gene_set")),
DatasetConf("normalized_dbnsfp" , alias, "/public/dbnsfp/variant" , DELTA, OverWrite , partitionby = List("chromosome"), table = table("dbnsfp") , view = view("dbnsfp")),
DatasetConf("normalized_dbnsfp_annovar" , alias, "/public/annovar/dbnsfp" , DELTA, OverWrite , partitionby = List("chromosome"), table = table("dbnsfp_annovar") , view = view("dbnsfp_annovar")),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package bio.ferlab.datalake.spark3.publictables.normalized

import bio.ferlab.datalake.commons.config.{Coalesce, DatasetConf, RuntimeETLContext}
import bio.ferlab.datalake.spark3.etl.v3.SimpleETLP
import bio.ferlab.datalake.spark3.implicits.DatasetConfImplicits._
import bio.ferlab.datalake.spark3.implicits.GenomicImplicits.columns._
import mainargs.{ParserForMethods, main}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import java.time.LocalDateTime

/**
 * ETL that reads the `raw_clinvar_vep` VCF dataset and writes the
 * `normalized_clinvar_vep` dataset, emitting one row per variant and per
 * element of the `INFO_CSQ` array.
 *
 * @param rc runtime context forwarded to [[SimpleETLP]]
 */
case class ClinvarVep(rc: RuntimeETLContext) extends SimpleETLP(rc) {

  /** Destination dataset of this ETL. */
  override val mainDestination: DatasetConf = conf.getDataset("normalized_clinvar_vep")

  /** Raw VEP-annotated ClinVar VCF used as the only input. */
  val clinvar_vep_vcf: DatasetConf = conf.getDataset("raw_clinvar_vep")

  /**
   * Loads the raw VCF.
   *
   * @return a single-entry map from the raw dataset id to its DataFrame
   */
  override def extract(lastRunDateTime: LocalDateTime,
                       currentRunDateTime: LocalDateTime): Map[String, DataFrame] =
    Map(clinvar_vep_vcf.id -> clinvar_vep_vcf.read)

  /**
   * Projects the variant coordinates, normalizes the clinical significance
   * values and flattens the consequence annotations.
   *
   * @param data DataFrames keyed by dataset id; must contain the raw VCF entry
   * @return one row per (variant, annotation) with the selected annotation fields
   */
  override def transformSingle(data: Map[String, DataFrame],
                               lastRunDateTime: LocalDateTime,
                               currentRunDateTime: LocalDateTime): DataFrame = {

    // Join the INFO_CLNSIG values with "|", rewrite a leading "_", every "|_"
    // and every "/" to "|", then split on "|" to get the final array.
    val clinSig: Column =
      split(regexp_replace(concat_ws("|", col("INFO_CLNSIG")), "^_|\\|_|/", "|"), "\\|") as "clin_sig"

    // One output row per element of INFO_CSQ, exposed as the `annotation` column.
    val perAnnotation: DataFrame = data(clinvar_vep_vcf.id).select(
      chromosome,
      start,
      end,
      reference,
      alternate,
      name,
      clinSig,
      explode(col("INFO_CSQ")) as "annotation"
    )

    // NOTE(review): the unqualified columns below come from GenomicImplicits.columns
    // and presumably read fields of `annotation` - confirm against that object.
    val normalizedHgvsp: Column = regexp_replace(hgvsp, "%3D", "=") as "hgvsp"

    perAnnotation.select(
      col("chromosome"),
      col("start"),
      col("end"),
      col("reference"),
      col("alternate"),
      col("name"),
      col("clin_sig"),
      consequences,
      biotype,
      impact,
      symbol,
      ensembl_gene_id,
      ensembl_transcript_id,
      normalizedHgvsp,
      hgvsc,
      hgvsg
    )
  }

  /** Repartition applied by default: `Coalesce()` with its default settings. */
  override val defaultRepartition: DataFrame => DataFrame = Coalesce()

}

/** Command-line entry points for the [[ClinvarVep]] ETL. */
object ClinvarVep {

  /**
   * Builds the [[ClinvarVep]] ETL for the given context and runs it.
   *
   * @param rc runtime context used to construct the ETL
   */
  @main
  def run(rc: RuntimeETLContext): Unit = {
    // Must instantiate ClinvarVep itself: the sibling `Clinvar` ETL compiles here
    // too, but running it would execute the plain ClinVar job instead of this one.
    ClinvarVep(rc).run()
  }

  /** Parses `args` with mainargs and dispatches to the `@main` method above. */
  def main(args: Array[String]): Unit = ParserForMethods(this).runOrThrow(args)
}

49 changes: 49 additions & 0 deletions datalake-spark3/src/test/resources/config/reference_kf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,26 @@ datalake {
"valid_to_column"="valid_to"
}
},
# Source entry "raw_clinvar_vep": a VCF file located at
# /raw/landing/clinvar_vep/clinvar.vep.vcf.gz in the "public_database" storage,
# declared with the OverWrite load type and no keys or partitioning.
{
format=VCF
id="raw_clinvar_vep"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/raw/landing/clinvar_vep/clinvar.vep.vcf.gz"
# Options handed to the VCF reader (presumably Glow reader options - TODO confirm).
readoptions {
flattenInfoFields="true"
"split_multiallelics"="true"
}
storageid="public_database"
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=VCF
id="raw_dbsnp"
Expand Down Expand Up @@ -539,6 +559,35 @@ datalake {
"valid_to_column"="valid_to"
}
},
# Source entry "normalized_clinvar_vep": a DELTA dataset stored at /public/clinvar_vep
# in the "public_database" storage, declared as table variant.clinvar_vep and
# view variant_live.clinvar_vep, with the OverWrite load type.
{
format=DELTA
id="normalized_clinvar_vep"
keys=[]
loadtype=OverWrite
partitionby=[]
path="/public/clinvar_vep"
readoptions {}
# Repartition setting: Coalesce with n=1.
repartition {
kind=Coalesce
n=1
}
storageid="public_database"
table {
database=variant
name=clinvar_vep
}
view {
database="variant_live"
name=clinvar_vep
}
writeoptions {
"created_on_column"="created_on"
"is_current_column"="is_current"
"updated_on_column"="updated_on"
"valid_from_column"="valid_from"
"valid_to_column"="valid_to"
}
},
{
format=DELTA
id="normalized_cosmic_gene_set"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package bio.ferlab.datalake.spark3.publictables.normalized

import bio.ferlab.datalake.commons.config.DatasetConf
import bio.ferlab.datalake.spark3.testmodels.normalized.NormalizedClinvarVep
import bio.ferlab.datalake.spark3.testmodels.raw.RawClinvarVep
import bio.ferlab.datalake.spark3.testutils.WithTestConfig
import bio.ferlab.datalake.testutils.{TestETLContext, WithSparkSession}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}


/**
 * Checks that [[ClinvarVep]] turns the default raw fixture row into the default
 * normalized fixture row.
 */
class ClinvarVepSpec extends AnyFlatSpec with GivenWhenThen with WithSparkSession with WithTestConfig with Matchers with BeforeAndAfterAll {

  import spark.implicits._

  /** Raw dataset whose id keys the input map given to the ETL. */
  val source: DatasetConf = conf.getDataset("raw_clinvar_vep")

  "ClinvarVepSpec" should "transform ClinvarVep input to ClinvarVep output" in {
    // Single-row input built from the raw fixture defaults.
    val input: Map[String, org.apache.spark.sql.DataFrame] =
      Map(source.id -> Seq(RawClinvarVep()).toDF())

    val job = ClinvarVep(TestETLContext())
    val rows = job.transformSingle(input).as[NormalizedClinvarVep].collect()

    rows should contain theSameElementsAs Seq(NormalizedClinvarVep())
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Generated by [[bio.ferlab.datalake.testutils.ClassGenerator]]
* on 2023-08-10T16:07:51.090087
*/
package bio.ferlab.datalake.spark3.testmodels.normalized




/**
 * Test model for one row of the normalized ClinVar VEP output. Every field has
 * a default so `NormalizedClinvarVep()` yields a complete fixture row.
 */
case class NormalizedClinvarVep(
  chromosome: String = "1",
  start: Long = 63411317L,
  end: Long = 63411318L,
  reference: String = "G",
  alternate: String = "A",
  name: String = "rs200676709",
  clin_sig: Seq[String] = Seq("Likely_benign"),
  consequences: Seq[String] = Seq("missense_variant"),
  biotype: String = "protein_coding",
  impact: String = "MODERATE",
  symbol: String = "ALG6",
  ensembl_gene_id: String = "ENSG00000088035",
  ensembl_transcript_id: String = "ENST00000263440",
  hgvsp: String = "ENSP00000263440.5:p.Gly222Asp",
  hgvsc: String = "ENST00000263440.6:c.665G>A",
  hgvsg: String = "1:g.63411316G>A"
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Generated by [[bio.ferlab.datalake.testutils.ClassGenerator]]
* on 2023-08-10T16:50:15.592397
*/
package bio.ferlab.datalake.spark3.testmodels.raw

import bio.ferlab.datalake.spark3.testmodels.normalized.{Exon, Intron, AminoAcids, CODONS}


/**
 * Test model for one row of the raw ClinVar VEP VCF input. Every field has a
 * default so `RawClinvarVep()` yields a complete fixture row; `INFO_CSQ`
 * defaults to a single [[INFO_CSQ_VEP]] entry.
 */
case class RawClinvarVep(
  contigName: String = "1",
  start: Long = 63411316L,
  end: Long = 63411317L,
  names: Seq[String] = Seq("rs200676709"),
  referenceAllele: String = "G",
  alternateAlleles: Seq[String] = Seq("A"),
  qual: Option[Double] = None,
  filters: Option[Seq[String]] = None,
  splitFromMultiAllelic: Boolean = false,
  INFO_AF_EXAC: Double = 3.4E-4,
  INFO_CLNVCSO: String = "SO:0001483",
  INFO_GENEINFO: String = "ALG6:29929",
  INFO_CLNSIGINCL: Option[Seq[String]] = None,
  INFO_CLNVI: Seq[String] = Seq("Illumina_Clinical_Services_Laboratory", "Illumina:905420"),
  INFO_CLNDISDB: Seq[String] = Seq("MONDO:MONDO:0011291", "MedGen:C2930997", "OMIM:603147", "Orphanet:ORPHA79320"),
  INFO_CLNREVSTAT: Seq[String] = Seq("criteria_provided", "_single_submitter"),
  INFO_CLNDN: Seq[String] = Seq("Congenital_disorder_of_glycosylation_type_1C"),
  INFO_ALLELEID: Int = 746612,
  INFO_ORIGIN: Seq[String] = Seq("1"),
  INFO_SSR: Option[Int] = None,
  INFO_CLNDNINCL: Option[Seq[String]] = None,
  INFO_CLNSIG: Seq[String] = Seq("Likely_benign"),
  INFO_RS: Seq[String] = Seq("532466353"),
  INFO_DBVARID: Option[Seq[String]] = None,
  INFO_AF_TGP: Double = 6.0E-4,
  INFO_CLNVC: String = "single_nucleotide_variant",
  INFO_CLNHGVS: Seq[String] = Seq("NC_000001.11:g.63411316G>A"),
  INFO_MC: Seq[String] = Seq("SO:0001583|missense_variant"),
  INFO_CLNSIGCONF: Seq[String] = Seq("Likely_benign(1)", "Uncertain_significance(2)"),
  INFO_CSQ: Seq[INFO_CSQ_VEP] = Seq(INFO_CSQ_VEP()),
  INFO_AF_ESP: Double = 1.5E-4,
  INFO_CLNDISDBINCL: Option[Seq[String]] = None,
  genotypes: Seq[GENOTYPES] = Seq(GENOTYPES())
)

/**
 * Test model for one element of the `INFO_CSQ` array in [[RawClinvarVep]].
 * Every field has a default so `INFO_CSQ_VEP()` yields a complete fixture entry.
 */
case class INFO_CSQ_VEP(
  Allele: String = "C",
  Consequence: Seq[String] = Seq("missense_variant"),
  IMPACT: String = "MODERATE",
  SYMBOL: String = "ALG6",
  Gene: String = "ENSG00000088035",
  Feature_type: String = "Transcript",
  Feature: String = "ENST00000263440",
  BIOTYPE: String = "protein_coding",
  EXON: Exon = Exon(),
  INTRON: Intron = Intron(),
  HGVSc: String = "ENST00000263440.6:c.665G>A",
  HGVSp: String = "ENSP00000263440.5:p.Gly222Asp",
  cDNA_position: Int = 843,
  CDS_position: Int = 807,
  Protein_position: Int = 269,
  Amino_acids: AminoAcids = AminoAcids(),
  Codons: CODONS = CODONS(),
  Existing_variation: Seq[String] = Seq("rs200676709"),
  DISTANCE: Option[Int] = None,
  STRAND: Int = 1,
  FLAGS: Option[Seq[String]] = None,
  PICK: String = "1",
  VARIANT_CLASS: String = "SNV",
  SYMBOL_SOURCE: String = "HGNC",
  HGNC_ID: String = "HGNC:14825",
  CANONICAL: String = "YES",
  RefSeq: String = "NM_001005484.1&NM_001005484.1&NM_001005484.2",
  HGVS_OFFSET: Option[String] = None,
  HGVSg: String = "1:g.63411316G>A",
  CLIN_SIG: Option[String] = None,
  SOMATIC: Option[String] = None,
  PHENO: Option[String] = None,
  PUBMED: String = "29135816",
  CADD_raw_rankscore: Option[String] = None,
  DANN_rankscore: Option[String] = None,
  Ensembl_geneid: Option[String] = None,
  Ensembl_transcriptid: Option[String] = None,
  ExAC_AC: Option[String] = None,
  ExAC_AF: Option[String] = None,
  FATHMM_converted_rankscore: Option[String] = None,
  FATHMM_pred: Option[String] = None,
  GTEx_V7_tissue: Option[String] = None,
  Interpro_domain: Option[String] = None,
  LRT_converted_rankscore: Option[String] = None,
  LRT_pred: Option[String] = None,
  Polyphen2_HVAR_pred: Option[String] = None,
  Polyphen2_HVAR_rankscore: Option[String] = None,
  REVEL_rankscore: Option[String] = None,
  SIFT_converted_rankscore: Option[String] = None,
  SIFT_pred: Option[String] = None,
  UK10K_AC: Option[String] = None,
  UK10K_AF: Option[String] = None,
  clinvar_MedGen_id: Option[String] = None,
  clinvar_OMIM_id: Option[String] = None,
  clinvar_Orphanet_id: Option[String] = None,
  clinvar_clnsig: Option[String] = None,
  clinvar_id: Option[String] = None,
  clinvar_trait: Option[String] = None,
  gnomAD_exomes_AC: Option[String] = None,
  gnomAD_exomes_AF: Option[String] = None,
  gnomAD_genomes_AC: Option[String] = None,
  gnomAD_genomes_AF: Option[String] = None,
  phyloP17way_primate_rankscore: Option[String] = None,
  rs_dbSNP151: Option[String] = None
)