标签搜索

目录 (CONTENTS)

文章目录

Kafka集成Spark

陈铭
2023-03-03 / 0 评论 / 1 点赞 / 170 阅读 / 602 字 / 正在检测是否收录...

pom.xml 依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the article's Kafka/Spark examples (SparkKafkaProducer, SparkKafkaConsumer). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ltd.cmjava</groupId>
    <artifactId>Spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- javac source/target level: Java 8. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <!-- NOTE(review): no Scala compiler plugin (e.g. scala-maven-plugin) is configured, so a plain
         command-line Maven build presumably does not compile the .scala sources; the article
         appears to build and run them from IDEA. Confirm before relying on a CLI build. -->
    <dependencies>
        <!-- Spark Streaming integration for Kafka 0.10+ (KafkaUtils, LocationStrategies,
             ConsumerStrategies used by SparkKafkaConsumer). The kafka-clients classes used by
             SparkKafkaProducer are not declared directly; presumably they arrive transitively
             through this artifact. Verify with mvn dependency:tree. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <!-- The _2.12 suffix is the Scala binary version. Keep it, and the Spark version 3.0.0,
             identical across all Spark artifacts in this list. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>

</project>

Kafka 生产者（Scala 编写，直接使用 kafka-clients API，并未用到 Spark）

package ltd.cmjava.spark

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

/**
 * Minimal Kafka producer written in Scala.
 *
 * It uses the plain kafka-clients API and does not touch Spark. It sends five
 * key-less String records ("cm1" .. "cm5") to the topic `first` on the
 * hadoop100/hadoop101 brokers, then closes the producer.
 */
object SparkKafkaProducer {

  def main(args: Array[String]): Unit = {

    // 0 Producer configuration: broker list plus String serializers for key and value.
    val properties = new Properties()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092,hadoop101:9092")
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 1 Create the producer.
    val producer = new KafkaProducer[String, String](properties)

    try {
      // 2 Send the data. send() is asynchronous: the returned futures are discarded here,
      //   so delivery failures are not reported by this program.
      for (i <- 1 to 5) {
        producer.send(new ProducerRecord[String, String]("first", s"cm$i"))
      }
    } finally {
      // 3 Release resources even if send() throws. close() also waits for the records
      //   already handed to send() to be transmitted before returning.
      producer.close()
    }
  }

}

Spark Streaming 消费者（通过 spark-streaming-kafka-0-10 以 Direct 方式读取 Kafka）

package ltd.cmjava.spark

import org.apache.spark.SparkConf
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * Spark Streaming job that reads the Kafka topic `first` and prints each record's value.
 *
 * It uses the direct (receiver-less) kafka-0-10 integration. The job runs locally
 * (`local[*]`) with a 3-second batch interval and blocks until the streaming
 * context terminates.
 */
object SparkKafkaConsumer {

  def main(args: Array[String]): Unit = {

    // 1 Initialise the streaming context: local mode, one micro-batch every 3 seconds.
    val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    val ssc = new StreamingContext(conf, Seconds(3))

    // 2 Consume data. These are the Kafka consumer settings handed to the direct stream.
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop100:9092,hadoop101:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "test",
      // This is Kafka's own default, spelled out. When group "test" has no committed offset,
      // consumption starts at the end of the topic, so records produced before the first
      // run are not seen. Start this job before producing test data.
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest"
    )

    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaParams)
      )

    // Keep only the record payloads; keys are ignored.
    val valueDStream: DStream[String] = kafkaDStream.map(_.value())

    // Prints the first elements of every batch on the driver console.
    valueDStream.print()

    // 3 Start the job and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

}

测试

测试生产者

# Run this console consumer, then run SparkKafkaProducer and watch this shell:
# the records cm1 .. cm5 should appear. Without --from-beginning the console
# consumer only shows records produced after it starts, so start it first.
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

测试消费者

# Send messages from the shell console, run SparkKafkaConsumer, and check the IDEA console output.
# NOTE(review): SparkKafkaConsumer sets no auto.offset.reset, so with Kafka's default
# ("latest") messages sent before the consumer group's first start are presumably
# not shown. Start the consumer before typing messages.
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
# The line below is not a command: it is the message typed at the producer's ">" prompt.
1

评论区