Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove skip storing MinSyncedTicket when the ticket is initial #655

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 173 additions & 0 deletions pkg/units/size.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2023 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* copy from docker/go-units:
* https://github.com/docker/go-units/blob/master/size.go
*/

package units

import (
"fmt"
"strconv"
"strings"
)

// See: http://en.wikipedia.org/wiki/Binary_prefix
const (
// Decimal

KB = 1000
MB = 1000 * KB
GB = 1000 * MB
TB = 1000 * GB
PB = 1000 * TB

// Binary

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB
)

type unitMap map[byte]int64

var (
decimalMap = unitMap{'k': KB, 'm': MB, 'g': GB, 't': TB, 'p': PB}
binaryMap = unitMap{'k': KiB, 'm': MiB, 'g': GiB, 't': TiB, 'p': PiB}
)

var (
decimapAbbrs = []string{"B", "kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"}
binaryAbbrs = []string{"B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"}
)

func getSizeAndUnit(size float64, base float64, _map []string) (float64, string) {
i := 0
unitsLimit := len(_map) - 1
for size >= base && i < unitsLimit {
size = size / base
i++
}
return size, _map[i]
}

// CustomSize returns a human-readable approximation of a size
// using custom format.
func CustomSize(format string, size float64, base float64, _map []string) string {
size, unit := getSizeAndUnit(size, base, _map)
return fmt.Sprintf(format, size, unit)
}

// HumanSizeWithPrecision allows the size to be in any precision,
// instead of 4 digit precision used in units.HumanSize.
func HumanSizeWithPrecision(size float64, precision int) string {
size, unit := getSizeAndUnit(size, 1000.0, decimapAbbrs)
return fmt.Sprintf("%.*g%s", precision, size, unit)
}

// HumanSize returns a human-readable approximation of a size
// capped at 4 valid numbers (eg. "2.746 MB", "796 KB").
func HumanSize(size float64) string {
return HumanSizeWithPrecision(size, 4)
}

// BytesSize returns a human-readable size in bytes, kibibytes,
// mebibytes, gibibytes, or tebibytes (eg. "44kiB", "17MiB").
func BytesSize(size float64) string {
return CustomSize("%.4g%s", size, 1024.0, binaryAbbrs)
}

// FromHumanSize returns an integer from a human-readable specification of a
// size using SI standard (eg. "44kB", "17MB").
func FromHumanSize(size string) (int64, error) {
return parseSize(size, decimalMap)
}

// RAMInBytes parses a human-readable string representing an amount of RAM
// in bytes, kibibytes, mebibytes, gibibytes, or tebibytes and
// returns the number of bytes, or -1 if the string is unparseable.
// Units are case-insensitive, and the 'b' suffix is optional.
func RAMInBytes(size string) (int64, error) {
return parseSize(size, binaryMap)
}

// Parses the human-readable size string into the amount it represents.
func parseSize(sizeStr string, uMap unitMap) (int64, error) {
// TODO: rewrite to use strings.Cut if there's a space
// once Go < 1.18 is deprecated.
sep := strings.LastIndexAny(sizeStr, "01234567890. ")
if sep == -1 {
// There should be at least a digit.
return -1, fmt.Errorf("invalid size: '%s'", sizeStr)
}
var num, sfx string
if sizeStr[sep] != ' ' {
num = sizeStr[:sep+1]
sfx = sizeStr[sep+1:]
} else {
// Omit the space separator.
num = sizeStr[:sep]
sfx = sizeStr[sep+1:]
}

size, err := strconv.ParseFloat(num, 64)
if err != nil {
return -1, fmt.Errorf("parse size: %w", err)
}
// Backward compatibility: reject negative sizes.
if size < 0 {
return -1, fmt.Errorf("invalid size: '%s'", sizeStr)
}

if len(sfx) == 0 {
return int64(size), nil
}

// Process the suffix.

if len(sfx) > 3 { // Too long.
goto badSuffix
}
sfx = strings.ToLower(sfx)
// Trivial case: b suffix.
if sfx[0] == 'b' {
if len(sfx) > 1 { // no extra characters allowed after b.
goto badSuffix
}
return int64(size), nil
}
// A suffix from the map.
if mul, ok := uMap[sfx[0]]; ok {
size *= float64(mul)
} else {
goto badSuffix
}

// The suffix may have extra "b" or "ib" (e.g. KiB or MB).
switch {
case len(sfx) == 2 && sfx[1] != 'b':
goto badSuffix
case len(sfx) == 3 && sfx[1:] != "ib":
goto badSuffix
}

return int64(size), nil

badSuffix:
return -1, fmt.Errorf("invalid suffix: '%s'", sfx)
}
10 changes: 0 additions & 10 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,16 +1161,6 @@ func (d *DB) UpdateSyncedSeq(
return err
}

