Notes on integrating Kafka with Structured Streaming

1. Integration configuration

I build the project with Maven; adjust this to your own setup. Below is the relevant excerpt of the pom file configuration.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.10.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>


Packaging configuration; later I will explain why it needs to be set up this way.

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

2. Demo code
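The snippet below assumes a SparkSession named spark already exists. A minimal sketch of creating it (the app name is illustrative, not from the original project):

import org.apache.spark.sql.SparkSession

// Build the SparkSession that the streaming query below runs on.
val spark = SparkSession.builder()
  .appName("Kafka2KuduDemo") // illustrative name
  .getOrCreate()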


val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", ConfigConstant.zkServers) // Kafka broker addresses
  .option("subscribe", ProjectConstant.KAFKA_TOPIC)            // Kafka topic
  .load()

val query = df.writeStream
  // Kudu master address and Kudu table name; data is written to Kudu through
  // Structured Streaming's ForeachWriter (see the Spark documentation for details)
  .foreach(new KuduForeachWriter(ConfigConstant.kuduMaster, ProjectConstant.KUDU_MY_FIRST_TABLE))
  // .format("console")
  .outputMode("update")
  .start()

query.awaitTermination()
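Note that the DataFrame returned by the Kafka source always carries fixed columns such as key, value, topic, partition, offset and timestamp, with key and value as binary. Before handing rows to the ForeachWriter you would normally project them into the shape the writer expects. A minimal sketch, assuming the message value is a comma-separated "id,count" string (the column names and parsing logic are illustrative, not from the original code):

import org.apache.spark.sql.functions._

// Cast the binary value column to a string and split it into the two typed
// columns that KuduForeachWriter reads via getString(0) / getInt(1).
val parsed = df
  .selectExpr("CAST(value AS STRING) AS value")
  .select(
    split(col("value"), ",").getItem(0).as("id"),
    split(col("value"), ",").getItem(1).cast("int").as("count")
  )

You would then call parsed.writeStream instead of df.writeStream.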
class KuduForeachWriter(val kuduMaster: String, val kuduTableName: String)
    extends ForeachWriter[Row] {

  var client: KuduClient = _
  var kuduSession: KuduSession = _
  var kuduTable: KuduTable = _

  // Called at the start of each partition: set up the Kudu client, table handle and session.
  def open(partitionId: Long, version: Long): Boolean = {
    client = new KuduClient.KuduClientBuilder(kuduMaster).build()
    kuduTable = client.openTable(kuduTableName)
    kuduSession = client.newSession()
    true
  }

  // Called for every row: upsert the row into the Kudu table.
  def process(record: Row): Unit = {
    val upsert = kuduTable.newUpsert()
    val row: PartialRow = upsert.getRow()
    row.addString(0, record.getString(0))
    if (record.isNullAt(1)) {
      row.setNull(1)
    } else {
      row.addInt(1, record.getInt(1))
    }
    kuduSession.apply(upsert)
  }

  // Called when the partition finishes (or fails): release resources.
  def close(errorOrNull: Throwable): Unit = {
    kuduSession.close()
    client.shutdown()
  }
}
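By default a KuduSession flushes synchronously, so every apply() above is a round trip to Kudu. If throughput becomes an issue, one optional variation (not from the original post) is to switch the session to manual flush and push the buffered operations when the partition closes:

import org.apache.kudu.client.SessionConfiguration

// In open(): buffer operations instead of flushing on every apply().
kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)

// In close(): push whatever is still buffered before releasing resources.
kuduSession.flush()
kuduSession.close()
client.shutdown()

For long-running partitions you would also flush periodically inside process() so the mutation buffer does not overflow.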

3. Submit script

Below is the submit script; adjust it to your own environment.

#!/bin/sh
env=$1
echo "当前环境>>>>>>$env"

export SPARK_KAFKA_VERSION=0.10

spark2-submit \
--class weshare.data.center.taskinit.Kafka2KuduDemoInit \
--master yarn \
--deploy-mode client \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 512M \
--executor-cores 1 \
--conf "spark.driver.extraJavaOptions=-Dscala.env=$env" \
--conf "spark.executor.extraJavaOptions=-Dscala.env=$env" \
data-application-0.0.1.jar

4. Problems encountered and solutions

The first problem is this error:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
… 17 more
To fix this we need to adjust the packaging configuration in the pom file, specifically this part:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>

Several jars each ship a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister service file, and the shade plugin must append (merge) them rather than let one overwrite the others. With the merged file in place, the kafka data source can be discovered and the ClassNotFoundException goes away.
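As a quick sanity check (not part of the original post), you can list every DataSourceRegister service file visible on the classpath, for example from a spark-shell started with the shaded jar, and confirm that the Kafka provider is registered:

import scala.collection.JavaConverters._
import scala.io.Source

// Print the providers registered by each DataSourceRegister service file on the classpath.
val urls = getClass.getClassLoader
  .getResources("META-INF/services/org.apache.spark.sql.sources.DataSourceRegister")
  .asScala
urls.foreach { url =>
  println(s"== $url")
  Source.fromURL(url).getLines().foreach(println)
}
// The entry from the shaded jar should include
// org.apache.spark.sql.kafka010.KafkaSourceProvider.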

The second problem is this error:

19/01/15 19:36:40 WARN consumer.ConsumerConfig: The configuration max.poll.records = 1 was supplied but isn't a known config.
19/01/15 19:36:40 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
19/01/15 19:36:40 INFO utils.AppInfoParser: Kafka commitId : unknown
19/01/15 19:36:40 ERROR streaming.StreamExecution: Query queryMyBatchTopicData [id = 25b0620e-20b5-4efe-babb-dda94ef3ccc6, runId = 013d0674-23f7-4ebe-a5fb-84c8699ea1b9] terminated with error
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
This problem is caused by a Kafka client version mismatch: the log above shows the 0.9.0 client being loaded, while spark-sql-kafka-0-10 needs the 0.10 client API. Cloudera documents the official fix here: https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs.
There are two main ways to fix it.
Option 1: set the SPARK_KAFKA_VERSION environment variable when submitting the job (the script in section 3 already does this):

# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit arguments

# Or:

# Set the environment variable for the duration of a single command:
SPARK_KAFKA_VERSION=0.10 spark-submit arguments

Option 2: change the Spark 2 configuration in CDH so that SPARK_KAFKA_VERSION=0.10 is set for all jobs, for example through the spark-env.sh safety valve of the Spark 2 service in Cloudera Manager; see the Cloudera documentation linked above for the exact steps.
