Skip to content

Commit

Permalink
Merge pull request #47 from bento-platform/releases/v3.8
Browse files Browse the repository at this point in the history
v3.8
  • Loading branch information
brouillette authored Jun 3, 2023
2 parents dcef2db + 905f7f5 commit 1994d7f
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 162 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
# move vcf.gz files to `$GOHAN_API_VCF_PATH`
# ingest vcf.gz
curl -k https://gohan.local/variants/ingestion/run\?fileNames=<filename>\&assemblyId=GRCh37\&filterOutHomozygousReferences=true\&tableId=<table id>
curl -k https://gohan.local/variants/ingestion/run\?fileNames=<filename>\&assemblyId=GRCh37\&filterOutReferences=true\&tableId=<table id>
# monitor progress:
curl -k https://gohan.local/variants/ingestion/requests
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ services:
- DATA=/drs/bento_drs/data/obj/ # DRS file objects, vs. the database
- INTERNAL_PORT=${GOHAN_DRS_INTERNAL_PORT}
volumes:
- ${GOHAN_DRS_DATA_DIR}:/drs/data
- ${GOHAN_DRS_DATA_DIR}:/drs/bento_drs/data
- ${GOHAN_API_DRS_BRIDGE_HOST_DIR}:${GOHAN_DRS_API_DRS_BRIDGE_DIR_CONTAINERIZED}
healthcheck:
test: [ "CMD", "curl", "http://localhost:${GOHAN_DRS_INTERNAL_PORT}" ]
Expand Down
4 changes: 2 additions & 2 deletions etc/example.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

GOHAN_DEBUG=false
GOHAN_SERVICE_CONTACT=someone@somewhere.ca
GOHAN_SEMVER=3.7.2
GOHAN_SEMVER=3.8.0
GOHAN_SERVICES="gateway api elasticsearch kibana drs authorization"

# GOOS=linux
Expand Down Expand Up @@ -112,7 +112,7 @@ GOHAN_KB_ES_PORT=9200

# DRS
GOHAN_DRS_IMAGE=ghcr.io/bento-platform/bento_drs
GOHAN_DRS_VERSION=0.8.0
GOHAN_DRS_VERSION=0.9.0
GOHAN_DRS_CONTAINER_NAME=gohan-drs
GOHAN_DRS_INTERNAL_PORT=5000
GOHAN_DRS_EXTERNAL_PORT=6000
Expand Down
9 changes: 5 additions & 4 deletions src/api/models/constants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ var ValidTableDataTypes = []string{"variant"}
var VcfHeaders = []string{"chrom", "pos", "id", "ref", "alt", "qual", "filter", "info", "format"}

/*
Defines a set of base level
constants and enums to be used
throughout Gohan and it's
associated services.
Defines a set of base level
constants and enums to be used
throughout Gohan and it's
associated services.
*/
type AssemblyId string
type Chromosome string
Expand All @@ -16,3 +16,4 @@ type SearchOperation string
type SortDirection string

type Zygosity int
type Ploidy int
16 changes: 16 additions & 0 deletions src/api/models/constants/ploidy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package ploidy

import (
"gohan/api/models/constants"
)

const (
Unknown constants.Ploidy = iota

Haploid
Diploid
)

