Skip to content

Commit

Permalink
Merge branch 'master' into 1.2.X-incubating
Browse files Browse the repository at this point in the history
  • Loading branch information
leerho committed Jan 17, 2020
2 parents bcefa34 + 43888f9 commit 97552af
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,6 @@ private enum Flags { IS_BIG_ENDIAN, IS_IN_SAMPLING_MODE, IS_EMPTY, HAS_ENTRIES,
count = mem.getInt(offset);
offset += Integer.BYTES;
}
// if (version == serialVersionWithSummaryFactoryUID) {
// final DeserializeResult<SummaryFactory<S>> factoryResult =
// SerializerDeserializer.deserializeFromMemory(mem, offset);
// offset += factoryResult.getSize();
// }
final int currentCapacity = 1 << lgCurrentCapacity_;
keys_ = new long[currentCapacity];
for (int i = 0; i < count; i++) {
Expand Down
66 changes: 56 additions & 10 deletions src/main/java/org/apache/datasketches/tuple/Union.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.apache.datasketches.tuple;

import static java.lang.Math.min;
import static org.apache.datasketches.Util.DEFAULT_NOMINAL_ENTRIES;

import java.lang.reflect.Array;

import org.apache.datasketches.QuickSelect;

/**
* Compute a union of two or more tuple sketches.
* A new instance represents an empty set.
Expand All @@ -29,10 +34,10 @@
* @param <S> Type of Summary
*/
public class Union<S extends Summary> {
private final int nomEntries_;
private final SummarySetOperations<S> summarySetOps_;
private QuickSelectSketch<S> sketch_;
private long theta_; // need to maintain outside of the sketch
private boolean isEmpty_;

/**
* Creates new instance with default nominal entries
Expand All @@ -49,42 +54,83 @@ public Union(final SummarySetOperations<S> summarySetOps) {
* @param summarySetOps instance of SummarySetOperations
*/
public Union(final int nomEntries, final SummarySetOperations<S> summarySetOps) {
nomEntries_ = nomEntries;
summarySetOps_ = summarySetOps;
sketch_ = new QuickSelectSketch<S>(nomEntries, null);
sketch_ = new QuickSelectSketch<>(nomEntries, null);
theta_ = sketch_.getThetaLong();
isEmpty_ = true;
}

/**
* Updates the internal set by adding entries from the given sketch
* @param sketchIn input sketch to add to the internal set
*/
public void update(final Sketch<S> sketchIn) {
if (sketchIn == null || sketchIn.isEmpty()) { return; }
if ((sketchIn == null) || sketchIn.isEmpty()) { return; }
isEmpty_ = false;
if (sketchIn.theta_ < theta_) { theta_ = sketchIn.theta_; }
final SketchIterator<S> it = sketchIn.iterator();
while (it.next()) {
sketch_.merge(it.getKey(), it.getSummary(), summarySetOps_);
}
if (sketch_.theta_ < theta_) {
theta_ = sketch_.theta_;
}
}

/**
* Gets the internal set as a CompactSketch
* @return result of the unions so far
*/
@SuppressWarnings("unchecked")
public CompactSketch<S> getResult() {
sketch_.trim();
if (theta_ < sketch_.theta_) {
sketch_.setThetaLong(theta_);
sketch_.rebuild();
if (isEmpty_) {
return sketch_.compact();
}
if ((theta_ >= sketch_.theta_) && (sketch_.getRetainedEntries() <= sketch_.getNominalEntries())) {
return sketch_.compact();
}
long theta = min(theta_, sketch_.theta_);

int num = 0;
{
final SketchIterator<S> it = sketch_.iterator();
while (it.next()) {
if (it.getKey() < theta) { num++; }
}
}
return sketch_.compact();
if (num == 0) {
return new CompactSketch<>(null, null, theta, isEmpty_);
}
if (num > sketch_.getNominalEntries()) {
final long[] keys = new long[num]; // temporary since the order will be destroyed by quick select
final SketchIterator<S> it = sketch_.iterator();
int i = 0;
while (it.next()) {
if (it.getKey() < theta) { keys[i++] = it.getKey(); }
}
theta = QuickSelect.select(keys, 0, num - 1, sketch_.getNominalEntries());
num = sketch_.getNominalEntries();
}
final long[] keys = new long[num];
final S[] summaries = (S[]) Array.newInstance(sketch_.summaries_.getClass().getComponentType(), num);
final SketchIterator<S> it = sketch_.iterator();
int i = 0;
while (it.next()) {
if (it.getKey() < theta) {
keys[i] = it.getKey();
summaries[i] = (S) it.getSummary().copy();
i++;
}
}
return new CompactSketch<>(keys, summaries, theta, isEmpty_);
}

/**
* Resets the internal set to the initial state, which represents an empty set
*/
public void reset() {
sketch_ = new QuickSelectSketch<S>(nomEntries_, null);
sketch_.reset();
theta_ = sketch_.getThetaLong();
isEmpty_ = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public class UpdatableSketch<U, S extends UpdatableSummary<U>> extends QuickSele
* <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability</a>
* @param summaryFactory An instance of a SummaryFactory.
*/
public UpdatableSketch(final int nomEntries, final int lgResizeFactor, final float samplingProbability,
final SummaryFactory<S> summaryFactory) {
public UpdatableSketch(final int nomEntries, final int lgResizeFactor,
final float samplingProbability, final SummaryFactory<S> summaryFactory) {
super(nomEntries, lgResizeFactor, samplingProbability, summaryFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,26 @@ public class DoubleSketch extends UpdatableSketch<Double, DoubleSummary> {
* @param mode The DoubleSummary mode to be used
*/
public DoubleSketch(final int lgK, final DoubleSummary.Mode mode) {
super(1 << lgK, ResizeFactor.X8.ordinal(), 1.0F, new DoubleSummaryFactory(mode));
this(lgK, ResizeFactor.X8.ordinal(), 1.0F, mode);
}

/**
* Creates this sketch with the following parameters:
* @param lgK Log_base2 of <i>Nominal Entries</i>.
* @param lgResizeFactor log2(resizeFactor) - value from 0 to 3:
* <pre>
* 0 - no resizing (max size allocated),
* 1 - double internal hash table each time it reaches a threshold
* 2 - grow four times
* 3 - grow eight times (default)
* </pre>
* @param samplingProbability
* <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability</a>
* @param mode The DoubleSummary mode to be used
*/
public DoubleSketch(final int lgK, final int lgResizeFactor, final float samplingProbability,
final DoubleSummary.Mode mode) {
super(1 << lgK, lgResizeFactor, samplingProbability, new DoubleSummaryFactory(mode));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,26 @@ public class IntegerSketch extends UpdatableSketch<Integer, IntegerSummary> {
* @param mode The IntegerSummary mode to be used
*/
public IntegerSketch(final int lgK, final IntegerSummary.Mode mode) {
super(1 << lgK, ResizeFactor.X8.ordinal(), 1.0F, new IntegerSummaryFactory(mode));
this(lgK, ResizeFactor.X8.ordinal(), 1.0F, mode);
}

/**
* Creates this sketch with the following parameters:
* @param lgK Log_base2 of <i>Nominal Entries</i>.
* @param lgResizeFactor log2(resizeFactor) - value from 0 to 3:
* <pre>
* 0 - no resizing (max size allocated),
* 1 - double internal hash table each time it reaches a threshold
* 2 - grow four times
* 3 - grow eight times (default)
* </pre>
* @param samplingProbability
* <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability</a>
* @param mode The IntegerSummary mode to be used
*/
public IntegerSketch(final int lgK, final int lgResizeFactor, final float samplingProbability,
final IntegerSummary.Mode mode) {
super(1 << lgK, lgResizeFactor, samplingProbability, new IntegerSummaryFactory(mode));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,6 @@ public void updatesOfAllKeyTypes() {
Assert.assertEquals(sketch.getEstimate(), 6.0);
}

// @Test
// public void updateDoubleSummary() {
// DoubleSummary ds = new DoubleSummary();
// ds.update(1.0);
// Assert.assertEquals(ds.getValue(), 1.0);
// }

@Test
public void doubleSummaryDefaultSumMode() {
UpdatableSketch<Double, DoubleSummary> sketch =
Expand Down Expand Up @@ -402,6 +395,22 @@ public void serializeDeserializeSampling() throws Exception {
Assert.assertEquals(sketch1.getTheta(), sketch2.getTheta());
}

@Test
public void unionEmptySampling() {
UpdatableSketch<Double, DoubleSummary> sketch =
new UpdatableSketchBuilder<>(new DoubleSummaryFactory(mode)).setSamplingProbability(0.01f).build();
sketch.update(1, 1.0);
Assert.assertEquals(sketch.getRetainedEntries(), 0); // not retained due to low sampling probability

Union<DoubleSummary> union = new Union<>(new DoubleSummarySetOperations(mode));
union.update(sketch);
CompactSketch<DoubleSummary> result = union.getResult();
Assert.assertEquals(result.getRetainedEntries(), 0);
Assert.assertFalse(result.isEmpty());
Assert.assertTrue(result.isEstimationMode());
Assert.assertEquals(result.getEstimate(), 0.0);
}

@Test
public void unionExactMode() {
UpdatableSketch<Double, DoubleSummary> sketch1 =
Expand Down

0 comments on commit 97552af

Please sign in to comment.