Skip to content

Commit

Permalink
Simplified RecordGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
blootsvoets committed Sep 27, 2023
1 parent e5e7abf commit 5a4dfb6
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,29 @@ import kotlin.random.Random

/**
* Generates records according to the specification in a [MockDataConfig].
* Given key class must match the one specified in the config.
*
* @param <K> type of key to generate
</K> */
open class RecordGenerator<K : SpecificRecord>(val config: MockDataConfig, keyClass: Class<K>) {
val topic: AvroTopic<K, SpecificRecord>
* @param K type of key to generate
* @param config configuration to use
*/
open class RecordGenerator<K : SpecificRecord>(
val config: MockDataConfig,
keyClass: Class<K>,
) {
val topic: AvroTopic<K, SpecificRecord> = config.parseAvroTopic()
private val timeField: Field
private val timeReceivedField: Field?
private val valueFields: MutableList<Field>
private val unknownFields: MutableList<Field>
private val header: MutableList<String>
private val valueFields: List<Field>
private val unknownFields: List<Field>
private val header: List<String>

/**
* Generates records according to config. Given key class must match the one specified in the
* config.
* @param config configuration to use
*/
init {

// doing type checking below.
topic = config.parseAvroTopic()
require(topic.keyClass == keyClass) {
(
"RecordGenerator only generates ObservationKey keys, not " +
topic.keyClass + " in topic " + topic
)
"RecordGenerator only generates ObservationKey keys, not ${topic.keyClass} in topic $topic"
}
require(SpecificRecord::class.java.isAssignableFrom(topic.valueClass)) {
(
"RecordGenerator only generates SpecificRecord values, not " +
topic.valueClass + " in topic " + topic
)
"RecordGenerator only generates SpecificRecord values, not ${topic.valueClass} in topic $topic"
}
header = ArrayList()
header.addAll(mutableListOf("projectId", "userId", "sourceId"))
Expand All @@ -73,32 +65,28 @@ open class RecordGenerator<K : SpecificRecord>(val config: MockDataConfig, keyCl
val valueSchema = topic.valueSchema
timeField = forceGetField(valueSchema, "time")
timeReceivedField = valueSchema.getField("timeReceived")
var valueFieldNames = config.valueFields
if (valueFieldNames == null) {
valueFieldNames = emptyList()
}
valueFields = ArrayList(valueFieldNames.size)
for (fieldName in valueFieldNames) {
val field = forceGetField(valueSchema, fieldName)
valueFields.add(field)
val type = field.schema().type
require(ACCEPTABLE_VALUE_TYPES.contains(type)) {
(
"Cannot generate data for type " + type +
" in field " + fieldName + " in topic " + topic
)

val valueFieldNames = config.valueFields ?: emptyList()
valueFields = valueFieldNames
.map { fieldName -> forceGetField(valueSchema, fieldName) }
.onEach { field ->
val type = field.schema().type
require(ACCEPTABLE_VALUE_TYPES.contains(type)) {
"Cannot generate data for type $type in field ${field.name()} in topic $topic"
}
}
}

unknownFields = ArrayList(valueSchema.fields.size - valueFields.size - 2)
val existingNames = buildSet(valueFieldNames.size + 2) {
add("time")
add("timeReceived")
addAll(valueFieldNames)
}
for (field in valueSchema.fields) {
header.add(field.name())
if (field.name() == "time" || field.name() == "timeReceived" || valueFieldNames.contains(
field.name(),
)
) {
continue
if (field.name() !in existingNames) {
unknownFields.add(field)
}
unknownFields.add(field)
}
}

Expand Down Expand Up @@ -130,12 +118,19 @@ open class RecordGenerator<K : SpecificRecord>(val config: MockDataConfig, keyCl
* @param key key to generate data with
* @return list containing simulated values
*/
open fun iteratableRawValues(key: K, duration: Long): Iterable<Array<String>> {
return Iterable {
val baseIterator = iterateValues(key, duration)
RecordArrayIterator(baseIterator)
open fun iteratableRawValues(key: K, duration: Long): Iterable<Array<String>> = iterateValues(key, duration).asSequence()
.map { record ->
val keyFieldsSize = record.key.schema.fields.size
val valueFieldsSize = record.value.schema.fields.size
Array(keyFieldsSize + valueFieldsSize) { idx ->
if (idx < keyFieldsSize) {
record.key[idx]
} else {
record.value[idx - keyFieldsSize]
}.toString()
}
}
}
.asIterable()

/**
* Simulates data of a sensor with the given frequency for a time interval specified by
Expand All @@ -144,8 +139,33 @@ open class RecordGenerator<K : SpecificRecord>(val config: MockDataConfig, keyCl
* @param key key to generate data with
* @return list containing simulated values
*/
fun iterateValues(key: K, duration: Long): Iterator<Record<K, SpecificRecord>> {
return RecordIterator(duration, key)
fun iterateValues(key: K, duration: Long): Iterator<Record<K, SpecificRecord>> = Metronome(
duration * config.frequency / 1000L,
config.frequency,
)
.asSequence()
.map { time -> Record(key, createValue(time)) }
.iterator()

private fun createValue(time: Long) = topic.newValueInstance().apply {
put(timeField.pos(), time / 1000.0)
if (timeReceivedField != null) {
put(timeReceivedField.pos(), getTimeReceived(time) / 1000.0)
}
for (f in valueFields) {
val fieldValue: Any = when (val type = f.schema().type) {
DOUBLE -> randomDouble
FLOAT -> randomDouble.toFloat()
LONG -> randomDouble.toLong()
INT -> randomDouble.toInt()
ENUM -> getRandomEnum(f.schema())
else -> throw IllegalStateException("Cannot parse type $type")
}
put(f.pos(), fieldValue)
}
for (f in unknownFields) {
put(f.pos(), f.defaultVal())
}
}

private val randomDouble: Double
Expand All @@ -161,79 +181,7 @@ open class RecordGenerator<K : SpecificRecord>(val config: MockDataConfig, keyCl
* @return random `Double` representing the Round Trip Time for the given timestamp
* using `ThreadLocalRandom`
*/
private fun getTimeReceived(time: Long): Long {
return time + Random.nextLong(1, 10)
}

private class RecordArrayIterator<K : SpecificRecord>(
private val baseIterator: Iterator<Record<K, SpecificRecord>>,
) : MutableIterator<Array<String>> {
override fun hasNext(): Boolean = baseIterator.hasNext()

override fun next(): Array<String> {
val record = baseIterator.next()
val keyFieldsSize = record.key.schema.fields.size
val valueFieldsSize = record.value.schema.fields.size
return Array(keyFieldsSize + valueFieldsSize) { idx ->
if (idx < keyFieldsSize) {
record.key[idx]
} else {
record.value[idx - keyFieldsSize]
}.toString()
}
}

override fun remove() {
throw UnsupportedOperationException("remove")
}
}

private inner class RecordIterator(duration: Long, private val key: K) :
MutableIterator<Record<K, SpecificRecord>> {
private val timestamps: Metronome

init {
timestamps = Metronome(
duration * config.frequency / 1000L,
config.frequency,
)
}

override fun hasNext(): Boolean {
return timestamps.hasNext()
}

override fun next(): Record<K, SpecificRecord> {
check(hasNext()) { "Iterator done" }
val value = topic.newValueInstance()
val time = timestamps.next()
value.put(timeField.pos(), time / 1000.0)
if (timeReceivedField != null) {
value.put(timeReceivedField.pos(), getTimeReceived(time) / 1000.0)
}
for (f in valueFields) {
val type = f.schema().type
var fieldValue: Any?
when (type) {
DOUBLE -> fieldValue = randomDouble
FLOAT -> fieldValue = randomDouble.toFloat()
LONG -> fieldValue = randomDouble.toLong()
INT -> fieldValue = randomDouble.toInt()
ENUM -> fieldValue = getRandomEnum(f.schema())
else -> throw IllegalStateException("Cannot parse type $type")
}
value.put(f.pos(), fieldValue)
}
for (f in unknownFields) {
value.put(f.pos(), f.defaultVal())
}
return Record(key, value)
}

override fun remove() {
throw UnsupportedOperationException()
}
}
private fun getTimeReceived(time: Long): Long = time + Random.nextLong(1, 10)

companion object {
private val ACCEPTABLE_VALUE_TYPES: Set<Type> = EnumSet.of(DOUBLE, FLOAT, INT, LONG, ENUM)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ package org.radarbase.util
class Metronome(
private val samples: Long,
private val frequency: Int,
) {
) : AbstractIterator<Long>() {
private val baseTime: Long
private var iteration: Long = 0

Expand All @@ -41,14 +41,12 @@ class Metronome(
}
}

/** Whether the metronome will generate another sample. */
operator fun hasNext(): Boolean = samples == 0L || iteration < samples

/** Generate the next sample. */
operator fun next(): Long {
check(hasNext()) { "Iterator finished" }

return baseTime + (iteration++).toTimeOffset()
override fun computeNext() {
if (samples != 0L && iteration >= samples) {
done()
} else {
setNext(baseTime + (iteration++).toTimeOffset())
}
}

private fun Long.toTimeOffset(): Long = this * 1000 / frequency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.NoSuchElementException;
import org.junit.jupiter.api.Test;

public class MetronomeTest {
Expand All @@ -42,7 +43,7 @@ public void timestamps() {
check(it, base - 500L);
check(it, base);
assertThat(it.hasNext(), is(false));
assertThrows(IllegalStateException.class, it::next);
assertThrows(NoSuchElementException.class, it::next);
}

@Test
Expand Down

0 comments on commit 5a4dfb6

Please sign in to comment.