2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Ce qui suit est une combinaison de cas : compter le nombre d'occurrences de mots dans les messages pour tester et illustrer le processus d'exécution du traitement de streaming de messages Kafka.
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
Commencez par écrire et créer trois classes, respectivement en tant que producteurs de messages, consommateurs de messages et processeurs de flux.
KafkaStreamProducer
:Producteur de messages
public class KafkaStreamProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 5);
//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
producer.send(producerRecord);
}
producer.close();
}
}
Le producteur du message rapporte au sujetkafka-stream-topic-input
Envoyé cinq foishello kafka
KafkaStreamConsumer
: Consommateur de messages
public class KafkaStreamConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//手动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("consumerRecord.key() = " + consumerRecord.key());
System.out.println("consumerRecord.value() = " + consumerRecord.value());
}
// 异步提交偏移量
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 同步提交偏移量
consumer.commitSync();
}
}
}
KafkaStreamQuickStart
: Classe de traitement du streaming
public class KafkaStreamQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
StreamsBuilder streamsBuilder = new StreamsBuilder();
//流式计算
streamProcessor(streamsBuilder);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
}
/**
* 消息格式:hello world hello world
* 配置并处理流数据。
* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
*
* @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
// 从"kafka-stream-topic-input"主题中读取数据流
KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
System.out.println("stream = " + stream);
// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
String[] valAry = value.split(" ");
return Arrays.asList(valAry);
})
// 按消息的值进行分组,为后续的窗口化计数操作做准备
.groupBy((key, value) -> value)
// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
// 将计数结果转换为流,以便进行进一步的处理和转换
.toStream()
// 显示键值对的内容,并将键和值转换为字符串格式
.map((key, value) -> {
System.out.println("key = " + key);
System.out.println("value = " + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
// 将处理后的流数据发送到"kafka-stream-topic-output"主题
.to("kafka-stream-topic-output");
}
}
Cette classe de traitement commence d'abord par le sujetkafka-stream-topic-input
Obtenez les données du message de et envoyez-les au sujet après traitementkafka-stream-topic-output
, puis le consommateur du messageKafkaStreamConsumer
consommer
Lorsque vous entrez dans un sujet dekafka-stream-topic-input
Lors de la lecture d'un flux de données, chaque message est une paire clé-valeur.Supposons que la clé du message d'entrée soitnull
ou une chaîne spécifique, selon la manière dont le message est envoyé au sujet d'entrée.
KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
utiliserflatMapValues
La méthode divise la valeur du message, mais cette opération ne change pas la clé du message.Si la clé pour saisir le message estnull
, alors à ce stade la clé du message est toujoursnull
。
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
String[] valAry = value.split(" ");
return Arrays.asList(valAry);
})
Dans Kafka Streams, lors de l'utilisationgroupBy
Lorsqu'une méthode regroupe un flux, elle spécifie en fait une nouvelle clé, qui sera utilisée pour les opérations de fenêtrage et d'agrégation ultérieures.dans ce casgroupBy
Les méthodes sont utilisées pour regrouper les messages par valeur :
.groupBy((key, value) -> value)
Cela signifie qu'après l'opération de regroupement, la clé de chaque message du flux est définie sur la valeur du message.Par conséquent, lorsque vous effectuez un suivimap
Vu dans la méthodekey
paramètre, cecikey
est en fait la valeur originale du message car dansgroupBy
Après cela, la valeur du message est devenue la clé.
A ce stade, les messages sont fenêtrés et comptés, mais les clés restent inchangées.
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
Lors de la conversion du résultat du comptage en flux, les clés sont toujours les mêmes qu'auparavant lors du regroupement
.toStream()
existermap
méthode, tu voiskey
Le paramètre est en fait la clé groupée, qui est la valeur originale du message :
.map((key, value) -> {
System.out.println("key = " + key);
System.out.println("value = " + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
map
dans la méthodekey.key().toString()
est d'obtenir la représentation sous forme de chaîne de la clé, tandis quevalue.toString()
consiste à convertir la valeur de comptage en chaîne.
.to("kafka-stream-topic-output");