improved log, added 1 level for recovery bad chunks, released 0.6.3 version
toni-moreno committed Jun 11, 2019
1 parent a966d8f commit 9d69de4
Showing 4 changed files with 76 additions and 24 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,11 @@
# v 0.6.2 (2019-06-05)
# v 0.6.3 (2019-06-07)

## New features

* added max-points-on-single-write to limit write queries
* added 1 recovery level for bad chunks

# v 0.6.2 (2019-06-06)

## fixes

2 changes: 1 addition & 1 deletion pkg/agent/client.go
@@ -513,7 +513,7 @@ func BpSplit(bp client.BatchPoints, splitnum int) []client.BatchPoints {
pointchunk := make([]*client.Point, splitnum)
init := i * splitnum
end := min((i+1)*splitnum, len)
log.Debugf("Splitting %s batchpoints into 50000 points chunks from %d to %d ", len, init, end)
log.Debugf("Splitting %d batchpoints into %d points chunks from %d to %d ", len, splitnum, init, end)
copy(points[init:end], pointchunk)
newbp.AddPoints(pointchunk)

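The hunk above only fixes the debug line, but it sits inside the new max-points-on-single-write splitting logic mentioned in the CHANGELOG. Below is a minimal, self-contained sketch of that splitting pattern; `splitPoints` and the plain string slice are illustrative stand-ins for the repository's `BpSplit` and influx `client.Point` types, not the actual implementation.

```go
package main

import "fmt"

// splitPoints breaks a batch of points into chunks of at most maxPoints
// entries, so that no single write exceeds the configured limit.
func splitPoints(points []string, maxPoints int) [][]string {
	if maxPoints <= 0 {
		return [][]string{points} // no limit configured: keep the batch whole
	}
	var chunks [][]string
	for init := 0; init < len(points); init += maxPoints {
		end := init + maxPoints
		if end > len(points) {
			end = len(points) // the last chunk may be shorter than maxPoints
		}
		chunk := make([]string, end-init)
		copy(chunk, points[init:end]) // copy(dst, src): fill the chunk from the batch
		chunks = append(chunks, chunk)
	}
	return chunks
}

func main() {
	batch := []string{"p1", "p2", "p3", "p4", "p5"}
	for i, c := range splitPoints(batch, 2) {
		fmt.Printf("chunk %d: %v\n", i, c)
	}
}
```

With a limit of 2, the five-point batch above is written as three chunks of at most two points each.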
21 changes: 14 additions & 7 deletions pkg/agent/hacluster.go
@@ -220,9 +220,13 @@ func (hac *HACluster) ReplicateData(schema []*InfluxSchDb, start time.Time, end
rn.Name = db.NewDefRp
}
//log.Debugf("%s RP %s... SCHEMA %#+v.", db.Name, rp.Name, db)
err := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
if err != nil {
log.Errorf("Data Replication error in DB [%s] RP [%s] | Error: %s", db, rn.Name, err)
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
if report == nil {
log.Errorf("Data Replication error in DB [%s] RP [%s]", db, rn.Name)
} else if len(report.BadChunks) > 0 {
r, w, t := report.RWErrors()
log.Errorf("Data Replication error in DB [%s] RP [%s] | Registered %d Read / %d Write | %d Total Errors", db, rn.Name, r, w, t)
}
}
}
@@ -238,10 +242,13 @@ func (hac *HACluster) ReplicateDataFull(schema []*InfluxSchDb) error {
if rn.Def {
rn.Name = db.NewDefRp
}
err := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
//err := SyncDBFull(hac.Master, hac.Slave, db.Name, rp, db, hac.ChunkDuration, hac.MaxRetentionInterval)
if err != nil {
log.Errorf("Data Replication error in DB [%s] RP [%s] | Error: %s", db, rn.Name, err)
report := SyncDBRP(hac.Master, hac.Slave, db.Name, db.NewName, rp, &rn, start, end, db, hac.ChunkDuration, hac.MaxRetentionInterval)
if report == nil {
log.Errorf("Data Replication error in DB [%s] RP [%s]", db, rn.Name)
} else if len(report.BadChunks) > 0 {
r, w, t := report.RWErrors()
log.Errorf("Data Replication error in DB [%s] RP [%s] | Registered %d Read / %d Write | %d Total Errors", db, rn.Name, r, w, t)
}
}
}
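Both hunks above replace the old error return from SyncDBRP with the new *SyncReport value: a nil report means the replication could not run at all, while a non-empty BadChunks list means some chunks recorded read or write errors. A minimal, self-contained sketch of that consumption pattern follows; the ChunkReport/SyncReport shapes are simplified stand-ins for the types defined in pkg/agent/sync.go, not the exact structs.

```go
package main

import "log"

// Simplified stand-ins for syncflux's ChunkReport / SyncReport.
type ChunkReport struct {
	ReadErrors, WriteErrors uint64
}

type SyncReport struct {
	BadChunks []*ChunkReport
}

// RWErrors aggregates read and write errors over all bad chunks,
// mirroring the helper added in pkg/agent/sync.go.
func (sr *SyncReport) RWErrors() (uint64, uint64, uint64) {
	var r, w uint64
	for _, b := range sr.BadChunks {
		r += b.ReadErrors
		w += b.WriteErrors
	}
	return r, w, r + w
}

// replicate stands in for a SyncDBRP call that may fail outright (nil report)
// or succeed with some bad chunks recorded.
func replicate() *SyncReport {
	return &SyncReport{BadChunks: []*ChunkReport{{ReadErrors: 1}, {WriteErrors: 2}}}
}

func main() {
	report := replicate()
	if report == nil {
		log.Printf("replication failed: no report produced")
	} else if len(report.BadChunks) > 0 {
		r, w, t := report.RWErrors()
		log.Printf("registered %d read / %d write | %d total errors", r, w, t)
	}
}
```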
68 changes: 53 additions & 15 deletions pkg/agent/sync.go
@@ -21,9 +21,9 @@ type ChunkReport struct {
TimeTaken time.Duration
}

func (cr *ChunkReport) Log() {
percent := 100 * (cr.Num + 1) / cr.Total
log.Infof("Processed Chunk [%d/%d](%d%%) from [%d][%s] to [%d][%s] (%d) Points Took [%s] ERRORS[R:%d|W:%d]",
func (cr *ChunkReport) format() string {
percent := 100 * cr.Num / cr.Total
return fmt.Sprintf("[%d/%d](%d%%) from [%d][%s] to [%d][%s] (%d) Points Took [%s] ERRORS[R:%d|W:%d]",
cr.Num,
cr.Total,
percent,
@@ -37,6 +37,21 @@ func (cr *ChunkReport) Log() {
cr.WriteErrors)
}

func (cr *ChunkReport) Log(prefix string) {

log.Infof("%s %s", prefix, cr.format())
}

func (cr *ChunkReport) Warn(prefix string) {

log.Warnf("%s %s", prefix, cr.format())
}

func (cr *ChunkReport) Error(prefix string) {

log.Errorf("%s %s", prefix, cr.format())
}

type SyncReport struct {
SrcSrv string
DstSrv string
@@ -52,9 +67,10 @@ type SyncReport struct {
BadChunks []*ChunkReport
}

func (sr *SyncReport) Log() {
func (sr *SyncReport) Log(prefix string) {

log.Printf("Processed DB data from %s[%s|%s] to %s[%s|%s] has done #Points (%d) Took [%s] ERRORS [%d]!\n",
log.Printf("%s data from %s[%s|%s] to %s[%s|%s] has done #Points (%d) Took [%s] ERRORS [%d]!\n",
prefix,
sr.SrcSrv,
sr.SrcDB,
sr.SrcRP,
@@ -66,12 +82,22 @@ func (sr *SyncReport) Log() {
len(sr.BadChunks))
}

func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) (*SyncReport, error) {
func (sr *SyncReport) RWErrors() (uint64, uint64, uint64) {
var readErrors, writeErrors uint64
for _, b := range sr.BadChunks {
readErrors += b.ReadErrors
writeErrors += b.WriteErrors

}
return readErrors, writeErrors, readErrors + writeErrors
}

func Sync(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport {

if dbschema == nil {
err := fmt.Errorf("DBSChema for DB %s is null", sdb)
log.Errorf("%s", err.Error())
return nil, err
return nil
}

Report := &SyncReport{
@@ -169,26 +195,38 @@ func Sync(src *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *R
TimeTaken: chunkElapsed,
}

chrep.Log()
chrep.Log("Processed Chunk")
chuckReport = append(chuckReport, chrep)
if readErrors+writeErrors > 0 {
badChunkReport = append(badChunkReport, chrep)
}
//log.Infof("Processed Chunk [%d/%d](%d%%) from [%d][%s] to [%d][%s] (%d) Points Took [%s] ERRORS[R:%d|W:%d]",i+1, hLength, 100*(i+1)/hLength, startsec, time.Unix(startsec, 0).String(), endsec, time.Unix(endsec, 0).String(), totalpoints, chunkElapsed.String(), readErrors, writeErrors)

}

Report.TotalElapsed = time.Since(dbs)
Report.TotalPoints = dbpoints
Report.ChunkReport = chuckReport
Report.BadChunks = badChunkReport
Report.Log()
//log.Printf("Processed DB data from %s[%s|%s] to %s[%s|%s] has done #Points (%d) Took [%s] !\n", src.cfg.Name, sdb, srp.Name, dst.cfg.Name, ddb, drp.Name, dbpoints, dbElapsed.String())
Report.Log("Processed DB")

return Report, nil
return Report
}

func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) error {
func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, srp *RetPol, drp *RetPol, sEpoch time.Time, eEpoch time.Time, dbschema *InfluxSchDb, chunk time.Duration, maxret time.Duration) *SyncReport {

report := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret)
if report != nil && len(report.BadChunks) > 0 {
log.Warnf("Initializing Recovery for %d chunks", len(report.BadChunks))
newBadChunks := make([]*ChunkReport, 0)
for _, bc := range report.BadChunks {
bc.Warn("Recovery for Bad Chunk")
start := time.Unix(bc.TimeStart, 0)
end := time.Unix(bc.TimeEnd, 0)

_, err := Sync(src, dst, sdb, ddb, srp, drp, sEpoch, eEpoch, dbschema, chunk, maxret)
return err
recoveryrep := Sync(src, dst, sdb, ddb, srp, drp, start, end, dbschema, chunk/10, maxret)
newBadChunks = append(newBadChunks, recoveryrep.BadChunks...)
}
report.BadChunks = newBadChunks
}
return report
}
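The new SyncDBRP above is the "1 recovery level" from the CHANGELOG: each chunk that recorded errors is replayed once over its own time range with a ten times smaller chunk duration, and only the chunks that fail again are kept as bad. Below is a generic sketch of that single-level retry pattern, using a hypothetical syncRange helper in place of the repository's Sync; it illustrates the idea rather than reproducing the actual code.

```go
package main

import (
	"fmt"
	"time"
)

// badChunk records a failed time range, playing the role of ChunkReport here.
type badChunk struct {
	Start, End time.Time
}

// syncRange is a hypothetical stand-in for syncflux's Sync: it copies one
// time range in chunks of the given duration and returns the chunks that failed.
func syncRange(start, end time.Time, chunk time.Duration) []badChunk {
	fmt.Printf("syncing %s -> %s in chunks of %s\n",
		start.Format(time.RFC3339), end.Format(time.RFC3339), chunk)
	return nil // pretend everything succeeded
}

// syncWithOneRecoveryLevel runs a full sync, then retries each bad chunk once
// over its own range with a 10x finer chunk duration (one recovery level).
func syncWithOneRecoveryLevel(start, end time.Time, chunk time.Duration) []badChunk {
	bad := syncRange(start, end, chunk)
	var stillBad []badChunk
	for _, bc := range bad {
		stillBad = append(stillBad, syncRange(bc.Start, bc.End, chunk/10)...)
	}
	return stillBad
}

func main() {
	end := time.Now()
	start := end.Add(-24 * time.Hour)
	remaining := syncWithOneRecoveryLevel(start, end, time.Hour)
	fmt.Printf("chunks still failing after recovery: %d\n", len(remaining))
}
```

Keeping the recovery to a single level bounds the total work: in the worst case every chunk is scanned twice, once at the normal chunk duration and once at the finer one.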