// NOTE: skip storing the initial ticket to prevent GC interruption.
// Documents in this state do not need to be saved because they do not
// have any tombstones to be referenced by other documents.
//
// (The initial ticket is used as the creation time of the root
// element that operations can not remove.)
if ticket.Compare(time.InitialTicket) == 0 {
return nil
}

raw, err := txn.First(
tblSyncedSeqs,
"doc_id_client_id",
Expand Down
10 changes: 0 additions & 10 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,16 +1349,6 @@ func (c *Client) UpdateSyncedSeq(
return err
}

// NOTE: skip storing the initial ticket to prevent GC interruption.
// Documents in this state do not need to be saved because they do not
// have any tombstones to be referenced by other documents.
//
// (The initial ticket is used as the creation time of the root
// element that operations can not remove.)
if ticket.Compare(time.InitialTicket) == 0 {
return nil
}

if _, err = c.collection(colSyncedSeqs).UpdateOne(ctx, bson.M{
"doc_id": encodedDocID,
"client_id": encodedClientID,
Expand Down
15 changes: 15 additions & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package packs
import (
"context"
"fmt"
"strconv"
gotime "time"

"go.uber.org/zap"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/key"
"github.com/yorkie-team/yorkie/pkg/units"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/backend/sync"
Expand Down Expand Up @@ -119,6 +121,19 @@ func PushPull(
respPack.MinSyncedTicket = minSyncedTicket
respPack.ApplyDocInfo(docInfo)

pullLog := strconv.Itoa(respPack.ChangesLen())
if respPack.SnapshotLen() > 0 {
pullLog = units.HumanSize(float64(respPack.SnapshotLen()))
}
logging.From(ctx).Infof(
"SYNC: '%s' is synced by '%s', push: %d, pull: %s, min: %s",
docInfo.Key,
clientInfo.Key,
len(pushedChanges),
pullLog,
minSyncedTicket.StructureAsString(),
)

// 05. publish document change event then store snapshot asynchronously.
if len(pushedChanges) > 0 || reqPack.IsRemoved {
be.Background.AttachGoroutine(func(ctx context.Context) {
Expand Down
6 changes: 3 additions & 3 deletions server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func pushChanges(
}

if len(reqPack.Changes) > 0 {
logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PUSH: '%s' pushes %d changes into '%s', rejected %d changes, serverSeq: %d -> %d, cp: %s",
clientInfo.Key,
len(pushedChanges),
Expand Down Expand Up @@ -162,7 +162,7 @@ func pullSnapshot(
return nil, err
}

logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PULL: '%s' build snapshot with changes(%d~%d) from '%s', cp: %s",
clientInfo.Key,
reqPack.Checkpoint.ServerSeq+1,
Expand Down Expand Up @@ -211,7 +211,7 @@ func pullChangeInfos(
cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

if len(pulledChanges) > 0 {
logging.From(ctx).Infof(
logging.From(ctx).Debugf(
"PULL: '%s' pulls %d changes(%d~%d) from '%s', cp: %s, filtered changes: %d",
clientInfo.Key,
len(pulledChanges),
Expand Down
7 changes: 6 additions & 1 deletion server/rpc/interceptors/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func NewDefaultInterceptor() *DefaultInterceptor {
return &DefaultInterceptor{}
}

const (
// SlowThreshold is the threshold for slow RPC.
SlowThreshold = 100 * gotime.Millisecond
)

// Unary creates a unary server interceptor for default.
func (i *DefaultInterceptor) Unary() grpc.UnaryServerInterceptor {
return func(
Expand All @@ -50,7 +55,7 @@ func (i *DefaultInterceptor) Unary() grpc.UnaryServerInterceptor {
return nil, grpchelper.ToStatusError(err)
}

if gotime.Since(start) > 100*gotime.Millisecond {
if gotime.Since(start) > SlowThreshold {
reqLogger.Infof("RPC : %q %s", info.FullMethod, gotime.Since(start))
}
return resp, nil
Expand Down
Loading