POM dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ltd.cmjava</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
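Note that no kafka-clients dependency is declared explicitly: spark-streaming-kafka-0-10_2.12 pulls the Kafka client library in transitively, and that is what the producer and consumer code below compiles against. If your build resolves a client version that doesn't match your brokers, you can pin kafka-clients explicitly.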
Spark producer
package ltd.cmjava.spark

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object SparkKafkaProducer {
  def main(args: Array[String]): Unit = {
    // 0 Configuration
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092,hadoop101:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 1 Create a producer
    val producer = new KafkaProducer[String, String](properties)

    // 2 Send data
    for (i <- 1 to 5) {
      producer.send(new ProducerRecord[String, String]("first", "cm" + i))
    }

    // 3 Close resources
    producer.close()
  }
}
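The send() call above is fire-and-forget, so a delivery failure only surfaces indirectly. As a minimal sketch (not from the original post), step 2 can instead pass a Callback so the broker's acknowledgement, or any exception, is reported per record; everything else in SparkKafkaProducer stays the same:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Drop-in replacement for step 2: report the ack (or failure) for each record
for (i <- 1 to 5) {
  producer.send(new ProducerRecord[String, String]("first", "cm" + i), new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      if (exception != null) exception.printStackTrace() // delivery failed
      else println(s"acked: ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
    }
  })
}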
Spark consumer
package ltd.cmjava.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkKafkaConsumer {
  def main(args: Array[String]): Unit = {
    // 1 Initialize the streaming context
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 2 Consume data
    val kafkaPara = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop100:9092,hadoop101:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "test"
    )
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)
    )
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    valueDStream.print()

    // 3 Start the computation and block until termination
    ssc.start()
    ssc.awaitTermination()
  }
}
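print() above only dumps the raw values as a smoke test. As a small sketch of how the stream composes with standard DStream transformations (assuming each message is a line of space-separated words), the snippet below is a drop-in replacement for valueDStream.print() that prints a per-batch word count:

// Per-batch word count over the Kafka message values
val wordCounts: DStream[(String, Int)] = valueDStream
  .flatMap(_.split(" ")) // split each message into words
  .map((_, 1))           // pair each word with a count of 1
  .reduceByKey(_ + _)    // sum the counts within the current 3-second batch
wordCounts.print()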
Testing
Test the producer
# Run SparkKafkaProducer and watch the output in the shell console
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
Test the consumer
# Send messages from the shell console while SparkKafkaConsumer is running, and watch the output in the IDEA console
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
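If the first topic does not exist yet (and the brokers have auto topic creation disabled), create it before running either test; the partition and replication counts here are illustrative:

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --topic first --partitions 3 --replication-factor 2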