mongoStream is a mongodb streaming service based on mongodb change streams feature.
The service allows to launch asynchronous streams in parallel.
-
System requirements:
- mongodb 4.2 or higher
- mongodb replica set available
-
Dependencies :
- mongodb driver
go get go.mongodb.org/mongo-driver/mongo
- mongodb driver
go get github.com/PierreKieffer/mongoStream
To get started, import mongoStream
package
import (
"github.com/PierreKieffer/mongoStream/services"
)
Set up the mongo connection string
mongoUri := "mongodb://localhost:27017"
Initialize messages buffer :
The buffer is a channel and allows to accumulate messages generated by the streaming service. The initialization takes the size of the buffer as a parameter (ie the number of messages that the buffer can contain).
var buffer = services.InitBuffer(10)
Start one or more workers in parallel :
Parameters :
- mongodb connection string
- database name
- collection name
- buffer
- stream options [optional]
go services.ListenerWorker(mongoUri, "database", "collection1", buffer)
go services.ListenerWorker(mongoUri, "database", "collection2", buffer)
Structure of messages generated by workers and received by the buffer :
{
"DocumentKey": {
"DocumentId": "string type"
},
"Namespace": {
"Database": "string type",
"Collection": "string type"
},
"OperationType": "string type",
"UpdateDescription": {
"updatedFields": "map[string]interface{} type"
}
}
To resume a stream from a timestamp :
cso := services.SetOptions(true, 1586360547)
go services.ListenerWorker(mongoUri, "database", "collection1", buffer,cso)
package main
import (
"github.com/PierreKieffer/mongoStream/dataModel"
"github.com/PierreKieffer/mongoStream/services"
"log"
)
var exit = make(chan bool)
func main() {
mongoUri := "mongodb://localhost:27017"
// Init oplog buffer channel
var buffer = services.InitBuffer(10)
// Start consumer
go Consumer(buffer)
// Start producers
go services.ListenerWorker(mongoUri, "database", "collection1", buffer)
go services.ListenerWorker(mongoUri, "database", "collection2", buffer)
<-exit
}
func Consumer(logBuffer chan dataModel.Oplog) {
for {
log.Println("reveived data : ", <-logBuffer)
}
}