Compartir tecnología

Un breve análisis del proceso y los principios de transmisión de mensajes de Kafka-Stream

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

La siguiente es una combinación de casos: contar el número de apariciones de palabras en los mensajes para probar e ilustrar el proceso de ejecución del procesamiento de transmisión de mensajes de Kafka.

dependencia de maven

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Preparación

Primero escriba y cree tres clases, respectivamente como productores de mensajes, consumidores de mensajes y procesadores de flujo.
KafkaStreamProducer: Productor de mensajes

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

El productor del mensaje informa al tema.kafka-stream-topic-inputEnviado cinco veceshello kafka
KafkaStreamConsumer: consumidor de mensajes

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

KafkaStreamQuickStart: Clase de procesamiento de transmisión

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Esta clase de procesamiento comienza primero con el tema.kafka-stream-topic-inputObtener datos del mensaje y enviarlos al tema después del procesamiento.kafka-stream-topic-output, y luego el consumidor del mensaje.KafkaStreamConsumerconsumir

Resultados de la

Insertar descripción de la imagen aquí
Insertar descripción de la imagen aquí

Proceso de procesamiento de transmisión y explicación del principio.

La fase inicial

Al ingresar a un tema dekafka-stream-topic-input Al leer un flujo de datos, cada mensaje es un par clave-valor.Supongamos que la clave del mensaje de entrada esnullo una cadena específica, dependiendo de cómo se envía el mensaje al tema de entrada.

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
  • 1

Valor del mensaje dividido

usarflatMapValues El método divide el valor del mensaje, pero esta operación no cambia la clave del mensaje.Si la clave para ingresar el mensaje esnull, entonces en esta etapa la clave del mensaje todavía estánull

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})
  • 1
  • 2
  • 3
  • 4

Agrupar por valor de mensaje

En Kafka Streams, cuando se usagroupBy Cuando un método agrupa una secuencia, en realidad está especificando una nueva clave, que se utilizará para operaciones de ventanas y operaciones de agregación posteriores.en este casogroupByLos métodos se utilizan para agrupar mensajes por valor:

.groupBy((key, value) -> value)
  • 1

Esto significa que después de la operación de agrupación, la clave de cada mensaje en la secuencia se establece en el valor del mensaje.Por lo tanto, cuando hagas un seguimientomapVisto en el método.keyparámetro, estekeyes en realidad el valor original del mensaje porque engroupByDespués de eso, el valor del mensaje se ha convertido en la clave.

Definir ventana de tiempo y contar

En esta etapa, los mensajes se muestran en ventanas y se cuentan, pero las claves permanecen sin cambios.

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
  • 1
  • 2

Convertir el resultado del recuento en secuencia

Al convertir el resultado del recuento en una secuencia, las claves siguen siendo las mismas que antes al agrupar

.toStream()
  • 1

Procesar y transformar resultados

existirmapmétodo, ya veskeyEl parámetro es en realidad la clave agrupada, que es el valor original del mensaje:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})
  • 1
  • 2
  • 3
  • 4
  • 5

mapen métodokey.key().toString()es obtener la representación de cadena de la clave, mientras quevalue.toString()es convertir el valor de recuento en una cadena.

Enviar datos procesados ​​al tema de salida

.to("kafka-stream-topic-output");
  • 1