Technology sharing

Brevis analysis de Kafka-Stream nuntium processum effusissimum et principia

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

Coniunctio casuum haec est: numerans numerum eventuum verborum in nuntiis ad probandum et illustrandum processum exsecutionis de kafka nuntius processui effusis.

Maven dependentia

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

Praeparatio

Primum scribe et tres classes crea, respective ut nuntius effectrix, nuntius consumers, processor amnis.
KafkaStreamProducer: Nuntius producentis

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

Nuntius producentis refert ad remkafka-stream-topic-inputMissus quinquieshello kafka
KafkaStreamConsumer:Nuntius dolor

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

KafkaStreamQuickStart: Genus processus streaming

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

Hoc genus processus primus incipit a topickafka-stream-topic-inputDa nuntium impetra e , et mitte in thema post dispensandokafka-stream-topic-outputet nuntius dolorKafkaStreamConsumerconsumere

Proventus of the

Insert imaginem descriptionis hic
Insert imaginem descriptionis hic

Processus processus et explicatio principii streaming

Primum tempus

Cum intrantes topic from *kafka-stream-topic-input Cum legere notitias amnis, unumquodque nuntius est par pretii.Pone clavem verbi initus estnullvel chorda specifica, prout nuntius mittitur in argumento initus.

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
  • 1

Split nuntius valorem

ususflatMapValues methodus valorem nuntii scindit, sed haec operatio clavem nuntii non mutat.Si clavem intrare nuntium estnullergo adhuc clavis nuntii estnull

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})
  • 1
  • 2
  • 3
  • 4

Coetus per nuntium pretii

In Kafka rivi, cum usuragroupBy Cum methodus rivum aggregat, actu specificans novam clavem, quae ad operationes fenestras et aggregationem operationum subsequentium adhibebitur.in hoc casugroupByModi sunt nuntii coetus ad valorem:

.groupBy((key, value) -> value)
  • 1

Hoc significat quod post operationem compaginem, clavis uniuscuiusque nuntii in flumine, ad valorem ipsius nuntii positum est.Itaque cum sequerismapVideri in modumkeymodulus, thiskeyactualiter originali valorem de verbo quod ingroupByPostea, valor nuntii clavis factus est.

Define tempus fenestra et comitem

In hac scaena, nuntii fenestrae sunt et numerantur, sed claves immutata manent.

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
  • 1
  • 2

Converte comitem exitum ad stream

Cum convertendo comitem ad rivum evenit, claves adhuc eadem sunt quae prius cum copula

.toStream()
  • 1

Processus et eventus transformare

existmapmodum, videskeyParameter est actu clavis aggregatus, qui est nuntius primigenius valoris;

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})
  • 1
  • 2
  • 3
  • 4
  • 5

mapin modumkey.key().toString()est ut filum repraesentationis clavis, dumvalue.toString()est ad valorem chordae comitem convertere.

Mitte processionaliter notitia ad output topic

.to("kafka-stream-topic-output");
  • 1