spark streaming join 维度表解决方案

多数据源Join维度表思路

很多时候流上报过来的数据只有id,这时需要我们join维度表,这里有三种思路:
1.在数据源端进行join,如在kafka写入时进行join,不过这需要数据源端配合。
2.在计算引擎上join,如spark streaming/flink。
3.在结果端,如结果写入es/mysql,可在此时根据key join。
这三种方式大家按实际情况进行取舍,各自有自己的适用场景。我这里主要介绍在
计算引擎端进行join。

计算引擎端join

计算引擎端join我想到了有三种方式,下面我一一的给大家介绍。

流与完全静态数据Join

流与完全静态数据Join有两种方式,一种是RDD join,另一种是broadcast join。
不过这种方式,我觉得应该是很少用的,很少有完全不变的数据。而且这种方式完
全可以用后面提到的方式进行替代。
RDD join

1
2
3
4
val dataset: RDD[String, String] = ...
val kafkaDStream: DStream[String, String] = ...
val joinedStream = kafkaDStream.transform { rdd => rdd.join(dataset) }
//val joinedStream = kafkaDStream.foreachRDD(_.join(dataset)

broadcast join

1
2
3
4
5
6
7
8
val dataset: RDD[String, String] = ...
val broadcastData=ssc.sparkContext.broadcast(dataset.collect())
val kafkaDStream: DStream[String, String] = ...
val joinedStream = kafkaDStream.mapPartitions(part=>{
val dataset = broadcastData.value
part.map(row=>{
...
})

流与半静态数据Join

半静态数据指的是放在redis等的数据,会被外部更新。思路是,根据redis分区策略,
key进行repartition,然后每个partition去对应redis机器上取得数据,然后组合。
这里的话我想的是如果redis的分区策略和kafka的分区策略一致,就不用repar了。
这里外部的缓存还可以根据key的热度进行缓存,不用缓存全部数据,数据量大的时候
也不用担心。

1
2
3
4
5
6
7
val kafkaDStream: DStream[String, String] = ...
val result=kafkaDStream.mapPartitions(part=>{
val redisCli=connToRedis("localhost",6111,1000,10)
part.map(row=>{
...
})
})

流与流join

流与流join,就是两个实时流进行join。我们之前说过流与静态数据join有更好的替
代方案,这里我们将维度数据以流的方式查出来,并且batch时间设置比较长的时间,
那么这个流就可以作为维度流与事实流进行join,并且可以按照我们设置的batch时
间来更新数据。如果我们维度表比较小可以考虑这种方式,维度表比较大的话更建议
第二种方式。

1
2
3
val kafkaDStream1: DStream[String, String] = ...
val kafkaDStream2: DStream[String, String] = ...
val joinedStream = kafkaDStream1.join(kafkaDStream2)

undefined