Merge pull request #324 from InterestingLab/rickyhuo.enhance.output.kafka

Rickyhuo.enhance.output.kafka
garyelephant authored Jun 2, 2019
2 parents d42de85 + 756a2dd commit 205c9d5
Showing 7 changed files with 84 additions and 19 deletions.
4 changes: 1 addition & 3 deletions docs/zh-cn/configuration/input-plugins/KafkaStream.md
@@ -38,7 +38,7 @@ Kafka cluster address; separate multiple addresses with ","

For the optional parameters of the Kafka source in Spark Structured Streaming, see the [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#reading-data-from-kafka)

To specify one of these parameters, prefix the original parameter name with "consumer."; for example, `rebalance.max.retries` is set as `consumer.rebalance.max.retries = 100`. If these optional parameters are not specified, the defaults from the official Kafka documentation are used.
To specify one of these parameters, prefix the original parameter name with "consumer."; for example, `auto.offset.reset` is set as `consumer.auto.offset.reset = latest`. If these optional parameters are not specified, the defaults from the official Kafka documentation are used.

##### offset.location [string]

@@ -53,7 +53,6 @@ kafkaStream {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
}
```

@@ -64,7 +63,6 @@ kafkaStream {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_group"
consumer.rebalance.max.retries = 100
consumer.failOnDataLoss = false
}
```
14 changes: 11 additions & 3 deletions docs/zh-cn/configuration/output-plugins/Kafka.md
@@ -15,6 +15,7 @@
| [producer.bootstrap.servers](#producerbootstrapservers-string) | string | yes | - | all streaming |
| [topic](#topic-string) | string | yes | - | all streaming |
| [producer.*](#producer-string) | string | no | - | all streaming |
| [serializer](#serializer-string) | string | no | json | all streaming |
| [streaming_output_mode](#streaming_output_mode-string) | string | no | append | structured streaming |
| [checkpointLocation](#checkpointLocation-string) | string | no | - | structured streaming |

@@ -34,15 +35,22 @@ Kafka Topic
To specify one of these parameters, prefix the original parameter name with "producer."; for example, `request.timeout.ms` is set as `producer.request.timeout.ms = 60000`. If these optional parameters are not specified, the defaults from the official Kafka documentation are used.
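
Below is a minimal sketch of how such pass-through producer options could look in an output configuration; the plugin block name, broker address, and timeout value are illustrative and not taken from this commit:

```
kafka {
    topic = "waterdrop"
    producer.bootstrap.servers = "localhost:9092"
    # any native Kafka producer option can be passed through with the "producer." prefix
    producer.request.timeout.ms = 60000
    producer.retries = 2
}
```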


### Notes
###### Notes

When used as the output of structured streaming, you can add a few extra parameters to control its behavior.

#### checkpointLocation [string]
##### checkpointLocation [string]

You can enable checkpointing by setting the **checkpointLocation** parameter.

#### streaming_output_mode [string]
##### streaming_output_mode [string]

You can specify the output mode, one of complete|append|update; see the Spark documentation on output modes: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

##### serializer [string]

Serialization method. Currently json and text are supported; when text is chosen, the data structure must contain exactly one field.
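
Putting the structured streaming options above together, a hedged sketch of an output block might look like the following; the topic, broker address, and checkpoint path are placeholders:

```
kafka {
    topic = "waterdrop"
    producer.bootstrap.servers = "localhost:9092"
    serializer = "text"
    streaming_output_mode = "update"
    checkpointLocation = "/your/checkpoint/path"
    # with serializer = "text", the data must contain exactly one field
}
```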

### Examples

> spark streaming or batch
@@ -0,0 +1,22 @@
package io.github.interestinglab.waterdrop;

/**
 * RuntimeException for errors caused by invalid user configuration or input,
 * reported as a configuration error rather than a fatal error.
 */
public class UserRuntimeException extends RuntimeException {

public UserRuntimeException() {
super();
}

public UserRuntimeException(String message) {
super(message);
}

public UserRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public UserRuntimeException(Throwable cause) {
super(cause);
}
}
@@ -36,7 +36,7 @@ object Waterdrop extends Logging {
case Success(_) => {}
case Failure(exception) => {
exception match {
case e: ConfigRuntimeException => showConfigError(e)
case e @ (_: ConfigRuntimeException | _: UserRuntimeException) => Waterdrop.showConfigError(e)
case e: Exception => showFatalError(e)
}
}
@@ -36,7 +36,7 @@ object WaterdropStructuredStreaming extends Logging {
case Success(_) => {}
case Failure(exception) => {
exception match {
case e: ConfigRuntimeException => Waterdrop.showConfigError(e)
case e @ (_: ConfigRuntimeException | _: UserRuntimeException) => Waterdrop.showConfigError(e)
case e: OffsetOutOfRangeException => showKnownError("Please remove checkpoint dir.", e)
case e: Exception => Waterdrop.showFatalError(e)
}
@@ -3,6 +3,7 @@ package io.github.interestinglab.waterdrop.output.batch
import java.util.Properties

import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.UserRuntimeException
import io.github.interestinglab.waterdrop.apis.BaseOutput
import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils
import io.github.interestinglab.waterdrop.output.utils.KafkaProducerUtil
@@ -77,12 +78,29 @@ class Kafka extends BaseOutput {

override def process(df: Dataset[Row]) {

val dataSet = df.toJSON
dataSet.foreach { row =>
kafkaSink.foreach { ks =>
ks.value.send(config.getString("topic"), row)
val topic = config.getString("topic")
config.getString("serializer") match {
case "text" => {
if (df.schema.size != 1) {
throw new UserRuntimeException(
s"Text data source supports only a single column," +
s" and you have ${df.schema.size} columns.")
} else {
df.foreach { row =>
kafkaSink.foreach { ks =>
ks.value.send(topic, row.getAs[String](0))
}
}
}
}
case _ => {
val dataSet = df.toJSON
dataSet.foreach { row =>
kafkaSink.foreach { ks =>
ks.value.send(topic, row)
}
}
}
}
}

}
@@ -4,6 +4,7 @@ import java.util.Properties

import com.alibaba.fastjson.JSONObject
import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.UserRuntimeException
import io.github.interestinglab.waterdrop.apis.BaseStructuredStreamingOutput
import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils
import io.github.interestinglab.waterdrop.output.utils.{KafkaProducerUtil, StructuredUtils}
@@ -39,6 +40,7 @@ class Kafka extends BaseStructuredStreamingOutput {
topic = config.getString("topic")
val defaultConfig = ConfigFactory.parseMap(
Map(
"serializer" -> "json",
"streaming_output_mode" -> "Append",
"trigger_type" -> "default",
producerPrefix + "retries" -> 2,
@@ -50,7 +52,8 @@
)
config = config.withFallback(defaultConfig)
val props = new Properties()
TypesafeConfigUtils.extractSubConfig(config, producerPrefix, false)
TypesafeConfigUtils
.extractSubConfig(config, producerPrefix, false)
.entrySet()
.foreach(entry => {
val key = entry.getKey
@@ -68,7 +71,8 @@

TypesafeConfigUtils.hasSubConfig(config, outConfPrefix) match {
case true => {
TypesafeConfigUtils.extractSubConfig(config, outConfPrefix, false)
TypesafeConfigUtils
.extractSubConfig(config, outConfPrefix, false)
.entrySet()
.foreach(entry => {
val key = entry.getKey
@@ -89,10 +93,25 @@
* Things to do with each Row.
**/
override def process(row: Row): Unit = {
val json = new JSONObject()
row.schema.fieldNames
.foreach(field => json.put(field, row.getAs(field)))
kafkaSink.value.send(topic, json.toJSONString)

config.getString("serializer") match {
case "text" => {
if (row.schema.size != 1) {
throw new UserRuntimeException(
s"Text data source supports only a single column," +
s" and you have ${row.schema.size} columns.")
} else {
kafkaSink.value.send(topic, row.getAs[String](0))
}
}
case _ => {
val json = new JSONObject()
row.schema.fieldNames
.foreach(field => json.put(field, row.getAs(field)))
kafkaSink.value.send(topic, json.toJSONString)
}
}

}

/**
