Flink-clickhouse

A demo of writing from Flink to ClickHouse using Flink's `JDBCAppendTableSink`; the approach is essentially the same as writing to MySQL over JDBC.
1. Add the dependencies to the Maven pom

```xml
<properties>
    <flink.version>1.10.0</flink.version>
    <scala.version>2.11.12</scala.version>
    <!-- binary Scala version used in artifact suffixes -->
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
2. Code
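The snippet below references connection constants (`CH_USER`, `CH_PWD`, `CH_DRIVER_URL`) and a `taskInfo` config object that the post does not define. A minimal sketch of the constants, assuming a single-node ClickHouse reachable over its HTTP port (8123), which is the port the `ru.yandex` JDBC driver speaks:

```scala
// Hypothetical connection constants; adjust host, database, and credentials
// to match your cluster. These values are not from the original post.
val CH_DRIVER_URL = "jdbc:clickhouse://127.0.0.1:8123/default"
val CH_USER = "default"
val CH_PWD = ""
```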

```scala
// Example of the configuration that taskInfo carries:
val fields = Array("uid", "type", "stime", "r_query", "tokens",
  "e", "source", "ip", "position", "platform",
  "query", "doc_qipu_pair", "rpage", "bucket",
  "page", "clk_type", "dt", "dh")
val tableName = "search_etl"
val insertSql =
  "insert into search_etl(uid, type, stime, r_query, tokens, e, source, ip, position, " +
    "platform, query, doc_qipu_pair, rpage, bucket, page, clk_type, dt, dh) " +
    "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"

val CH_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"
var inputFields: Array[String] = _
var fieldsSchema: List[TypeInformation[_]] = _
var sql: String = _
var sinkJdbc: JDBCAppendTableSink = _

sql = taskInfo.getSql.getSql
inputFields = taskInfo.getProjecting.getFields.asScala.toArray
// every field is written to ClickHouse as a string
fieldsSchema = List.fill(inputFields.length)(BasicTypeInfo.STRING_TYPE_INFO)
sinkJdbc = JDBCAppendTableSink.builder()
  .setDrivername(CH_DRIVER)
  .setUsername(CH_USER)
  .setPassword(CH_PWD)
  .setDBUrl(CH_DRIVER_URL)
  .setQuery(sql)
  .setParameterTypes(fieldsSchema.toArray: _*)
  .setBatchSize(1000)
  .build()

// addSource: the author's source (e.g. Kafka), not shown in the post
val sourceStream: DataStream[String] = addSource

val stream: DataStream[Row] = {
  implicit val tpe: TypeInformation[Row] = new RowTypeInfo(fieldsSchema.toArray, inputFields)
  sourceStream
    // the last two fields (dt, dh) are derived inside the mapper,
    // so they are not passed in as JSON field names
    .map[Row](new EtlLoaderMap(inputFields.slice(0, inputFields.length - 2)))
    .uid("parse_json_2_hive")
}

sinkJdbc.emitDataStream(stream.javaStream)
```
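Before the job can write, the `search_etl` table has to exist in ClickHouse. The post does not show its DDL; since the mapper writes every column via `toString`, a plausible all-`String` schema (an assumption, not the author's actual table) can be created once with plain JDBC:

```scala
import java.sql.DriverManager

// Hypothetical DDL for the target table; column types and the partitioning
// scheme are guesses based on how the job writes its data.
val ddl =
  """CREATE TABLE IF NOT EXISTS search_etl (
    |  uid String, type String, stime String, r_query String, tokens String,
    |  e String, source String, ip String, position String, platform String,
    |  query String, doc_qipu_pair String, rpage String, bucket String,
    |  page String, clk_type String, dt String, dh String
    |) ENGINE = MergeTree() PARTITION BY dt ORDER BY (dt, dh)
    |""".stripMargin

Class.forName(CH_DRIVER)
val conn = DriverManager.getConnection(CH_DRIVER_URL, CH_USER, CH_PWD)
try conn.createStatement().execute(ddl) finally conn.close()
```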
3. Other code

```scala
import com.couchbase.client.java.document.json.JsonObject
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.types.Row
import org.joda.time.DateTime
import scala.collection.JavaConverters._

class EtlLoaderMap(inputFields: Array[String]) extends MapFunction[String, Row] {

  override def map(value: String): Row = {
    val values = JsonObject.fromJson(value).toMap
    // events older than two hours are routed to a sentinel partition
    val minutesAgo = BehaviorHistoryMergeUtils.minutesLater(0L, -120)
    val pingbackTime = values.get("stime").toString.toLong

    // two extra slots for the derived partition fields dt and dh
    val row = new Row(inputFields.length + 2)
    values.asScala.foreach { f =>
      val idx = inputFields.indexOf(f._1)
      if (idx >= 0) row.setField(idx, f._2.toString)
      else println("wrong field: " + f._1)
    }

    if (pingbackTime >= minutesAgo) {
      row.setField(inputFields.length, new DateTime(pingbackTime).toString("yyyy-MM-dd"))
      row.setField(inputFields.length + 1, new DateTime(pingbackTime).toString("HH"))
    } else {
      // late data gets a fixed sentinel date and an out-of-range hour
      row.setField(inputFields.length, "2012-12-21")
      row.setField(inputFields.length + 1, "24")
    }
    row
  }
}
```
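`BehaviorHistoryMergeUtils.minutesLater` is the author's own utility and is not shown. From the call site, `minutesLater(0L, -120)` evidently returns the epoch-millisecond timestamp of 120 minutes ago; a stand-in with that behavior (purely an assumption) would be:

```scala
// Hypothetical stand-in for the author's helper: a base of 0L means "now",
// and the offset is added in minutes, so minutesLater(0L, -120) is two hours ago.
object BehaviorHistoryMergeUtils {
  def minutesLater(base: Long, minutes: Int): Long = {
    val start = if (base == 0L) System.currentTimeMillis() else base
    start + minutes * 60L * 1000L
  }
}
```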

### Other articles
https://mp.weixin.qq.com/s/yEnsXPtERMh7tBwvxx8GVw (fairly detailed; recommended)
https://mp.weixin.qq.com/s/afSQPcTMePNNLoDSgMBRWg
### References
https://wchch.github.io/2019/11/27/%E4%BD%BF%E7%94%A8Flink-SQL%E8%AF%BB%E5%8F%96kafka%E6%95%B0%E6%8D%AE%E5%B9%B6%E9%80%9A%E8%BF%87JDBC%E6%96%B9%E5%BC%8F%E5%86%99%E5%85%A5Clickhouse%E5%AE%9E%E6%97%B6%E5%9C%BA%E6%99%AF%E7%9A%84%E7%AE%80%E5%8D%95%E5%AE%9E%E4%BE%8B/ (Flink SQL)

https://blog.csdn.net/wflh323/article/details/95794719 (writing to MySQL)
https://www.cnblogs.com/qiu-hua/p/13871460.html (examples of writing to ClickHouse on Flink 1.10 & 1.11)

Author: Lily

Original link: /2020/05/11/Flink-clickhouse/

Copyright: please retain the original link and author when reposting.