Skip to content

Latest commit

 

History

History
150 lines (135 loc) · 3.78 KB

cassandraSink.md

File metadata and controls

150 lines (135 loc) · 3.78 KB

1.格式:

CREATE TABLE tableName(
    colName colType,
    ...
    colNameX colType
 )WITH(
    type ='cassandra',
    address ='ip:port[,ip:port]',
    userName ='userName',
    password ='pwd',
    database ='databaseName',
    tableName ='tableName',
    parallelism ='parllNum'
 );

2.支持版本

cassandra-3.x

3.表结构定义

参数名称 含义
tableName 在 sql 中使用的名称;即注册到flink-table-env上的名称
colName 列名称
colType 列类型 colType支持的类型

4.参数

参数名称 含义 是否必填 默认值
type 表明 输出表类型 cassandra
address 连接cassandra数据库 jdbcUrl
userName cassandra连接用户名
password cassandra连接密码
tableName cassandra表名称
database cassandra表名称
parallelism 并行度设置 1
maxRequestsPerConnection 每个连接允许的并发请求数 1
coreConnectionsPerHost 每台主机连接的核心数 8
maxConnectionsPerHost Cassandra集群里的每个机器都最多连接数 32768
maxQueueSize Cassandra队列大小 100000
readTimeoutMillis Cassandra读超时 60000
connectTimeoutMillis Cassandra连接超时 60000
poolTimeoutMillis Cassandra线程池超时 60000

5.完整样例:

CREATE TABLE MyTable(
    id bigint,
    name varchar,
    address varchar
)WITH(
    type = 'kafka10',
    bootstrapServers = '172.16.101.224:9092',
    zookeeperQuorm = '172.16.100.188:2181/kafka',
    offsetReset = 'latest',
    topic = 'tiezhu_test_in2',
    timezone = 'Asia/Shanghai',
    topicIsPattern = 'false',
    parallelism = '1'
);

CREATE TABLE sideTable(
    id bigint,
    message varchar,
    PRIMARY KEY(id),
    PERIOD FOR SYSTEM_TIME
)WITH(
    type ='cassandra',
    address ='192.168.80.106:9042, 192.168.80.107:9042',
    database ='tiezhu',
    tableName ='stu',
    userName='cassandra',
    password='cassandra',
    cache ='LRU',
    parallelism ='1',
    partitionedJoin='false'
);

CREATE TABLE MyResult(
    id bigint,
    name varchar,
    address varchar,
    message varchar
 )WITH(
    type ='cassandra',
    address ='192.168.80.106:9042,192.168.80.107:9042',
    userName='cassandra',
    password='cassandra',
    database ='tiezhu',
    tableName ='stu_out',
    parallelism ='1'
);

insert
into
    MyResult
        select
            t1.id AS id,
            t1.name AS name,
            t1.address AS address,
            t2.message AS message
    from
    (
        select
        id,
        name,
        address
        from
            MyTable
        ) t1
        join    sideTable t2
        on t1.id = t2.id;   

6.结果表数据展示

cqlsh:tiezhu> desc stu_out

CREATE TABLE tiezhu.stu_out (
    id int PRIMARY KEY,
    address text,
    message text,
    name text
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';


cqlsh:tiezhu> select * from stu_out limit 1;

 id | address    | message          | name
----+------------+------------------+----------
 23 | hangzhou23 | flinkStreamSql23 | tiezhu23

(1 rows)