Skip to content

Commit

Permalink
use field updater for current field
Browse files Browse the repository at this point in the history
The common operations are now available directly on the
StepLong/StepDouble.
  • Loading branch information
brharrington committed Oct 11, 2024
1 parent 645a878 commit e9f7062
Show file tree
Hide file tree
Showing 11 changed files with 594 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,8 +28,8 @@ public class AtomicDouble extends Number {

private volatile long value;

private static final AtomicLongFieldUpdater<AtomicDouble> VALUE_UPDATER = AtomicLongFieldUpdater.newUpdater(
AtomicDouble.class, "value");
private static final AtomicLongFieldUpdater<AtomicDouble> VALUE_UPDATER =
AtomicLongFieldUpdater.newUpdater(AtomicDouble.class, "value");

/** Create an instance with an initial value of 0. */
public AtomicDouble() {
Expand Down Expand Up @@ -98,6 +98,20 @@ public void set(double amount) {
value = Double.doubleToLongBits(amount);
}

private static boolean isLessThan(double v1, double v2) {
return v1 < v2 || Double.isNaN(v2);
}

/** Set the current value to the maximum of the current value or the provided value. */
public void min(double v) {
if (Double.isFinite(v)) {
double min = get();
while (isLessThan(v, min) && !compareAndSet(min, v)) {
min = get();
}
}
}

private static boolean isGreaterThan(double v1, double v2) {
return v1 > v2 || Double.isNaN(v2);
}
Expand Down Expand Up @@ -128,8 +142,7 @@ public void max(double v) {
return get();
}

@Override
public String toString() {
@Override public String toString() {
return Double.toString(get());
}
}
115 changes: 103 additions & 12 deletions spectator-api/src/main/java/com/netflix/spectator/impl/StepDouble.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,44 +35,135 @@ public class StepDouble implements StepValue {
private final long step;

private volatile double previous;
private final AtomicDouble current;
private volatile long current;

private static final AtomicLongFieldUpdater<StepDouble> CURRENT_UPDATER =
AtomicLongFieldUpdater.newUpdater(StepDouble.class, "current");

private volatile long lastInitPos;

private static final AtomicLongFieldUpdater<StepDouble> LAST_INIT_POS_UPDATER = AtomicLongFieldUpdater.newUpdater(
StepDouble.class, "lastInitPos");
private static final AtomicLongFieldUpdater<StepDouble> LAST_INIT_POS_UPDATER =
AtomicLongFieldUpdater.newUpdater(StepDouble.class, "lastInitPos");

/** Create a new instance. */
public StepDouble(double init, Clock clock, long step) {
this.init = init;
this.clock = clock;
this.step = step;
previous = init;
current = new AtomicDouble(init);
current = Double.doubleToLongBits(init);
lastInitPos = clock.wallTime() / step;
}

private void rollCount(long now) {
final long stepTime = now / step;
final long lastInit = lastInitPos;
if (lastInit < stepTime && LAST_INIT_POS_UPDATER.compareAndSet(this, lastInit, stepTime)) {
final double v = current.getAndSet(init);
final double v = Double.longBitsToDouble(
CURRENT_UPDATER.getAndSet(this, Double.doubleToLongBits(init)));
// Need to check if there was any activity during the previous step interval. If there was
// then the init position will move forward by 1, otherwise it will be older. No activity
// means the previous interval should be set to the `init` value.
previous = (lastInit == stepTime - 1) ? v : init;
}
}

/** Get the AtomicDouble for the current bucket. */
public AtomicDouble getCurrent() {
/** Get the value for the current bucket. */
public double getCurrent() {
return getCurrent(clock.wallTime());
}

/** Get the AtomicDouble for the current bucket. */
public AtomicDouble getCurrent(long now) {
/** Get the value for the current bucket. */
public double getCurrent(long now) {
rollCount(now);
return Double.longBitsToDouble(current);
}

/** Set the value for the current bucket. */
public void setCurrent(long now, double value) {
rollCount(now);
current = Double.doubleToLongBits(value);
}

/** Increment the current value and return the result. */
public double addAndGet(long now, double amount) {
rollCount(now);
long v;
double d;
double n;
long next;
do {
v = current;
d = Double.longBitsToDouble(v);
n = d + amount;
next = Double.doubleToLongBits(n);
} while (!CURRENT_UPDATER.compareAndSet(this, v, next));
return n;
}

/** Increment the current value and return the value before incrementing. */
public double getAndAdd(long now, double amount) {
rollCount(now);
long v;
double d;
double n;
long next;
do {
v = current;
d = Double.longBitsToDouble(v);
n = d + amount;
next = Double.doubleToLongBits(n);
} while (!CURRENT_UPDATER.compareAndSet(this, v, next));
return d;
}

/** Set the current value and return the previous value. */
public double getAndSet(long now, double value) {
rollCount(now);
long v = CURRENT_UPDATER.getAndSet(this, Double.doubleToLongBits(value));
return Double.longBitsToDouble(v);
}

private boolean compareAndSet(double expect, double update) {
long e = Double.doubleToLongBits(expect);
long u = Double.doubleToLongBits(update);
return CURRENT_UPDATER.compareAndSet(this, e, u);
}

/** Set the current value and return the previous value. */
public boolean compareAndSet(long now, double expect, double update) {
rollCount(now);
return current;
return compareAndSet(expect, update);
}

private static boolean isLessThan(double v1, double v2) {
return v1 < v2 || Double.isNaN(v2);
}

/** Set the current value to the minimum of the current value or the provided value. */
public void min(long now, double value) {
if (Double.isFinite(value)) {
rollCount(now);
double min = Double.longBitsToDouble(current);
while (isLessThan(value, min) && !compareAndSet(min, value)) {
min = Double.longBitsToDouble(current);
}
}
}

private static boolean isGreaterThan(double v1, double v2) {
return v1 > v2 || Double.isNaN(v2);
}

/** Set the current value to the maximum of the current value or the provided value. */
public void max(long now, double value) {
if (Double.isFinite(value)) {
rollCount(now);
double max = Double.longBitsToDouble(current);
while (isGreaterThan(value, max) && !compareAndSet(max, value)) {
max = Double.longBitsToDouble(current);
}
}
}

/** Get the value for the last completed interval. */
Expand Down Expand Up @@ -106,7 +197,7 @@ public double poll(long now) {
@Override public String toString() {
return "StepDouble{init=" + init
+ ", previous=" + previous
+ ", current=" + current.get()
+ ", current=" + Double.longBitsToDouble(current)
+ ", lastInitPos=" + lastInitPos + '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2019 Netflix, Inc.
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@

import com.netflix.spectator.api.Clock;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

/**
Expand All @@ -36,46 +35,109 @@ public class StepLong implements StepValue {
private final long step;

private volatile long previous;
private final AtomicLong current;
private volatile long current;

private static final AtomicLongFieldUpdater<StepLong> CURRENT_UPDATER =
AtomicLongFieldUpdater.newUpdater(StepLong.class, "current");

private volatile long lastInitPos;

private static final AtomicLongFieldUpdater<StepLong> LAST_INIT_POS_UPDATER = AtomicLongFieldUpdater.newUpdater(
StepLong.class, "lastInitPos");
private static final AtomicLongFieldUpdater<StepLong> LAST_INIT_POS_UPDATER =
AtomicLongFieldUpdater.newUpdater(StepLong.class, "lastInitPos");

/** Create a new instance. */
public StepLong(long init, Clock clock, long step) {
this.init = init;
this.clock = clock;
this.step = step;
previous = init;
current = new AtomicLong(init);
current = init;
lastInitPos = clock.wallTime() / step;
}

private void rollCount(long now) {
final long stepTime = now / step;
final long lastInit = lastInitPos;
if (lastInit < stepTime && LAST_INIT_POS_UPDATER.compareAndSet(this, lastInit, stepTime)) {
final long v = current.getAndSet(init);
final long v = CURRENT_UPDATER.getAndSet(this, init);
// Need to check if there was any activity during the previous step interval. If there was
// then the init position will move forward by 1, otherwise it will be older. No activity
// means the previous interval should be set to the `init` value.
previous = (lastInit == stepTime - 1) ? v : init;
}
}

/** Get the AtomicLong for the current bucket. */
public AtomicLong getCurrent() {
/** Get the value for the current bucket. */
public long getCurrent() {
return getCurrent(clock.wallTime());
}

/** Get the AtomicLong for the current bucket. */
public AtomicLong getCurrent(long now) {
/** Get the value for the current bucket. */
public long getCurrent(long now) {
rollCount(now);
return current;
}

/** Set the value for the current bucket. */
public void setCurrent(long now, long value) {
rollCount(now);
current = value;
}

/** Increment the current value and return the result. */
public long incrementAndGet(long now) {
rollCount(now);
return CURRENT_UPDATER.incrementAndGet(this);
}

/** Increment the current value and return the value before incrementing. */
public long getAndIncrement(long now) {
rollCount(now);
return CURRENT_UPDATER.getAndIncrement(this);
}

/** Increment the current value and return the result. */
public long addAndGet(long now, long value) {
rollCount(now);
return CURRENT_UPDATER.addAndGet(this, value);
}

/** Increment the current value and return the value before incrementing. */
public long getAndAdd(long now, long value) {
rollCount(now);
return CURRENT_UPDATER.getAndAdd(this, value);
}

/** Set the current value and return the previous value. */
public long getAndSet(long now, long value) {
rollCount(now);
return CURRENT_UPDATER.getAndSet(this, value);
}

/** Set the current value and return the previous value. */
public boolean compareAndSet(long now, long expect, long update) {
rollCount(now);
return CURRENT_UPDATER.compareAndSet(this, expect, update);
}

/** Set the current value to the minimum of the current value or the provided value. */
public void min(long now, long value) {
rollCount(now);
long min = current;
while (value < min && !CURRENT_UPDATER.compareAndSet(this, min, value)) {
min = current;
}
}

/** Set the current value to the maximum of the current value or the provided value. */
public void max(long now, long value) {
rollCount(now);
long max = current;
while (value > max && !CURRENT_UPDATER.compareAndSet(this, max, value)) {
max = current;
}
}

/** Get the value for the last completed interval. */
public long poll() {
return poll(clock.wallTime());
Expand Down Expand Up @@ -107,7 +169,7 @@ public long poll(long now) {
@Override public String toString() {
return "StepLong{init=" + init
+ ", previous=" + previous
+ ", current=" + current.get()
+ ", current=" + current
+ ", lastInitPos=" + lastInitPos + '}';
}
}
Loading

0 comments on commit e9f7062

Please sign in to comment.