Berbagi teknologi

Analisis singkat tentang proses dan prinsip streaming pesan Kafka-Stream

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Berikut ini adalah kombinasi kasusnya: menghitung jumlah kemunculan kata dalam pesan untuk menguji dan mengilustrasikan proses eksekusi pemrosesan streaming pesan kafka

Ketergantungan Maven

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Persiapan

Pertama tulis dan buat tiga kelas, masing-masing sebagai penghasil pesan, konsumen pesan, dan pemroses aliran.
KafkaStreamProducer: Produser pesan

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Produser pesan melapor ke topik tersebutkafka-stream-topic-inputDikirim lima kalihello kafka
KafkaStreamConsumer: Pesan konsumen

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

KafkaStreamQuickStart: Kelas pemrosesan streaming

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Kelas pemrosesan ini pertama-tama dimulai dari topikkafka-stream-topic-inputDapatkan data pesan dari , dan kirimkan ke topik setelah diproseskafka-stream-topic-output, dan kemudian konsumen pesanKafkaStreamConsumermengkonsumsi

Hasil dari

Masukkan deskripsi gambar di sini
Masukkan deskripsi gambar di sini

Proses pemrosesan streaming dan penjelasan prinsipnya

Fase awal

Saat memasukkan topik darikafka-stream-topic-input Saat membaca aliran data, setiap pesan adalah pasangan nilai kunci.Misalkan kunci dari pesan masukan adalahnullatau string tertentu, bergantung pada cara pesan dikirim ke topik masukan.

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
  • 1

Pisahkan nilai pesan

menggunakanflatMapValues metode membagi nilai pesan, tetapi operasi ini tidak mengubah kunci pesan.Jika kunci untuk memasukkan pesan adalahnull, maka pada tahap ini kunci pesannya masih adanull

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})
  • 1
  • 2
  • 3
  • 4

Kelompokkan berdasarkan nilai pesan

Di Kafka Streams, saat menggunakangroupBy Saat suatu metode mengelompokkan suatu aliran, metode tersebut sebenarnya menentukan kunci baru, yang akan digunakan untuk operasi windowing dan operasi agregasi selanjutnya.pada kasus inigroupByMetode yang digunakan untuk mengelompokkan pesan berdasarkan nilai:

.groupBy((key, value) -> value)
  • 1

Ini berarti bahwa setelah operasi pengelompokan, kunci setiap pesan dalam aliran diatur ke nilai pesan tersebut.Oleh karena itu, saat Anda menindaklanjutinyamapTerlihat pada metodenyakeyparameter, inikeysebenarnya adalah nilai asli pesan tersebut karena digroupBySetelah itu, nilai pesan menjadi kuncinya.

Tentukan jendela waktu dan hitung

Pada tahap ini, pesan dijendelakan dan dihitung, namun kuncinya tetap tidak berubah.

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
  • 1
  • 2

Ubah hasil hitungan menjadi streaming

Saat mengubah hasil hitungan menjadi stream, kuncinya masih sama seperti sebelumnya saat mengelompokkan

.toStream()
  • 1

Memproses dan mengubah hasil

adamapmetodenya, Anda tahukeyParameter sebenarnya adalah kunci yang dikelompokkan, yang merupakan nilai asli pesan:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})
  • 1
  • 2
  • 3
  • 4
  • 5

mapdalam metodekey.key().toString()adalah untuk mendapatkan representasi string dari kunci, sementaravalue.toString()adalah mengonversi nilai hitungan menjadi string.

Kirim data yang diproses ke topik keluaran

.to("kafka-stream-topic-output");
  • 1