### Demo
Use Flink's JDBCAppendTableSink; the approach is much the same as writing to MySQL.
1. Add the JARs via Maven
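The dependency snippet did not survive intact, so here is a minimal sketch: the flink-jdbc connector (which provides `JDBCAppendTableSink`) plus the ClickHouse JDBC driver. The versions shown are illustrative assumptions.

```xml
<properties>
    <!-- Version numbers here are assumptions, adjust to your cluster -->
    <flink.version>1.10.0</flink.version>
</properties>

<dependencies>
    <!-- Flink JDBC connector: provides JDBCAppendTableSink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- ClickHouse JDBC driver: provides ru.yandex.clickhouse.ClickHouseDriver -->
    <dependency>
        <groupId>ru.yandex.clickhouse</groupId>
        <artifactId>clickhouse-jdbc</artifactId>
        <version>0.2.4</version>
    </dependency>
</dependencies>
```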
2. Code

The task configuration lists the projected fields and the parameterized insert statement (18 columns, one `?` per column):

```
fields = ['uid', 'type', 'stime', 'r_query', 'tokens',
          'e', 'source', 'ip', 'position', 'platform',
          'query', 'doc_qipu_pair', 'rpage', 'bucket',
          'page', 'clk_type', 'dt', 'dh']

tableName = "search_etl"

sql = "insert into search_etl(uid, type, stime, r_query, tokens, e, source, ip, position, " +
      "platform, query, doc_qipu_pair, rpage, bucket, page, clk_type, dt, dh) " +
      "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
```
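For reference, the target table could be declared roughly as follows. This is a hypothetical sketch: all columns are String (matching the sink schema below, which is all `STRING_TYPE_INFO`), while the engine, partition key, and sort key are assumptions.

```sql
-- Hypothetical DDL for the target table; engine and keys are assumptions.
CREATE TABLE search_etl (
    uid String, type String, stime String, r_query String, tokens String,
    e String, source String, ip String, position String, platform String,
    query String, doc_qipu_pair String, rpage String, bucket String,
    page String, clk_type String, dt String, dh String
) ENGINE = MergeTree()
PARTITION BY dt
ORDER BY stime;
```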
Build the sink from the task configuration:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.types.Row
import scala.collection.JavaConverters._

val CH_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver"
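// NOTE: the remaining connection constants are not shown in the original
// post; the values below are hypothetical placeholders.
val CH_DRIVER_URL = "jdbc:clickhouse://127.0.0.1:8123/default"
val CH_USER = "default"
val CH_PWD = ""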
var inputFields: Array[String] = _
var fieldsSchema: List[TypeInformation[_]] = _
var sql: String = _
var sinkJdbc: JDBCAppendTableSink = _
sql = taskInfo.getSql.getSql
inputFields = taskInfo.getProjecting.getFields.asScala.toArray
fieldsSchema = List.fill(inputFields.length)(BasicTypeInfo.STRING_TYPE_INFO)
sinkJdbc = JDBCAppendTableSink.builder()
  .setDrivername(CH_DRIVER)
  .setUsername(CH_USER)
  .setPassword(CH_PWD)
  .setDBUrl(CH_DRIVER_URL)
  .setQuery(sql)
  .setParameterTypes(fieldsSchema.toArray: _*)
  .setBatchSize(1000)
  .build()
// addSource builds the input stream (definition not shown in the post).
val sourceStream: DataStream[String] = addSource

val stream: DataStream[Row] = {
  // The implicit RowTypeInfo must match the Row layout produced by EtlLoaderMap.
  implicit val tpe: TypeInformation[Row] = new RowTypeInfo(fieldsSchema.toArray, inputFields)
  sourceStream
    // dt and dh (the last two fields) are derived inside the map function,
    // so they are excluded from the fields handed to EtlLoaderMap.
    .map[Row](new EtlLoaderMap(inputFields.slice(0, inputFields.length - 2)))
    .uid("parse_json_2_hive")
}

sinkJdbc.emitDataStream(stream.javaStream)
```
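One note on `setBatchSize(1000)`: the underlying JDBC output format buffers rows and executes the batch once 1000 rows have accumulated; the buffer should also be flushed at checkpoints and on close, so a low-traffic stream may see its writes arrive with some delay.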
3. Other

The MapFunction that parses each JSON pingback into a Row:
```scala
import com.couchbase.client.java.document.json.JsonObject
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.types.Row
import org.joda.time.DateTime
import scala.collection.JavaConverters._

class EtlLoaderMap(inputFields: Array[String]) extends MapFunction[String, Row] {

  override def map(value: String): Row = {
    val values = JsonObject.fromJson(value).toMap
    // Cut-off timestamp 120 minutes before now (project-internal helper).
    val minutesAgo = BehaviorHistoryMergeUtils.minutesLater(0L, -120)
    val pingbackTime = values.get("stime").toString.toLong
    // Two extra slots at the end hold the derived partition fields dt and dh.
    val row = new Row(inputFields.length + 2)
    values.asScala.foreach { f =>
      val idx = inputFields.indexOf(f._1)
      if (idx >= 0) row.setField(idx, f._2.toString)
      else println("wrong field: " + f._1)
    }
    if (pingbackTime >= minutesAgo) {
      // Timely events: derive dt (date) and dh (hour) from the event time.
      row.setField(inputFields.length, new DateTime(pingbackTime).toString("yyyy-MM-dd"))
      row.setField(inputFields.length + 1, new DateTime(pingbackTime).toString("HH"))
    } else {
      // Events older than two hours go to a fixed catch-all partition.
      row.setField(inputFields.length, "2012-12-21")
      row.setField(inputFields.length + 1, "24")
    }
    row
  }
}
```
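A quick way to sanity-check the derived dt/dh fields. This is a hypothetical snippet, not from the original post; it assumes `BehaviorHistoryMergeUtils.minutesLater(0L, -120)` returns the epoch millis of two hours ago.

```scala
val mapper = new EtlLoaderMap(Array("uid", "type", "stime"))
val now = System.currentTimeMillis()

// A recent event: dt/dh are derived from stime.
val fresh = mapper.map(s"""{"uid":"u1","type":"click","stime":"$now"}""")
// An event older than two hours: routed to the catch-all partition.
val late = mapper.map("""{"uid":"u2","type":"click","stime":"1355011200000"}""")

println(fresh) // u1,click,<now>,<today>,<current hour>
println(late)  // u2,click,1355011200000,2012-12-21,24
```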
### Other articles
https://mp.weixin.qq.com/s/yEnsXPtERMh7tBwvxx8GVw (fairly detailed, quite good)
https://mp.weixin.qq.com/s/afSQPcTMePNNLoDSgMBRWg
### References
https://wchch.github.io/2019/11/27/%E4%BD%BF%E7%94%A8Flink-SQL%E8%AF%BB%E5%8F%96kafka%E6%95%B0%E6%8D%AE%E5%B9%B6%E9%80%9A%E8%BF%87JDBC%E6%96%B9%E5%BC%8F%E5%86%99%E5%85%A5Clickhouse%E5%AE%9E%E6%97%B6%E5%9C%BA%E6%99%AF%E7%9A%84%E7%AE%80%E5%8D%95%E5%AE%9E%E4%BE%8B/ (Flink SQL)
https://blog.csdn.net/wflh323/article/details/95794719 (writing to MySQL)
https://www.cnblogs.com/qiu-hua/p/13871460.html (examples of writing to ClickHouse with Flink 1.10 & 1.11)