Data type conversion exception #77

Open
JialeHe opened this issue Aug 11, 2023 · 10 comments
JialeHe commented Aug 11, 2023

I hit a data type conversion problem when reading StarRocks data with the Spark connector.
Versions I am using:
StarRocks: 2.5.1
Spark: 3.2.0
starrocks-spark-connector: 1.1.0

Background:

The table has a field dt of type date.
Reading this table from StarRocks with a Spark SQL DataFrame fails with the following error:

if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, DateType, fromJavaDate, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 37, dt), DateType), true, false) AS dt#1051
	at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionEncodingError(QueryExecutionErrors.scala:1052)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:210)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:193)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1469)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of date
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_16$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_12$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:207)
	... 15 more

My initial suspicion: the dt values the connector reads from StarRocks are String, while the reported schema is DateType, hence the error.
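One way to confirm this is to compare the declared schema with the runtime class of the values actually returned. A minimal diagnostic sketch, assuming df is the DataFrame loaded via the connector (in the broken setup even collecting a row may throw, which would confirm the mismatch just as well):

import org.apache.spark.sql.DataFrame

// df: the DataFrame loaded via the connector
def checkDt(df: DataFrame): Unit = {
  val declared = df.schema("dt").dataType        // reported as DateType by connector 1.1.0
  val runtime  = df.select("dt").head().get(0)   // the actual value object inside a row
  println(s"declared: $declared, runtime class: ${runtime.getClass.getName}")
}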

A simple reproduction:

import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Test2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark 3 Demo1")
      .getOrCreate()
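    // NOTE: dt below is supplied as a plain String while the schema declares DateType,
    // which triggers: java.lang.String is not a valid external type for schema of date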
    val data = Seq(
      Row("Kim", "Seoul", "2023-08-01"),
      Row("Lee", "Busan", "2023-08-01"),
      Row("Park", "Jeju", "2023-08-01"),
      Row("Jeong", "Daejon", "2023-08-01")
    )
    val schema = List(
      StructField("name", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("dt", DateType, nullable = true)
    )
    spark
      .createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
      .show(false)
  }
}

The official docs say connector 1.1.0 changed the DATE type mapping from StringType to DateType, yet this error still occurs.
It looks like only the schema mapping was changed, while the data actually read back is still StringType, which causes the exception.

So I switched the connector back to 1.0.0; with dt's schema as StringType, the error no longer occurs.
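As a stopgap on 1.0.0, where dt comes back as StringType, the column can be converted explicitly on the Spark side. A minimal sketch, assuming df is the DataFrame loaded via the connector; to_date is a standard Spark SQL function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.to_date

// Under connector 1.0.0 the dt column arrives as StringType;
// convert it back to a proper DateType column before downstream use.
def withDateDt(df: DataFrame): DataFrame =
  df.withColumn("dt", to_date(df("dt"), "yyyy-MM-dd"))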

banmoy (Collaborator) commented Aug 15, 2023

In your code, does the problem also occur if you only run dataSet.show()? And what does finalSql look like?

JialeHe (Author) commented Aug 15, 2023

@banmoy

starrocks table: edw_dm.dm_crowd_push_wide_acc_d, view_table: dm_crowd_push_wide_acc_d, sql: select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d

I haven't run show(); the downstream code writes to HDFS. Both are actions, so the mechanism is the same.
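A hypothetical minimal action reproducing that failing path (the FileFormatWriter frames in the stack trace above), assuming result is the DataFrame produced from finalSql; the output path is a placeholder:

// Any file write re-encodes the external rows and hits the same encoder check.
result.write.mode("overwrite").parquet("hdfs:///tmp/dm_crowd_push_wide_acc_d")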

banmoy (Collaborator) commented Aug 15, 2023

@JialeHe Could you share the DDL of the StarRocks table? The connector tests do verify reading date-typed data; I'd like to see what the difference is.

Test case: https://github.com/StarRocks/starrocks-connector-for-apache-spark/blob/main/src/test/java/com/starrocks/connector/spark/sql/ReadWriteITTest.java#L362C11-L362C11

JialeHe (Author) commented Aug 15, 2023

@banmoy

CREATE TABLE `dm_crowd_push_wide_acc_d` (
  `dt` date,
  `source_uid` varchar(65533),
  `product` varchar(65533),
  `project` varchar(65533),
  `category_type` int(11),
  `unactive_days` int(11),
  `group_id` int(11),
  `region_label` varchar(65533),
  `use_time_label` varchar(65533),
  `interactive_user_label` varchar(65533),
  `coin_label` varchar(65533),
  `push_tunnel` varchar(65533),
  `push_id` varchar(65533),
  `brand` varchar(65533),
  `device_model` varchar(65533),
  `version_code` varchar(65533),
  `os_version` varchar(65533),
  `push_click_rate` varchar(65533),
  `account_id` bigint(20),
  `member` varchar(65533),
  `acc_no_pay_days` bigint(20),
  `max_pay_ant` bigint(20),
  `last_pay_ant` bigint(20),
  `latest_package_channel` varchar(65533),
  `first_follow_days` bigint(20),
  `read_type` int(11),
  `read_day_book_name_str` varchar(65533),
  `first_activation_days` bigint(20),
  `is_push_invalid` int(11),
  `avg_read_dur_1d` double,
  `avg_read_dur_3d` double,
  `avg_read_dur_7d` double,
  `avg_read_dur_14d` double,
  `avg_book_currency_ant_1d` double,
  `avg_book_currency_ant_3d` double,
  `avg_book_currency_ant_7d` double,
  `avg_book_currency_ant_14d` double,
  `is_charge` int(11),
  `signin_days_7d` bigint(20),
  `active_days_14d` int(11),
  `charge_ant_7d` double,
  `promotion_type` varchar(65533),
  `create_time` varchar(65533),
  `update_time` varchar(65533)
) ENGINE = OLAP DUPLICATE KEY(
  `dt`,
  `source_uid`,
  `product`,
  `project`,
  `category_type`,
  `unactive_days`,
  `group_id`
) COMMENT "OLAP" PARTITION BY RANGE(`dt`) (
  PARTITION p20230804 VALUES [("2023-08-04"), ("2023-08-05")),
  PARTITION p20230805 VALUES [("2023-08-05"), ("2023-08-06")),
  PARTITION p20230806 VALUES [("2023-08-06"), ("2023-08-07")),
  PARTITION p20230807 VALUES [("2023-08-07"), ("2023-08-08")),
  PARTITION p20230808 VALUES [("2023-08-08"), ("2023-08-09")),
  PARTITION p20230809 VALUES [("2023-08-09"), ("2023-08-10"))
)
DISTRIBUTED BY HASH(`source_uid`) BUCKETS 120 
PROPERTIES (
"replication_num" = "2",
"dynamic_partition.enable" = "false",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "120",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "true",
"compression" = "ZSTD"
);

JialeHe (Author) commented Aug 15, 2023

@banmoy Following the logic in that UT, I adjusted my reproduction code, and it does indeed work.
So it seems the DT values the connector reads out of StarRocks are String rather than Date.
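For reference, a minimal sketch of an adjusted reproduction along those lines, assuming the change is to supply java.sql.Date values (the external type Spark expects for DateType) instead of plain strings:

import java.sql.Date
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Test2Fixed {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[*]")
      .appName("Spark 3 Demo1")
      .getOrCreate()
    // java.sql.Date matches DateType, so the encoder check passes.
    val data = Seq(
      Row("Kim", "Seoul", Date.valueOf("2023-08-01")),
      Row("Lee", "Busan", Date.valueOf("2023-08-01"))
    )
    val schema = List(
      StructField("name", StringType, nullable = true),
      StructField("city", StringType, nullable = true),
      StructField("dt", DateType, nullable = true)
    )
    spark
      .createDataFrame(spark.sparkContext.parallelize(data), StructType(schema))
      .show(false)
    spark.stop()
  }
}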

banmoy (Collaborator) commented Aug 15, 2023

@JialeHe I verified with your DDL and the Spark query, but still could not reproduce it. Could you confirm whether this matches your scenario?

Data inserted into the StarRocks table (for simplicity, only the duplicate key columns, including the date):

insert into edw_dm.dm_crowd_push_wide_acc_d (dt, source_uid, product, project, category_type, unactive_days, group_id) values ("2023-08-09", "a", "b", "c", 1, 2, 3);

Spark code:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
        .builder()
        .master("local[1]")
        .appName("test")
        .getOrCreate();

// Load the StarRocks table through the connector and register it as a temp view.
Dataset<Row> readDf = spark.read().format("starrocks")
        .option("starrocks.table.identifier", "edw_dm.dm_crowd_push_wide_acc_d")
        .option("starrocks.fenodes", FE_HTTP)
        .option("starrocks.fe.jdbc.url", FE_JDBC)
        .option("user", USER)
        .option("password", PASSWORD)
        .load();
readDf.createOrReplaceTempView("dm_crowd_push_wide_acc_d");

// Same query as reported in this issue.
String finalSql = "select DT,SOURCE_UID,PRODUCT,PROJECT,CATEGORY_TYPE,UNACTIVE_DAYS,GROUP_ID,REGION_LABEL,USE_TIME_LABEL,INTERACTIVE_USER_LABEL,COIN_LABEL,PUSH_TUNNEL,PUSH_ID,BRAND,DEVICE_MODEL,VERSION_CODE,OS_VERSION,PUSH_CLICK_RATE,ACCOUNT_ID,MEMBER,ACC_NO_PAY_DAYS,MAX_PAY_ANT,LAST_PAY_ANT,LATEST_PACKAGE_CHANNEL,FIRST_FOLLOW_DAYS,READ_TYPE,READ_DAY_BOOK_NAME_STR,FIRST_ACTIVATION_DAYS,IS_PUSH_INVALID,AVG_READ_DUR_1D,AVG_READ_DUR_3D,AVG_READ_DUR_7D,AVG_READ_DUR_14D,AVG_BOOK_CURRENCY_ANT_1D,AVG_BOOK_CURRENCY_ANT_3D,AVG_BOOK_CURRENCY_ANT_7D,AVG_BOOK_CURRENCY_ANT_14D,IS_CHARGE,SIGNIN_DAYS_7D,ACTIVE_DAYS_14D,CHARGE_ANT_7D,PROMOTION_TYPE,CREATE_TIME,UPDATE_TIME from dm_crowd_push_wide_acc_d";
Dataset<Row> result = spark.sql(finalSql);
result.show();
spark.stop();

Spark output:

+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
|        DT|SOURCE_UID|PRODUCT|PROJECT|CATEGORY_TYPE|UNACTIVE_DAYS|GROUP_ID|REGION_LABEL|USE_TIME_LABEL|INTERACTIVE_USER_LABEL|COIN_LABEL|PUSH_TUNNEL|PUSH_ID|BRAND|DEVICE_MODEL|VERSION_CODE|OS_VERSION|PUSH_CLICK_RATE|ACCOUNT_ID|MEMBER|ACC_NO_PAY_DAYS|MAX_PAY_ANT|LAST_PAY_ANT|LATEST_PACKAGE_CHANNEL|FIRST_FOLLOW_DAYS|READ_TYPE|READ_DAY_BOOK_NAME_STR|FIRST_ACTIVATION_DAYS|IS_PUSH_INVALID|AVG_READ_DUR_1D|AVG_READ_DUR_3D|AVG_READ_DUR_7D|AVG_READ_DUR_14D|AVG_BOOK_CURRENCY_ANT_1D|AVG_BOOK_CURRENCY_ANT_3D|AVG_BOOK_CURRENCY_ANT_7D|AVG_BOOK_CURRENCY_ANT_14D|IS_CHARGE|SIGNIN_DAYS_7D|ACTIVE_DAYS_14D|CHARGE_ANT_7D|PROMOTION_TYPE|CREATE_TIME|UPDATE_TIME|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+
|2023-08-09|         a|      b|      c|            1|            2|       3|        null|          null|                  null|      null|       null|   null| null|        null|        null|      null|           null|      null|  null|           null|       null|        null|                  null|             null|     null|                  null|                 null|           null|           null|           null|           null|            null|                    null|                    null|                    null|                     null|     null|          null|           null|         null|          null|       null|       null|
+----------+----------+-------+-------+-------------+-------------+--------+------------+--------------+----------------------+----------+-----------+-------+-----+------------+------------+----------+---------------+----------+------+---------------+-----------+------------+----------------------+-----------------+---------+----------------------+---------------------+---------------+---------------+---------------+---------------+----------------+------------------------+------------------------+------------------------+-------------------------+---------+--------------+---------------+-------------+--------------+-----------+-----------+

JialeHe (Author) commented Aug 15, 2023

@banmoy Is this code using connector version 1.1.0?

banmoy (Collaborator) commented Aug 15, 2023

Yes.

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
    <version>1.1.0</version>
</dependency>

banmoy (Collaborator) commented Aug 15, 2023

Does it still fail when you run my code example?

JialeHe (Author) commented Aug 16, 2023

I'll try it today.
