diff --git a/cmd/main.go b/cmd/main.go index 93fafa9..09c51cd 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,7 +1,7 @@ package main import ( - common "github.com/mimiro-io/common-datalayer" + cdl "github.com/mimiro-io/common-datalayer" layer "github.com/mimiro-io/filesystem-datalayer" "os" ) @@ -14,5 +14,5 @@ func main() { if len(args) >= 1 { configFolderLocation = args[0] } - common.NewServiceRunner(layer.NewFileSystemDataLayer).WithConfigLocation(configFolderLocation).StartAndWait() + cdl.NewServiceRunner(layer.NewFileSystemDataLayer).WithConfigLocation(configFolderLocation).StartAndWait() } diff --git a/go.mod b/go.mod index d5b2960..b052baa 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,10 @@ require ( require ( github.com/DataDog/datadog-go/v5 v5.5.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/apache/thrift v0.16.0 // indirect + github.com/fraugster/parquet-go v0.12.0 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/labstack/echo/v4 v4.11.4 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index 255ef1c..c4b3025 100644 --- a/go.sum +++ b/go.sum @@ -1,58 +1,98 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go/v5 v5.5.0 h1:G5KHeB8pWBNXT4Jtw0zAkhdxEAWSpWH00geHI6LDrKU= github.com/DataDog/datadog-go/v5 v5.5.0/go.mod h1:K9kcYBlxkcPP8tvvjZZKs/m1edNAUFzBbdpTUKfCsuw= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM= +github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= +github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= +github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fraugster/parquet-go v0.12.0 h1:1slnC5y2VWEOUSlzbeXatM0BvSWcLUDsR/EcZsXXCZc= +github.com/fraugster/parquet-go v0.12.0/go.mod h1:dGzUxdNqXsAijatByVgbAWVPlFirnhknQbdazcUIjY0= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/labstack/echo/v4 v4.11.4 h1:vDZmA+qNeh1pd/cCkEicDMrjtrnMGQ1QFI9gWN1zGq8= github.com/labstack/echo/v4 v4.11.4/go.mod h1:noh7EvLwqDsmh/X/HWKPUl1AjzJrhyptRyEbQJfxen8= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mimiro-io/common-datalayer v0.2.2 h1:UcU2nBYTPvuq0c50hmeJSzSenNNX4+tLCqT3oFQqo5g= -github.com/mimiro-io/common-datalayer v0.2.2/go.mod h1:z7/NzfQfYdzU+a9nEme0cyknl7R2D+JorgZrxr0iJYE= +github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= +github.com/mimiro-io/common-datalayer v0.2.3 h1:+skqepLrP6j1wO1+JP23eObbF0JKut9DfjOPur8uSJU= +github.com/mimiro-io/common-datalayer v0.2.3/go.mod h1:vvJtWkOUuDuIiPewitWEfutA4M6oVFC+LM5iM48oa4E= github.com/mimiro-io/entity-graph-data-model v0.7.5 h1:LI+A7EqET/iZ+DrtgcQqxDib8UlCB3YNw9qc4xiYz4s= github.com/mimiro-io/entity-graph-data-model v0.7.5/go.mod h1:A76+PPQYwU1UkAl6OPcxh63gCnCIHXd47JLbTQxLNRA= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.15.0 h1:SernR4v+D55NyBH2QiEQrlBAnj1ECL6AGrA5+dPaMY8= golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= @@ -65,6 +105,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -93,6 +134,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/layer.go b/layer.go index 0e4fd93..fa731e7 100644 --- a/layer.go +++ b/layer.go @@ -5,7 +5,7 @@ import ( "encoding/json" "fmt" "github.com/google/uuid" - layer "github.com/mimiro-io/common-datalayer" + cdl "github.com/mimiro-io/common-datalayer" "github.com/mimiro-io/common-datalayer/encoder" egdm "github.com/mimiro-io/entity-graph-data-model" "io/fs" @@ -16,13 +16,13 @@ import ( ) type FileSystemDataLayer struct { - config *layer.Config - logger layer.Logger - metrics layer.Metrics + config *cdl.Config + logger cdl.Logger + metrics cdl.Metrics datasets map[string]*FileSystemDataset } -func NewFileSystemDataLayer(conf *layer.Config, logger layer.Logger, metrics layer.Metrics) (layer.DataLayerService, error) { +func NewFileSystemDataLayer(conf *cdl.Config, logger cdl.Logger, metrics cdl.Metrics) (cdl.DataLayerService, error) { datalayer := &FileSystemDataLayer{config: conf, logger: logger, metrics: metrics} err := datalayer.UpdateConfiguration(conf) @@ -38,7 +38,7 @@ func (dl *FileSystemDataLayer) Stop(ctx context.Context) error { return nil } -func (dl *FileSystemDataLayer) UpdateConfiguration(config *layer.Config) layer.LayerError { +func (dl *FileSystemDataLayer) UpdateConfiguration(config *cdl.Config) cdl.LayerError { dl.config = config dl.datasets = make(map[string]*FileSystemDataset) @@ -47,28 +47,28 @@ func (dl *FileSystemDataLayer) UpdateConfiguration(config *layer.Config) layer.L dl.datasets[dataset.DatasetName], err = NewFileSystemDataset(dataset.DatasetName, config.NativeSystemConfig["path"].(string), dataset, dl.logger) if err != nil { - return layer.Err(fmt.Errorf("could not create dataset %s because %s", dataset.DatasetName, err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not create dataset %s because %s", dataset.DatasetName, err.Error()), cdl.LayerErrorInternal) } } return nil } -func (dl *FileSystemDataLayer) Dataset(dataset string) (layer.Dataset, layer.LayerError) { +func (dl *FileSystemDataLayer) Dataset(dataset string) (cdl.Dataset, cdl.LayerError) { ds, ok := dl.datasets[dataset] if !ok { - return nil, layer.Err(fmt.Errorf("dataset %s not found", dataset), layer.LayerErrorBadParameter) + return nil, cdl.Err(fmt.Errorf("dataset %s not found", dataset), cdl.LayerErrorBadParameter) } return ds, nil } -func (dl *FileSystemDataLayer) DatasetDescriptions() []*layer.DatasetDescription { - var datasetDescriptions []*layer.DatasetDescription +func (dl *FileSystemDataLayer) DatasetDescriptions() []*cdl.DatasetDescription { + var datasetDescriptions []*cdl.DatasetDescription // iterate over the datasest testconfig and create one for each for key := range dl.datasets { - datasetDescriptions = append(datasetDescriptions, &layer.DatasetDescription{Name: key}) + datasetDescriptions = append(datasetDescriptions, &cdl.DatasetDescription{Name: key}) } return datasetDescriptions @@ -130,7 +130,7 @@ type FileSystemDatasetConfig struct { WriteIncrementalAppend bool `json:"write_incremental_append"` } -func NewFileSystemDataset(name string, path string, datasetDefinition *layer.DatasetDefinition, logger layer.Logger) (*FileSystemDataset, error) { +func NewFileSystemDataset(name string, path string, datasetDefinition *cdl.DatasetDefinition, logger cdl.Logger) (*FileSystemDataset, error) { sourceConfig := datasetDefinition.SourceConfig encoding, ok := sourceConfig["encoding"].(string) @@ -150,9 +150,9 @@ func NewFileSystemDataset(name string, path string, datasetDefinition *layer.Dat } type FileSystemDataset struct { - logger layer.Logger + logger cdl.Logger name string // dataset name - datasetDefinition *layer.DatasetDefinition // the dataset definition with mappings etc + datasetDefinition *cdl.DatasetDefinition // the dataset definition with mappings etc config *FileSystemDatasetConfig // the dataset config } @@ -164,7 +164,7 @@ func (f FileSystemDataset) Name() string { return f.name } -func (f FileSystemDataset) FullSync(ctx context.Context, batchInfo layer.BatchInfo) (layer.DatasetWriter, layer.LayerError) { +func (f FileSystemDataset) FullSync(ctx context.Context, batchInfo cdl.BatchInfo) (cdl.DatasetWriter, cdl.LayerError) { var file *os.File var err error filePath := filepath.Join(f.config.WritePath, f.config.WriteFullSyncFileName) @@ -172,24 +172,24 @@ func (f FileSystemDataset) FullSync(ctx context.Context, batchInfo layer.BatchIn if batchInfo.IsStartBatch { file, err = os.Create(tmpFilePath) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create file %s", tmpFilePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not create file %s", tmpFilePath), cdl.LayerErrorInternal) } } else { file, err = os.OpenFile(tmpFilePath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { - return nil, layer.Err(fmt.Errorf("could not open file %s", tmpFilePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not open file %s", tmpFilePath), cdl.LayerErrorInternal) } } enc, err := encoder.NewItemWriter(f.datasetDefinition.SourceConfig, f.logger, file, &batchInfo) factory, err := encoder.NewItemFactory(f.datasetDefinition.SourceConfig) - mapper := layer.NewMapper(f.logger, f.datasetDefinition.IncomingMappingConfig, f.datasetDefinition.OutgoingMappingConfig) + mapper := cdl.NewMapper(f.logger, f.datasetDefinition.IncomingMappingConfig, f.datasetDefinition.OutgoingMappingConfig) datasetWriter := &FileSystemDatasetWriter{logger: f.logger, enc: enc, mapper: mapper, factory: factory, tmpFullSyncPath: tmpFilePath, fullSyncFilePath: filePath, closeFullSync: batchInfo.IsLastBatch} return datasetWriter, nil } -func (f FileSystemDataset) Incremental(ctx context.Context) (layer.DatasetWriter, layer.LayerError) { +func (f FileSystemDataset) Incremental(ctx context.Context) (cdl.DatasetWriter, cdl.LayerError) { var file *os.File var err error @@ -201,12 +201,12 @@ func (f FileSystemDataset) Incremental(ctx context.Context) (layer.DatasetWriter if os.IsNotExist(err) { file, err = os.Create(filePath) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create file %s", filePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not create file %s", filePath), cdl.LayerErrorInternal) } } else { file, err = os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0644) if err != nil { - return nil, layer.Err(fmt.Errorf("could not open file for appending %s", filePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not open file for appending %s", filePath), cdl.LayerErrorInternal) } } } else { @@ -215,53 +215,53 @@ func (f FileSystemDataset) Incremental(ctx context.Context) (layer.DatasetWriter filePath := filepath.Join(f.config.WritePath, partfileName) file, err = os.Create(filePath) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create file %s", filePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not create file %s", filePath), cdl.LayerErrorInternal) } } enc, err := encoder.NewItemWriter(f.datasetDefinition.SourceConfig, f.logger, file, nil) factory, err := encoder.NewItemFactory(f.datasetDefinition.SourceConfig) - mapper := layer.NewMapper(f.logger, f.datasetDefinition.IncomingMappingConfig, f.datasetDefinition.OutgoingMappingConfig) + mapper := cdl.NewMapper(f.logger, f.datasetDefinition.IncomingMappingConfig, f.datasetDefinition.OutgoingMappingConfig) datasetWriter := &FileSystemDatasetWriter{logger: f.logger, enc: enc, mapper: mapper, factory: factory} return datasetWriter, nil } type FileSystemDatasetWriter struct { - logger layer.Logger + logger cdl.Logger enc encoder.ItemWriter factory encoder.ItemFactory - mapper *layer.Mapper + mapper *cdl.Mapper tmpFullSyncPath string fullSyncFilePath string closeFullSync bool } -func (f FileSystemDatasetWriter) Write(entity *egdm.Entity) layer.LayerError { +func (f FileSystemDatasetWriter) Write(entity *egdm.Entity) cdl.LayerError { item := f.factory.NewItem() err := f.mapper.MapEntityToItem(entity, item) if err != nil { - return layer.Err(fmt.Errorf("could not map entity to item because %s", err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not map entity to item because %s", err.Error()), cdl.LayerErrorInternal) } err = f.enc.Write(item) if err != nil { - return layer.Err(fmt.Errorf("could not write item to file because %s", err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not write item to file because %s", err.Error()), cdl.LayerErrorInternal) } return nil } -func (f FileSystemDatasetWriter) Close() layer.LayerError { +func (f FileSystemDatasetWriter) Close() cdl.LayerError { err := f.enc.Close() if err != nil { - return layer.Err(fmt.Errorf("could not close file because %s", err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not close file because %s", err.Error()), cdl.LayerErrorInternal) } if f.closeFullSync { err = os.Rename(f.tmpFullSyncPath, f.fullSyncFilePath) if err != nil { - return layer.Err(fmt.Errorf("could not rename file because %s", err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not rename file because %s", err.Error()), cdl.LayerErrorInternal) } } @@ -273,10 +273,10 @@ type FileInfo struct { Path string } -func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (layer.EntityIterator, layer.LayerError) { +func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (cdl.EntityIterator, cdl.LayerError) { // get root folder if _, err := os.Stat(f.config.ReadPath); os.IsNotExist(err) { - return nil, layer.Err(fmt.Errorf("path %s does not exist", f.config.ReadPath), layer.LayerErrorBadParameter) + return nil, cdl.Err(fmt.Errorf("path %s does not exist", f.config.ReadPath), cdl.LayerErrorBadParameter) } // check if we are recursive and get all folders @@ -306,7 +306,7 @@ func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (la }) if err != nil { - return nil, layer.Err(fmt.Errorf("could not read directory %s", f.config.ReadPath), layer.LayerErrorBadParameter) + return nil, cdl.Err(fmt.Errorf("could not read directory %s", f.config.ReadPath), cdl.LayerErrorBadParameter) } } else { folders = append(folders, f.config.ReadPath) @@ -319,7 +319,7 @@ func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (la for _, folder := range folders { files, err := os.ReadDir(folder) if err != nil { - return nil, layer.Err(fmt.Errorf("could not read directory %s", folder), layer.LayerErrorBadParameter) + return nil, cdl.Err(fmt.Errorf("could not read directory %s", folder), cdl.LayerErrorBadParameter) } for _, file := range files { @@ -332,19 +332,19 @@ func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (la fileName := file.Entry.Name() isMatch, err := filepath.Match(f.config.ReadFilePattern, fileName) if err != nil { - return nil, layer.Err(fmt.Errorf("could not match file pattern %s", f.config.ReadFilePattern), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not match file pattern %s", f.config.ReadFilePattern), cdl.LayerErrorInternal) } if isMatch { if f.config.SupportSinceByFileTimestamp && since != "" { finfo, err := file.Entry.Info() if err != nil { - return nil, layer.Err(fmt.Errorf("could not get file info for %s", fileName), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not get file info for %s", fileName), cdl.LayerErrorInternal) } fileModTime := finfo.ModTime().UnixMicro() sinceTimeAsInt, err := strconv.ParseInt(since, 10, 64) if err != nil { - return nil, layer.Err(fmt.Errorf("could not parse since time %s", since), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not parse since time %s", since), cdl.LayerErrorInternal) } if sinceTimeAsInt < fileModTime { @@ -365,27 +365,27 @@ func (f FileSystemDataset) Changes(since string, limit int, latestOnly bool) (la }) } - mapper := layer.NewMapper(f.logger, nil, f.datasetDefinition.OutgoingMappingConfig) + mapper := cdl.NewMapper(f.logger, nil, f.datasetDefinition.OutgoingMappingConfig) iterator := NewFileCollectionEntityIterator(f.datasetDefinition.SourceConfig, f.logger, dataFileInfos, mapper, "") return iterator, nil } -func (f FileSystemDataset) Entities(from string, limit int) (layer.EntityIterator, layer.LayerError) { - return nil, layer.Err(fmt.Errorf("operation not supported"), layer.LayerNotSupported) +func (f FileSystemDataset) Entities(from string, limit int) (cdl.EntityIterator, cdl.LayerError) { + return nil, cdl.Err(fmt.Errorf("operation not supported"), cdl.LayerNotSupported) } -func NewFileCollectionEntityIterator(sourceConfig map[string]any, logger layer.Logger, files []FileInfo, mapper *layer.Mapper, token string) *FileCollectionEntityIterator { +func NewFileCollectionEntityIterator(sourceConfig map[string]any, logger cdl.Logger, files []FileInfo, mapper *cdl.Mapper, token string) *FileCollectionEntityIterator { return &FileCollectionEntityIterator{sourceConfig: sourceConfig, mapper: mapper, token: token, files: files, filesIndex: 0, logger: logger} } type FileCollectionEntityIterator struct { - mapper *layer.Mapper + mapper *cdl.Mapper token string files []FileInfo filesIndex int currentItemReader encoder.ItemIterator sourceConfig map[string]any - logger layer.Logger + logger cdl.Logger } func (f *FileCollectionEntityIterator) Context() *egdm.Context { @@ -393,7 +393,7 @@ func (f *FileCollectionEntityIterator) Context() *egdm.Context { return ctx.AsContext() } -func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { +func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, cdl.LayerError) { if f.currentItemReader == nil { if f.filesIndex < len(f.files) { // initialize the current file entity iterator @@ -406,7 +406,7 @@ func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { itemReader, err := f.NewItemReadCloser(file, f.sourceConfig) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create item reader for file %s becuase %s", file, err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not create item reader for file %s becuase %s", file, err.Error()), cdl.LayerErrorInternal) } f.currentItemReader = itemReader @@ -418,14 +418,14 @@ func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { // read the next entity from the current file item, err := f.currentItemReader.Read() if err != nil { - return nil, layer.Err(fmt.Errorf("could not read item from file because %s", err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not read item from file because %s", err.Error()), cdl.LayerErrorInternal) } if item == nil { // close the current file and move to the next err := f.currentItemReader.Close() if err != nil { - return nil, layer.Err(fmt.Errorf("could not close item reader for file because %s", err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not close item reader for file because %s", err.Error()), cdl.LayerErrorInternal) } f.filesIndex++ if f.filesIndex < len(f.files) { @@ -437,13 +437,13 @@ func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { itemReader, err := f.NewItemReadCloser(file, f.sourceConfig) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create item reader for file %s becuase %s", file, err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not create item reader for file %s becuase %s", file, err.Error()), cdl.LayerErrorInternal) } f.currentItemReader = itemReader item, err = f.currentItemReader.Read() if err != nil { - return nil, layer.Err(fmt.Errorf("could not read item from file because %s", err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not read item from file because %s", err.Error()), cdl.LayerErrorInternal) } } } @@ -454,7 +454,7 @@ func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { entity := &egdm.Entity{Properties: make(map[string]any)} err := f.mapper.MapItemToEntity(item, entity) if err != nil { - return nil, layer.Err(fmt.Errorf("could not map item to entity because %s", err.Error()), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not map item to entity because %s", err.Error()), cdl.LayerErrorInternal) } return entity, nil } @@ -463,28 +463,28 @@ func (f *FileCollectionEntityIterator) Next() (*egdm.Entity, layer.LayerError) { func (f *FileCollectionEntityIterator) NewItemReadCloser(filePath string, sourceConfig map[string]any) (encoder.ItemIterator, error) { file, err := os.Open(filePath) if err != nil { - return nil, layer.Err(fmt.Errorf("could not open file %s", filePath), layer.LayerErrorInternal) + return nil, cdl.Err(fmt.Errorf("could not open file %s", filePath), cdl.LayerErrorInternal) } // get encoder for the file itemReader, err := encoder.NewItemIterator(sourceConfig, f.logger, file) if err != nil { - return nil, layer.Err(fmt.Errorf("could not create encoder specified in dataset source testconfig"), layer.LayerErrorBadParameter) + return nil, cdl.Err(fmt.Errorf("could not create encoder specified in dataset source testconfig"), cdl.LayerErrorBadParameter) } return itemReader, nil } -func (f *FileCollectionEntityIterator) Token() (*egdm.Continuation, layer.LayerError) { +func (f *FileCollectionEntityIterator) Token() (*egdm.Continuation, cdl.LayerError) { cont := egdm.NewContinuation() cont.Token = f.token return cont, nil } -func (f *FileCollectionEntityIterator) Close() layer.LayerError { +func (f *FileCollectionEntityIterator) Close() cdl.LayerError { err := f.currentItemReader.Close() if err != nil { - return layer.Err(fmt.Errorf("could not close item reader because %s", err.Error()), layer.LayerErrorInternal) + return cdl.Err(fmt.Errorf("could not close item reader because %s", err.Error()), cdl.LayerErrorInternal) } return nil }