Technologieaustausch

Eine kurze Analyse des Nachrichtenstreaming-Prozesses und der Prinzipien von Kafka-Stream

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Das Folgende ist eine Kombination von Fällen: Zählen der Anzahl des Vorkommens von Wörtern in Nachrichten, um den Ausführungsprozess der Kafka-Nachrichten-Streaming-Verarbeitung zu testen und zu veranschaulichen

Maven-Abhängigkeit

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Vorbereitung

Schreiben und erstellen Sie zunächst drei Klassen als Nachrichtenproduzenten, Nachrichtenkonsumenten und Stream-Prozessoren.
KafkaStreamProducer:Nachrichtenproduzent

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Der Nachrichtenproduzent berichtet an das Themakafka-stream-topic-inputFünfmal verschickthello kafka
KafkaStreamConsumer:Nachrichtenkonsument

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

KafkaStreamQuickStart: Streaming-Verarbeitungsklasse

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Diese Verarbeitungsklasse beginnt zunächst mit dem Themakafka-stream-topic-inputErhalten Sie Nachrichtendaten von und senden Sie sie nach der Verarbeitung an das Themakafka-stream-topic-outputund dann der NachrichtenkonsumentKafkaStreamConsumerverbrauchen

Ergebnisse der

Fügen Sie hier eine Bildbeschreibung ein
Fügen Sie hier eine Bildbeschreibung ein

Streaming-Verarbeitungsprozess und Prinzipbeschreibung

Die Anfangsphase

Beim Eingeben eines Themas vonkafka-stream-topic-input Beim Lesen eines Datenstroms ist jede Nachricht ein Schlüssel-Wert-Paar.Angenommen, der Schlüssel der Eingabenachricht lautetnulloder eine bestimmte Zeichenfolge, je nachdem, wie die Nachricht an das Eingabethema gesendet wird.

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
  • 1

Geteilter Nachrichtenwert

verwendenflatMapValues Die Methode teilt den Wert der Nachricht auf, aber dieser Vorgang ändert nicht den Schlüssel der Nachricht.Wenn der Schlüssel zum Eingeben der Nachricht lautetnull, dann ist in diesem Stadium der Schlüssel der Nachricht nochnull

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})
  • 1
  • 2
  • 3
  • 4

Nach Nachrichtenwert gruppieren

In Kafka Streams, bei VerwendunggroupBy Wenn eine Methode einen Stream gruppiert, gibt sie tatsächlich einen neuen Schlüssel an, der für nachfolgende Fensteroperationen und Aggregationsoperationen verwendet wird.in diesem FallgroupByMethoden werden verwendet, um Nachrichten nach Wert zu gruppieren:

.groupBy((key, value) -> value)
  • 1

Dies bedeutet, dass nach dem Gruppierungsvorgang der Schlüssel jeder Nachricht im Stream auf den Wert der Nachricht gesetzt wird.Deshalb, wenn Sie nachfassenmapIn der Methode gesehenkeyParameter, thiskeyist tatsächlich der ursprüngliche Wert der Nachricht, weil ingroupByDanach ist der Wert der Nachricht zum Schlüssel geworden.

Zeitfenster definieren und zählen

Zu diesem Zeitpunkt werden die Nachrichten im Fenster angezeigt und gezählt, die Schlüssel bleiben jedoch unverändert.

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
  • 1
  • 2

Konvertieren Sie das Zählergebnis in einen Stream

Bei der Konvertierung des Zählergebnisses in einen Stream sind die Schlüssel weiterhin dieselben wie zuvor beim Gruppieren

.toStream()
  • 1

Ergebnisse verarbeiten und transformieren

existierenmapMethode, sehen SiekeyDer Parameter ist eigentlich der gruppierte Schlüssel, der den ursprünglichen Wert der Nachricht darstellt:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})
  • 1
  • 2
  • 3
  • 4
  • 5

mapin der Methodekey.key().toString()besteht darin, die Zeichenfolgendarstellung des Schlüssels abzurufen, whilevalue.toString()besteht darin, den Zählwert in eine Zeichenfolge umzuwandeln.

Verarbeitete Daten an Ausgabethema senden

.to("kafka-stream-topic-output");
  • 1