func IsKnown(value int) bool {
return value > int(Unknown) && value <= int(Diploid)
}
18 changes: 15 additions & 3 deletions src/api/models/constants/zygosity/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,35 @@ import (

const (
Unknown constants.Zygosity = iota
// Diploid or higher
Heterozygous
HomozygousReference
HomozygousAlternate

// Haploid (deliberately below diploid for sequential id'ing purposes)
Reference
Alternate
)

func IsKnown(value int) bool {
return value > int(Unknown) && value <= int(HomozygousAlternate)
return value > int(Unknown) && value <= int(Alternate)
}

func ZygosityToString(zyg constants.Zygosity) string {
switch zyg {
// Haploid
case Reference:
return "REFERENCE"
case Alternate:
return "ALTERNATE"

// Diploid or higher
case Heterozygous:
return "HETEROZYGOUS"
case HomozygousAlternate:
return "HOMOZYGOUS_ALTERNATE"
case HomozygousReference:
return "HOMOZYGOUS_REFERENCE"
case HomozygousAlternate:
return "HOMOZYGOUS_ALTERNATE"
default:
return "UNKNOWN"
}
Expand Down
30 changes: 26 additions & 4 deletions src/api/mvc/data-types/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,32 @@ package dataTypes
import (
"net/http"

"gohan/api/contexts"
"gohan/api/models/schemas"

variantService "gohan/api/services/variants"

"github.com/labstack/echo"
)

var variantDataTypeJson = map[string]interface{}{
"id": "variant",
"label": "Variants",
"schema": schemas.VARIANT_SCHEMA,
"id": "variant",
"label": "Variants",
"queryable": true,
"schema": schemas.VARIANT_SCHEMA,
}

// "metadata_schema": schemas.VARIANT_TABLE_METADATA_SCHEMA,

func GetDataTypes(c echo.Context) error {
es := c.(*contexts.GohanContext).Es7Client
cfg := c.(*contexts.GohanContext).Config

// accumulate number of variants associated with each
// sampleId fetched from the variants overview
// TODO: refactor to handle errors better
resultsMap := variantService.GetVariantsOverview(es, cfg)
variantDataTypeJson["count"] = sumAllValues(resultsMap["sampleIDs"])

// Data types are basically stand-ins for schema blocks
return c.JSON(http.StatusOK, []map[string]interface{}{
variantDataTypeJson,
Expand All @@ -34,3 +46,13 @@ func GetVariantDataTypeSchema(c echo.Context) error {
func GetVariantDataTypeMetadataSchema(c echo.Context) error {
return c.JSON(http.StatusOK, schemas.VARIANT_TABLE_METADATA_SCHEMA)
}

// - helpers
func sumAllValues(keyedValues interface{}) float64 {
tmpValueStrings := keyedValues.(map[string]interface{})
sum := 0.0
for _, k := range tmpValueStrings {
sum += k.(float64)
}
return sum
}
16 changes: 11 additions & 5 deletions src/api/mvc/service-info/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package serviceInfo

import (
"fmt"
"gohan/api/contexts"
serviceInfo "gohan/api/models/constants/service-info"

Expand All @@ -13,10 +12,17 @@ import (
// Spec: https://github.com/ga4gh-discovery/ga4gh-service-info
func GetServiceInfo(c echo.Context) error {
return c.JSON(http.StatusOK, map[string]interface{}{
"id": serviceInfo.SERVICE_ID,
"name": serviceInfo.SERVICE_NAME,
"type": fmt.Sprintf("%s:%s", serviceInfo.SERVICE_TYPE_NO_VER, c.(*contexts.GohanContext).Config.SemVer),

"bento": map[string]interface{}{
"dataService": true,
"serviceKind": serviceInfo.SERVICE_ARTIFACT,
},
"type": map[string]interface{}{
"artifact": serviceInfo.SERVICE_ARTIFACT,
"group": serviceInfo.SERVICE_TYPE_NO_VER,
"version": c.(*contexts.GohanContext).Config.SemVer,
},
"id": serviceInfo.SERVICE_ID,
"name": serviceInfo.SERVICE_NAME,
"description": serviceInfo.SERVICE_DESCRIPTION,
"organization": map[string]string{
"name": "C3G",
Expand Down
150 changes: 89 additions & 61 deletions src/api/mvc/variants/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,57 +111,13 @@ func VariantsIngest(c echo.Context) error {
ingestionService := c.(*contexts.GohanContext).IngestionService

// retrieve query parameters (comman separated)
fileNames := strings.Split(c.QueryParam("fileNames"), ",")
for i, fileName := range fileNames {
if fileName == "" {
// TODO: create a standard response object
return c.JSON(http.StatusBadRequest, "{\"error\" : \"Missing 'fileNames' query parameter!\"}")
} else {
// remove DRS bridge directory base path from the requested filenames (if present)
if strings.HasPrefix(fileName, cfg.Drs.BridgeDirectory) {
replaced := strings.Replace(fileName, cfg.Drs.BridgeDirectory, "", 1)

replacedDirectory, replacedFileName := path.Split(replaced)
// strip the leading '/' away
if replacedDirectory == "/" {
fileNames[i] = replacedFileName
} else {
fileNames[i] = replaced
}
}
}
}

assemblyId := a.CastToAssemblyId(c.QueryParam("assemblyId"))
tableId := c.QueryParam("tableId")
// TODO: validate table exists in elasticsearch

// -- optional filter
var (
filterOutHomozygousReferences bool = false // default
fohrErr error
)
filterOutHomozygousReferencesQP := c.QueryParam("filterOutHomozygousReferences")
if len(filterOutHomozygousReferencesQP) > 0 {
filterOutHomozygousReferences, fohrErr = strconv.ParseBool(filterOutHomozygousReferencesQP)
if fohrErr != nil {
fmt.Printf("Error parsing filterOutHomozygousReferences: %s, [%s] - defaulting to 'false'\n", filterOutHomozygousReferencesQP, fohrErr)
// defaults to false
}
}

startTime := time.Now()
fmt.Printf("Ingest Start: %s\n", startTime)

var fileNames []string
// get vcf files
var vcfGzfiles []string

// TODO: simply load files by filename provided
// rather than load all available files and looping over them
// -----
// Read all files and temporarily catalog all .vcf.gz files
err := filepath.Walk(vcfPath,
func(absoluteFileName string, info os.FileInfo, err error) error {
// helper function
accumulatorWalkFunc := func(bucket *[]string) func(absoluteFileName string, info os.FileInfo, err error) error {
return func(absoluteFileName string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -181,28 +137,98 @@ func VariantsIngest(c echo.Context) error {
}

// Filter only .vcf.gz files
// if fileName != "" {
if matched, _ := regexp.MatchString(".vcf.gz", relativePathFileName); matched {
vcfGzfiles = append(vcfGzfiles, relativePathFileName)
*bucket = append(*bucket, relativePathFileName)
} else {
fmt.Printf("Skipping %s\n", relativePathFileName)
}
// }

return nil
})
if err != nil {
log.Println(err)
}
}
//

// Locate fileName from request inside found files
for _, fileName := range fileNames {
if !utils.StringInSlice(fileName, vcfGzfiles) {
return c.JSON(http.StatusBadRequest, "{\"error\" : \"file "+fileName+" not found! Aborted -- \"}")
dirName := c.QueryParam("directory")
if dirName != "" {
if strings.HasPrefix(dirName, cfg.Drs.BridgeDirectory) {
replaced := strings.Replace(dirName, cfg.Drs.BridgeDirectory, "", 1)

replacedFullPath, replacedDirName := path.Split(replaced)
// strip the leading '/' away
if replacedFullPath == "/" {
dirName = replacedDirName
} else {
dirName = replaced
}
}

err := filepath.Walk(fmt.Sprintf("%s/%s", vcfPath, dirName), accumulatorWalkFunc(&fileNames))
if err != nil {
log.Println(err)
}
} else {
fileNames = strings.Split(c.QueryParam("fileNames"), ",")
for i, fileName := range fileNames {
if fileName == "" {
// TODO: create a standard response object
return c.JSON(http.StatusBadRequest, "{\"error\" : \"Missing 'fileNames' query parameter!\"}")
} else {
// remove DRS bridge directory base path from the requested filenames (if present)
if strings.HasPrefix(fileName, cfg.Drs.BridgeDirectory) {
replaced := strings.Replace(fileName, cfg.Drs.BridgeDirectory, "", 1)

replacedDirectory, replacedFileName := path.Split(replaced)
// strip the leading '/' away
if replacedDirectory == "/" {
fileNames[i] = replacedFileName
} else {
fileNames[i] = replaced
}
}
}
}

// TODO: simply load files by filename provided
// rather than load all available files and looping over them
// -----
// Read all files and temporarily catalog all .vcf.gz files
err := filepath.Walk(vcfPath, accumulatorWalkFunc(&vcfGzfiles))
if err != nil {
log.Println(err)
}

// Locate fileName from request inside found files
for _, fileName := range fileNames {
if !utils.StringInSlice(fileName, vcfGzfiles) {
return c.JSON(http.StatusBadRequest, "{\"error\" : \"file "+fileName+" not found! Aborted -- \"}")
}
}
// -----
}

assemblyId := a.CastToAssemblyId(c.QueryParam("assemblyId"))
tableId := c.QueryParam("tableId")
// TODO: validate table exists in elasticsearch

// -- optional filter
var (
filterOutReferences bool = false // default
fohrErr error
)
filterOutReferencesQP := c.QueryParam("filterOutReferences")
if len(filterOutReferencesQP) > 0 {
filterOutReferences, fohrErr = strconv.ParseBool(filterOutReferencesQP)
if fohrErr != nil {
fmt.Printf("Error parsing filterOutReferences: %s, [%s] - defaulting to 'false'\n", filterOutReferencesQP, fohrErr)
// defaults to false
}
}
// -----

startTime := time.Now()
fmt.Printf("Ingest Start: %s\n", startTime)

// ingest vcf
// ingserviceMux := sync.RWMutex{}
responseDtos := []ingest.IngestResponseDTO{}
for _, fileName := range fileNames {

Expand Down Expand Up @@ -239,7 +265,9 @@ func VariantsIngest(c echo.Context) error {
ingestionService.ConcurrentFileIngestionQueue <- true
go func(gzippedFileName string, reqStat *ingest.VariantIngestRequest) {
// free up a spot in the queue
defer func() { <-ingestionService.ConcurrentFileIngestionQueue }()
defer func() {
<-ingestionService.ConcurrentFileIngestionQueue
}()

fmt.Printf("Begin running %s !\n", gzippedFileName)
reqStat.State = ingest.Running
Expand Down Expand Up @@ -382,7 +410,7 @@ func VariantsIngest(c echo.Context) error {
// --- load vcf into memory and ingest the vcf file into elasticsearch
beginProcessingTime := time.Now()
fmt.Printf("Begin processing %s at [%s]\n", gzippedFilePath, beginProcessingTime)
ingestionService.ProcessVcf(gzippedFilePath, drsFileId, tableId, assemblyId, filterOutHomozygousReferences, cfg.Api.LineProcessingConcurrencyLevel)
ingestionService.ProcessVcf(gzippedFilePath, drsFileId, tableId, assemblyId, filterOutReferences, cfg.Api.LineProcessingConcurrencyLevel)
fmt.Printf("Ingest duration for file at %s : %s\n", gzippedFilePath, time.Since(beginProcessingTime))

reqStat.State = ingest.Done
Expand Down
Loading

0 comments on commit 1994d7f

Please sign in to comment.