Skip to content

Latest commit

 

History

History
154 lines (111 loc) · 3.5 KB

catalog.md

File metadata and controls

154 lines (111 loc) · 3.5 KB

cataLog操作说明

依懒jar参考官方文档

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/

不同hive所需jar不一样

官方catalog连接配置说明:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/#%E8%BF%9E%E6%8E%A5%E5%88%B0hive

可以把jar放到 lib 下 也可以放到http服务器上 然后在使用的时候添加http服务。

demo1

 CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

 CREATE TABLE  source_table_01 (
 f0 INT,
 f1 INT,
 f2 STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5'
);
 
 
CREATE TABLE   print_table_01 (
 f0 INT,
 f1 INT,
 f2 STRING
) WITH (
 'connector' = 'print'
);

insert into print_table_01 select f0,f1,f2 from source_table_01;
 
SHOW TABLES;

SHOW FUNCTIONS;

SHOW CATALOGS;

SHOW DATABASES;

demo2

如果已经使用过了可以直接

 CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

insert into print_table_01 select f0,f1,f2 from source_table_01;

demo3

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/hive/hive_read_write.html

流数据结果sink到hive

注意写到hive必须要开启checkpoint

 CREATE CATALOG testmyhive WITH (
    'type' = 'hive',
    'default-database' = 'zhp',
    'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

drop table IF EXISTS item_test;

drop table IF EXISTS hive_flink_table;

create table item_test ( 
     itemId BIGINT,
     price BIGINT,
     proctime AS PROCTIME ()
 )with ( 
     'connector' = 'kafka',
     'topic' = 'flink-catalog-v1',  
     'properties.bootstrap.servers'='127.0.0.1:9092',
     'properties.group.id'='test-1',
     'format'='json',
     'scan.startup.mode' = 'earliest-offset'
 );


SET table.sql-dialect=hive;

CREATE TABLE hive_flink_table (
 itemId BIGINT, 
 price BIGINT, 
 ups string
) TBLPROPERTIES (
  'sink.rolling-policy.rollover-interval'='1min',
  'sink.partition-commit.trigger'='process-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET table.sql-dialect=default;

insert into hive_flink_table select itemId,price, 'XXXXaaa' as ups from item_test;

kafka 生产者 数据demo 用于测试用

public class KafkaSend {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        Map<String, Long> map = new HashMap<>();

        for (long i = 0; i <10000 ; i++) {
            map.put("itemId", i);
            map.put("price", i+1);
            producer.send(new ProducerRecord<String, String>("flink-catalog-v1", null, JSON.toJSONString(map)));
            producer.flush();
            Thread.sleep(1000L);
        }

        producer.close();
    }
}