diff --git a/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt b/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt index 0037ad46..8b59acc6 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/mock/data/RecordGenerator.kt @@ -34,37 +34,29 @@ import kotlin.random.Random /** * Generates records according to the specification in a [MockDataConfig]. + * Given key class must match the one specified in the config. * - * @param type of key to generate - */ -open class RecordGenerator(val config: MockDataConfig, keyClass: Class) { - val topic: AvroTopic + * @param K type of key to generate + * @param config configuration to use + */ +open class RecordGenerator( + val config: MockDataConfig, + keyClass: Class, +) { + val topic: AvroTopic = config.parseAvroTopic() private val timeField: Field private val timeReceivedField: Field? - private val valueFields: MutableList - private val unknownFields: MutableList - private val header: MutableList + private val valueFields: List + private val unknownFields: List + private val header: List - /** - * Generates records according to config. Given key class must match the one specified in the - * config. - * @param config configuration to use - */ init { - // doing type checking below. - topic = config.parseAvroTopic() require(topic.keyClass == keyClass) { - ( - "RecordGenerator only generates ObservationKey keys, not " + - topic.keyClass + " in topic " + topic - ) + "RecordGenerator only generates ObservationKey keys, not ${topic.keyClass} in topic $topic" } require(SpecificRecord::class.java.isAssignableFrom(topic.valueClass)) { - ( - "RecordGenerator only generates SpecificRecord values, not " + - topic.valueClass + " in topic " + topic - ) + "RecordGenerator only generates SpecificRecord values, not ${topic.valueClass} in topic $topic" } header = ArrayList() header.addAll(mutableListOf("projectId", "userId", "sourceId")) @@ -73,32 +65,28 @@ open class RecordGenerator(val config: MockDataConfig, keyCl val valueSchema = topic.valueSchema timeField = forceGetField(valueSchema, "time") timeReceivedField = valueSchema.getField("timeReceived") - var valueFieldNames = config.valueFields - if (valueFieldNames == null) { - valueFieldNames = emptyList() - } - valueFields = ArrayList(valueFieldNames.size) - for (fieldName in valueFieldNames) { - val field = forceGetField(valueSchema, fieldName) - valueFields.add(field) - val type = field.schema().type - require(ACCEPTABLE_VALUE_TYPES.contains(type)) { - ( - "Cannot generate data for type " + type + - " in field " + fieldName + " in topic " + topic - ) + + val valueFieldNames = config.valueFields ?: emptyList() + valueFields = valueFieldNames + .map { fieldName -> forceGetField(valueSchema, fieldName) } + .onEach { field -> + val type = field.schema().type + require(ACCEPTABLE_VALUE_TYPES.contains(type)) { + "Cannot generate data for type $type in field ${field.name()} in topic $topic" + } } - } + unknownFields = ArrayList(valueSchema.fields.size - valueFields.size - 2) + val existingNames = buildSet(valueFieldNames.size + 2) { + add("time") + add("timeReceived") + addAll(valueFieldNames) + } for (field in valueSchema.fields) { header.add(field.name()) - if (field.name() == "time" || field.name() == "timeReceived" || valueFieldNames.contains( - field.name(), - ) - ) { - continue + if (field.name() !in existingNames) { + unknownFields.add(field) } - unknownFields.add(field) } } @@ -130,12 +118,19 @@ open class RecordGenerator(val config: MockDataConfig, keyCl * @param key key to generate data with * @return list containing simulated values */ - open fun iteratableRawValues(key: K, duration: Long): Iterable> { - return Iterable { - val baseIterator = iterateValues(key, duration) - RecordArrayIterator(baseIterator) + open fun iteratableRawValues(key: K, duration: Long): Iterable> = iterateValues(key, duration).asSequence() + .map { record -> + val keyFieldsSize = record.key.schema.fields.size + val valueFieldsSize = record.value.schema.fields.size + Array(keyFieldsSize + valueFieldsSize) { idx -> + if (idx < keyFieldsSize) { + record.key[idx] + } else { + record.value[idx - keyFieldsSize] + }.toString() + } } - } + .asIterable() /** * Simulates data of a sensor with the given frequency for a time interval specified by @@ -144,8 +139,33 @@ open class RecordGenerator(val config: MockDataConfig, keyCl * @param key key to generate data with * @return list containing simulated values */ - fun iterateValues(key: K, duration: Long): Iterator> { - return RecordIterator(duration, key) + fun iterateValues(key: K, duration: Long): Iterator> = Metronome( + duration * config.frequency / 1000L, + config.frequency, + ) + .asSequence() + .map { time -> Record(key, createValue(time)) } + .iterator() + + private fun createValue(time: Long) = topic.newValueInstance().apply { + put(timeField.pos(), time / 1000.0) + if (timeReceivedField != null) { + put(timeReceivedField.pos(), getTimeReceived(time) / 1000.0) + } + for (f in valueFields) { + val fieldValue: Any = when (val type = f.schema().type) { + DOUBLE -> randomDouble + FLOAT -> randomDouble.toFloat() + LONG -> randomDouble.toLong() + INT -> randomDouble.toInt() + ENUM -> getRandomEnum(f.schema()) + else -> throw IllegalStateException("Cannot parse type $type") + } + put(f.pos(), fieldValue) + } + for (f in unknownFields) { + put(f.pos(), f.defaultVal()) + } } private val randomDouble: Double @@ -161,79 +181,7 @@ open class RecordGenerator(val config: MockDataConfig, keyCl * @return random `Double` representing the Round Trip Time for the given timestamp * using `ThreadLocalRandom` */ - private fun getTimeReceived(time: Long): Long { - return time + Random.nextLong(1, 10) - } - - private class RecordArrayIterator( - private val baseIterator: Iterator>, - ) : MutableIterator> { - override fun hasNext(): Boolean = baseIterator.hasNext() - - override fun next(): Array { - val record = baseIterator.next() - val keyFieldsSize = record.key.schema.fields.size - val valueFieldsSize = record.value.schema.fields.size - return Array(keyFieldsSize + valueFieldsSize) { idx -> - if (idx < keyFieldsSize) { - record.key[idx] - } else { - record.value[idx - keyFieldsSize] - }.toString() - } - } - - override fun remove() { - throw UnsupportedOperationException("remove") - } - } - - private inner class RecordIterator(duration: Long, private val key: K) : - MutableIterator> { - private val timestamps: Metronome - - init { - timestamps = Metronome( - duration * config.frequency / 1000L, - config.frequency, - ) - } - - override fun hasNext(): Boolean { - return timestamps.hasNext() - } - - override fun next(): Record { - check(hasNext()) { "Iterator done" } - val value = topic.newValueInstance() - val time = timestamps.next() - value.put(timeField.pos(), time / 1000.0) - if (timeReceivedField != null) { - value.put(timeReceivedField.pos(), getTimeReceived(time) / 1000.0) - } - for (f in valueFields) { - val type = f.schema().type - var fieldValue: Any? - when (type) { - DOUBLE -> fieldValue = randomDouble - FLOAT -> fieldValue = randomDouble.toFloat() - LONG -> fieldValue = randomDouble.toLong() - INT -> fieldValue = randomDouble.toInt() - ENUM -> fieldValue = getRandomEnum(f.schema()) - else -> throw IllegalStateException("Cannot parse type $type") - } - value.put(f.pos(), fieldValue) - } - for (f in unknownFields) { - value.put(f.pos(), f.defaultVal()) - } - return Record(key, value) - } - - override fun remove() { - throw UnsupportedOperationException() - } - } + private fun getTimeReceived(time: Long): Long = time + Random.nextLong(1, 10) companion object { private val ACCEPTABLE_VALUE_TYPES: Set = EnumSet.of(DOUBLE, FLOAT, INT, LONG, ENUM) diff --git a/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt b/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt index 3765d54d..0e832290 100644 --- a/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt +++ b/radar-commons-testing/src/main/java/org/radarbase/util/Metronome.kt @@ -27,7 +27,7 @@ package org.radarbase.util class Metronome( private val samples: Long, private val frequency: Int, -) { +) : AbstractIterator() { private val baseTime: Long private var iteration: Long = 0 @@ -41,14 +41,12 @@ class Metronome( } } - /** Whether the metronome will generate another sample. */ - operator fun hasNext(): Boolean = samples == 0L || iteration < samples - - /** Generate the next sample. */ - operator fun next(): Long { - check(hasNext()) { "Iterator finished" } - - return baseTime + (iteration++).toTimeOffset() + override fun computeNext() { + if (samples != 0L && iteration >= samples) { + done() + } else { + setNext(baseTime + (iteration++).toTimeOffset()) + } } private fun Long.toTimeOffset(): Long = this * 1000 / frequency diff --git a/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java b/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java index ac66ea1b..60960445 100644 --- a/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java +++ b/radar-commons-testing/src/test/java/org/radarbase/util/MetronomeTest.java @@ -22,6 +22,7 @@ import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.NoSuchElementException; import org.junit.jupiter.api.Test; public class MetronomeTest { @@ -42,7 +43,7 @@ public void timestamps() { check(it, base - 500L); check(it, base); assertThat(it.hasNext(), is(false)); - assertThrows(IllegalStateException.class, it::next); + assertThrows(NoSuchElementException.class, it::next); } @Test