1.集成配置
我这边使用maven构建项目,大家根据实际修改。Pom文件配置,截取了需要的部分1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.3.0</version>
</dependency>
打包配置,后面我会解释为什么打包这么配置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
2.demo代码
1 |
|
1 | class KuduForeachWriter(val kuduMaster: String, val kuduTableName: String) |
3.执行脚本
下面是提交的脚本,大家按自己的环境修改即可1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 !/bin/sh
env=$1
echo "当前环境>>>>>>$env"
export SPARK_KAFKA_VERSION=0.10
spark2-submit \
--class weshare.data.center.taskinit.Kafka2KuduDemoInit \
--master yarn \
--deploy-mode client \
--driver-memory 1G \
--num-executors 2 \
--executor-memory 512M \
--executor-cores 1 \
--conf "spark.driver.extraJavaOptions=-Dscala.env=$env" \
--conf "spark.executor.extraJavaOptions=-Dscala.env=$env" \
data-application-0.0.1.jar
4.出现问题以及解决方案
第一个问题,出现这个错误
Exception in thread “main” java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22$anonfun$apply$14.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$22.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
… 17 more
这个问题需要我们修改pom文件的打包配置,也就是这一部分,1
2
3
4<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
需要多个包里面的DataSourceRegister以append的方式打包,而不是覆盖,这样我们就不会出现找不到类了。
第二个问题
19/01/15 19:36:40 WARN consumer.ConsumerConfig: The configuration max.poll.records = 1 was supplied but isn’t a known config.
19/01/15 19:36:40 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
19/01/15 19:36:40 INFO utils.AppInfoParser: Kafka commitId : unknown
19/01/15 19:36:40 ERROR streaming.StreamExecution: Query queryMyBatchTopicData [id = 25b0620e-20b5-4efe-babb-dda94ef3ccc6, runId = 013d0674-23f7-4ebe-a5fb-84c8699ea1b9] terminated with error
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
这个问题是kafka版本问题导致的,可以参考官方的解决方案。地址如下:https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs。
主要有两种解决方式
方案一:在执行的提交脚本中添加参数
1 | Set the environment variable for the duration of your shell session: |
方案二:修改cdh,spark2的配置