From c1215549ccf5e6b40baff0bdee3e978ada684e45 Mon Sep 17 00:00:00 2001
From: Anqi
Date: Wed, 24 Jan 2024 10:52:37 +0800
Subject: [PATCH 1/2] support to filter the datasource

---
 .../exchange/common/config/Configs.scala      | 19 +++++-
 .../common/config/SchemaConfigs.scala         |  9 ++-
 .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++--------------
 .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++--------------
 .../com/vesoft/nebula/exchange/Exchange.scala | 63 +++++--------------
 5 files changed, 74 insertions(+), 143 deletions(-)

diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
index 5552c1c3..6224f62b 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/Configs.scala
@@ -187,6 +187,10 @@ case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: St
   }
 }
 
+case class FilterConfigEntry(filter: String){
+  override def toString(): String = s"filter:$filter"
+}
+
 /**
   *
   */
@@ -467,6 +471,10 @@ object Configs {
           Some(UdfConfigEntry(sep, cols, newCol))
         } else None
 
+        val filterConfig = if(tagConfig.hasPath("filter")) {
+          Some(FilterConfigEntry(tagConfig.getString("filter")))
+        } else None
+
         LOG.info(s"name ${tagName} batch ${batch}")
         val entry = TagConfigEntry(
           tagName,
@@ -485,7 +493,8 @@ object Configs {
           enableTagless,
           ignoreIndex,
           deleteEdge,
-          vertexUdf
+          vertexUdf,
+          filterConfig
         )
         LOG.info(s"Tag Config: ${entry}")
         tags += entry
@@ -608,6 +617,11 @@ object Configs {
           Some(UdfConfigEntry(sep, cols, newCol))
         } else None
 
+
+        val filterConfig = if (edgeConfig.hasPath("filter")) {
+          Some(FilterConfigEntry(edgeConfig.getString("filter")))
+        } else None
+
         val entry = EdgeConfigEntry(
           edgeName,
           sourceConfig,
@@ -631,7 +645,8 @@ object Configs {
           repartitionWithNebula,
           ignoreIndex,
           srcUdf,
-          dstUdf
+          dstUdf,
+          filterConfig
         )
         LOG.info(s"Edge Config: ${entry}")
         edges += entry
diff --git a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
index 08381303..3138e6dd 100644
--- a/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
+++ b/exchange-common/src/main/scala/com/vesoft/exchange/common/config/SchemaConfigs.scala
@@ -69,7 +69,8 @@ case class TagConfigEntry(override val name: String,
                           enableTagless: Boolean = false,
                           ignoreIndex: Boolean = false,
                           deleteEdge: Boolean = false,
-                          vertexUdf: Option[UdfConfigEntry] = None)
+                          vertexUdf: Option[UdfConfigEntry] = None,
+                          filterConfig: Option[FilterConfigEntry] = None)
     extends SchemaConfigEntry {
   require(name.trim.nonEmpty, "tag name cannot be empty")
   require(vertexField.trim.nonEmpty, "tag vertex id cannot be empty")
@@ -89,7 +90,8 @@ case class TagConfigEntry(override val name: String,
       s"repartitionWithNebula: $repartitionWithNebula, " +
       s"enableTagless: $enableTagless, " +
       s"ignoreIndex: $ignoreIndex, " +
-      s"vertexUdf: $vertexUdf."
+      s"vertexUdf: $vertexUdf, " +
+      s"filter: $filterConfig."
   }
 }
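The new "filter" field is read off a tag (or edge) block with the same Typesafe Config calls used in Configs.scala above: hasPath keeps the key optional, getString captures the expression. A minimal sketch of that lookup, assuming a stripped-down tag block (the filter key is the only thing this patch adds; the other keys of a real tag block are omitted here):

```scala
import com.typesafe.config.ConfigFactory
import com.vesoft.exchange.common.config.FilterConfigEntry

object FilterConfigSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative tag block; a real one also carries type, fields, vertex, etc.
    val tagConfig = ConfigFactory.parseString(
      """
        |name = player
        |filter = "age > 30 and name is not null"
        |""".stripMargin)

    // Mirrors the parsing added in Configs.scala: a missing key simply yields None.
    val filterConfig =
      if (tagConfig.hasPath("filter")) Some(FilterConfigEntry(tagConfig.getString("filter")))
      else None

    println(filterConfig) // Some(filter:age > 30 and name is not null)
  }
}
```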
@@ -134,7 +136,8 @@ case class EdgeConfigEntry(override val name: String,
                            repartitionWithNebula: Boolean = false,
                            ignoreIndex: Boolean = false,
                            srcVertexUdf: Option[UdfConfigEntry] = None,
-                           dstVertexUdf: Option[UdfConfigEntry] = None)
+                           dstVertexUdf: Option[UdfConfigEntry] = None,
+                           filterConfig: Option[FilterConfigEntry] = None)
     extends SchemaConfigEntry {
   require(name.trim.nonEmpty, "edge name cannot be empty")
   require(sourceField.trim.nonEmpty, "edge source id cannot be empty")
diff --git a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 447c085b..fd3f055d 100644
--- a/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.2/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long = 0l
+    var totalSstRecordSuccess: Long = 0L
     var totalSstRecordFailure: Long = 0L
 
     // reload for failed import tasks
@@ -171,10 +132,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
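The string carried by filterConfig is handed to Spark unchanged, so it must be a valid Spark SQL condition over the columns of the source DataFrame. A standalone sketch of what that call does (the column names and rows below are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession

object FilterExpressionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("filter-sketch").getOrCreate()
    import spark.implicits._

    val source = Seq(("Tom", 28), ("Jerry", 35), ("Spike", 41)).toDF("name", "age")

    // Equivalent to configuring filter: "age > 30" on a tag or edge.
    source.filter("age > 30").show()
    // +-----+---+
    // | name|age|
    // +-----+---+
    // |Jerry| 35|
    // |Spike| 41|
    // +-----+---+

    spark.stop()
  }
}
```

Rows that fail the condition are dropped before the vertex UDF and the rest of the processing run, which is why filterDf is applied first in the hunk above.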
@@ -238,7 +200,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -437,4 +399,13 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }
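The same helper is duplicated in the Spark 2.2, 2.4, and 3.0 modules. Because filterConfig is always constructed as Some(...) or None during config parsing, the extra null checks are purely defensive; as long as the entry never carries a null string, an equivalent and slightly more idiomatic shape is to fold over the Option. A sketch, not what the patch ships:

```scala
import org.apache.spark.sql.DataFrame
import com.vesoft.exchange.common.config.FilterConfigEntry

object FilterDfSketch {
  // Same behaviour as filterDf above: apply the configured condition when present,
  // otherwise return the DataFrame untouched.
  def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame =
    filter.map(f => data.filter(f.filter)).getOrElse(data)
}
```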
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 26622a16..ecc3374a 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long = 0l
+    var totalSstRecordSuccess: Long = 0L
     var totalSstRecordFailure: Long = 0L
 
     // reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
 
           val batchSuccess =
@@ -237,7 +199,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -436,4 +398,13 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    data.show()
+    if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }
diff --git a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index 6de542ee..bef41bb2 100644
--- a/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_3.0/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -10,47 +10,8 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import java.io.File
 import com.vesoft.exchange.Argument
 import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
-import com.vesoft.exchange.common.config.{
-  ClickHouseConfigEntry,
-  Configs,
-  DataSourceConfigEntry,
-  EdgeConfigEntry,
-  FileBaseSourceConfigEntry,
-  HBaseSourceConfigEntry,
-  HiveSourceConfigEntry,
-  JanusGraphSourceConfigEntry,
-  JdbcConfigEntry,
-  KafkaSourceConfigEntry,
-  MaxComputeConfigEntry,
-  MySQLSourceConfigEntry,
-  Neo4JSourceConfigEntry,
-  OracleConfigEntry,
-  PostgreSQLSourceConfigEntry,
-  PulsarSourceConfigEntry,
-  SchemaConfigEntry,
-  SinkCategory,
-  SourceCategory,
-  TagConfigEntry,
-  UdfConfigEntry
-}
-import com.vesoft.nebula.exchange.reader.{
-  CSVReader,
-  ClickhouseReader,
-  HBaseReader,
-  HiveReader,
-  JSONReader,
-  JanusGraphReader,
-  JdbcReader,
-  KafkaReader,
-  MaxcomputeReader,
-  MySQLReader,
-  Neo4JReader,
-  ORCReader,
-  OracleReader,
-  ParquetReader,
-  PostgreSQLReader,
-  PulsarReader
-}
+import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, EdgeConfigEntry, FileBaseSourceConfigEntry, FilterConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, JdbcConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SchemaConfigEntry, SinkCategory, SourceCategory, TagConfigEntry, UdfConfigEntry}
+import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, JdbcReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, OracleReader, ParquetReader, PostgreSQLReader, PulsarReader}
 import com.vesoft.exchange.common.processor.ReloadProcessor
 import com.vesoft.exchange.common.utils.SparkValidate
 import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
@@ -124,7 +85,7 @@ object Exchange {
     var totalClientBatchFailure: Long = 0L
     var totalClientRecordSuccess: Long = 0L
     var totalClientRecordFailure: Long = 0L
-    var totalSstRecordSuccess: Long = 0l
+    var totalSstRecordSuccess: Long = 0L
     var totalSstRecordFailure: Long = 0L
 
     // reload for failed import tasks
@@ -170,10 +131,11 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          val df = if (tagConfig.vertexUdf.isDefined) {
-            dataUdf(data.get, tagConfig.vertexUdf.get)
+          var df = filterDf(data.get, tagConfig.filterConfig)
+          df = if (tagConfig.vertexUdf.isDefined) {
+            dataUdf(df, tagConfig.vertexUdf.get)
           } else {
-            data.get
+            df
           }
           val batchSuccess =
             spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -236,7 +198,7 @@ object Exchange {
           data.get.show(truncate = false)
         }
         if (data.isDefined && !c.dry) {
-          var df = data.get
+          var df = filterDf(data.get, edgeConfig.filterConfig)
           if (edgeConfig.srcVertexUdf.isDefined) {
             df = dataUdf(df, edgeConfig.srcVertexUdf.get)
           }
@@ -434,4 +396,13 @@ object Exchange {
     finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
     data.select(finalColNames: _*)
   }
+
+
+  private[this] def filterDf(data: DataFrame, filter: Option[FilterConfigEntry]): DataFrame = {
+    if (filter.isDefined && filter.get != null && filter.get.filter != null) {
+      data.filter(filter.get.filter)
+    } else {
+      data
+    }
+  }
 }

From 773c4c307c2e5fa928b25f61118aaa866c554e6d Mon Sep 17 00:00:00 2001
From: Anqi
Date: Thu, 22 Feb 2024 16:15:10 +0800
Subject: [PATCH 2/2] support spark 2.3.2 & do not support kafka

---
 nebula-exchange_spark_2.4/pom.xml               |  2 +-
 .../com/vesoft/nebula/exchange/Exchange.scala   |  2 +-
 .../exchange/processor/EdgeProcessor.scala      | 17 +----------------
 .../exchange/processor/VerticesProcessor.scala  | 17 +----------------
 pom.xml                                         |  2 +-
 5 files changed, 5 insertions(+), 35 deletions(-)

diff --git a/nebula-exchange_spark_2.4/pom.xml b/nebula-exchange_spark_2.4/pom.xml
index ac794e9b..18bfec5f 100644
--- a/nebula-exchange_spark_2.4/pom.xml
+++ b/nebula-exchange_spark_2.4/pom.xml
@@ -16,7 +16,7 @@
         1.8
         1.8
         2.11.12
-        2.4.4
+        2.3.2
         1.5.0
         3.9.2
         2.4.5-M1
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
index ecc3374a..5dc3be2b 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/Exchange.scala
@@ -79,7 +79,7 @@ object Exchange {
     val spark = session.getOrCreate()
 
     // check the spark version
-    SparkValidate.validate(spark.version, "2.4.*")
+    SparkValidate.validate(spark.version, "2.3.*")
     val startTime = System.currentTimeMillis()
     var totalClientBatchSuccess: Long = 0L
     var totalClientBatchFailure: Long = 0L
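Pinning the module build to Spark 2.3.2 means the runtime check has to accept 2.3.x instead of 2.4.x. SparkValidate itself is not shown in this patch; purely as a rough, hypothetical illustration of what matching a pattern like "2.3.*" against the running Spark version amounts to:

```scala
object VersionCheckSketch {
  // Hypothetical stand-in for the real SparkValidate: turn "2.3.*" into a regex
  // and require the running version to match one of the supported patterns.
  def validate(sparkVersion: String, supported: String*): Unit = {
    val ok = supported.exists { p =>
      sparkVersion.matches(p.replace(".", "\\.").replace("*", ".*"))
    }
    require(ok, s"Spark version $sparkVersion is not supported, expect: ${supported.mkString(", ")}")
  }
}

// e.g. VersionCheckSketch.validate("2.3.2", "2.3.*") passes,
//      VersionCheckSketch.validate("2.4.4", "2.3.*") throws IllegalArgumentException.
```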
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
index 35c08cb7..de8f81a0 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/EdgeProcessor.scala
@@ -163,22 +163,7 @@ class EdgeProcessor(spark: SparkSession,
       }(Encoders.kryo[Edge])
 
       // streaming write
-      if (streamFlag) {
-        val streamingDataSourceConfig =
-          edgeConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
-        val wStream = edgeFrame.writeStream
-        if (edgeConfig.checkPointPath.isDefined)
-          wStream.option("checkpointLocation", edgeConfig.checkPointPath.get)
-
-        wStream
-          .foreachBatch((edges, batchId) => {
-            LOG.info(s">>>>> ${edgeConfig.name} edge start batch ${batchId}.")
-            edges.foreachPartition(processEachPartition _)
-          })
-          .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds"))
-          .start()
-          .awaitTermination()
-      } else
+      if (streamFlag) {} else
         edgeFrame.foreachPartition(processEachPartition _)
     }
   }
diff --git a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
index a79f6950..7ce3f9da 100644
--- a/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
+++ b/nebula-exchange_spark_2.4/src/main/scala/com/vesoft/nebula/exchange/processor/VerticesProcessor.scala
@@ -174,22 +174,7 @@ class VerticesProcessor(spark: SparkSession,
       }(Encoders.kryo[Vertex])
 
       // streaming write
-      if (streamFlag) {
-        val streamingDataSourceConfig =
-          tagConfig.dataSourceConfigEntry.asInstanceOf[StreamingDataSourceConfigEntry]
-        val wStream = vertices.writeStream
-        if (tagConfig.checkPointPath.isDefined)
-          wStream.option("checkpointLocation", tagConfig.checkPointPath.get)
-
-        wStream
-          .foreachBatch((vertexSet, batchId) => {
-            LOG.info(s">>>>> ${tagConfig.name} tag start batch ${batchId}.")
-            vertexSet.foreachPartition(processEachPartition _)
-          })
-          .trigger(Trigger.ProcessingTime(s"${streamingDataSourceConfig.intervalSeconds} seconds"))
-          .start()
-          .awaitTermination()
-      } else
+      if (streamFlag) {} else
         vertices.foreachPartition(processEachPartition _)
     }
   }
diff --git a/pom.xml b/pom.xml
index 6aecd197..3cd48343 100644
--- a/pom.xml
+++ b/pom.xml
@@ -243,7 +243,7 @@
         spark-2.4
-            2.4.4
+            2.3.2
             true
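The emptied branches above follow from the version downgrade: DataStreamWriter.foreachBatch only exists from Spark 2.4.0 onward, so once this module is built against 2.3.2 the micro-batch write used for streaming sources such as Kafka can no longer compile, and the streamFlag branch is left as a no-op. A small, hypothetical helper (not part of the patch) that makes the constraint explicit instead of silently skipping the write:

```scala
object StreamingSupportSketch {
  // foreachBatch was introduced in Spark 2.4.0, so anything older cannot run the
  // micro-batch write used for streaming (e.g. Kafka) sources.
  def supportsForeachBatch(sparkVersion: String): Boolean = {
    val Array(major, minor) = sparkVersion.split("\\.").take(2).map(_.toInt)
    major > 2 || (major == 2 && minor >= 4)
  }

  def requireStreamingSupport(sparkVersion: String): Unit =
    require(supportsForeachBatch(sparkVersion),
            s"streaming sources are not supported on Spark $sparkVersion (needs 2.4+)")
}

// e.g. StreamingSupportSketch.supportsForeachBatch("2.3.2") == false
//      StreamingSupportSketch.supportsForeachBatch("2.4.4") == true
```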