τα στοιχεία επικοινωνίας μου
Ταχυδρομείο[email protected]
2024-07-12
한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina
Το παρακάτω είναι ένας συνδυασμός περιπτώσεων: μέτρηση του αριθμού των εμφανίσεων λέξεων στα μηνύματα για δοκιμή και απεικόνιση της διαδικασίας εκτέλεσης της επεξεργασίας ροής μηνυμάτων kafka
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
Πρώτα γράψτε και δημιουργήστε τρεις κλάσεις, αντίστοιχα ως παραγωγοί μηνυμάτων, καταναλωτές μηνυμάτων και επεξεργαστές ροής.
KafkaStreamProducer
:Παραγωγός μηνύματος
public class KafkaStreamProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties properties = new Properties();
//kafka的连接地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
//发送失败,失败的重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 5);
//消息key的序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//消息value的序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
producer.send(producerRecord);
}
producer.close();
}
}
Ο παραγωγός μηνυμάτων αναφέρεται στο θέμαkafka-stream-topic-input
Στάλθηκε πέντε φορέςhello kafka
KafkaStreamConsumer
:Μήνυμα καταναλωτή
public class KafkaStreamConsumer {
public static void main(String[] args) {
Properties properties = new Properties();
//kafka的连接地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
//消息的反序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//手动提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//订阅主题
consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));
try {
while (true) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println("consumerRecord.key() = " + consumerRecord.key());
System.out.println("consumerRecord.value() = " + consumerRecord.value());
}
// 异步提交偏移量
consumer.commitAsync();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 同步提交偏移量
consumer.commitSync();
}
}
}
KafkaStreamQuickStart
: Κατηγορία επεξεργασίας ροής
public class KafkaStreamQuickStart {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
StreamsBuilder streamsBuilder = new StreamsBuilder();
//流式计算
streamProcessor(streamsBuilder);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
kafkaStreams.start();
}
/**
* 消息格式:hello world hello world
* 配置并处理流数据。
* 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
* 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
*
* @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
*/
private static void streamProcessor(StreamsBuilder streamsBuilder) {
// 从"kafka-stream-topic-input"主题中读取数据流
KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
System.out.println("stream = " + stream);
// 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
String[] valAry = value.split(" ");
return Arrays.asList(valAry);
})
// 按消息的值进行分组,为后续的窗口化计数操作做准备
.groupBy((key, value) -> value)
// 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
// 将计数结果转换为流,以便进行进一步的处理和转换
.toStream()
// 显示键值对的内容,并将键和值转换为字符串格式
.map((key, value) -> {
System.out.println("key = " + key);
System.out.println("value = " + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
// 将处理后的流数据发送到"kafka-stream-topic-output"主题
.to("kafka-stream-topic-output");
}
}
Αυτή η τάξη επεξεργασίας ξεκινά πρώτα από το θέμαkafka-stream-topic-input
Λάβετε δεδομένα μηνυμάτων από και στείλτε τα στο θέμα μετά την επεξεργασίαkafka-stream-topic-output
, και μετά ο καταναλωτής του μηνύματοςKafkaStreamConsumer
καταναλώνω
Όταν εισάγετε ένα θέμα απόkafka-stream-topic-input
Κατά την ανάγνωση μιας ροής δεδομένων, κάθε μήνυμα είναι ένα ζεύγος κλειδιού-τιμής.Ας υποθέσουμε ότι το κλειδί του μηνύματος εισαγωγής είναιnull
ή μια συγκεκριμένη συμβολοσειρά, ανάλογα με τον τρόπο αποστολής του μηνύματος στο θέμα εισαγωγής.
KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
χρήσηflatMapValues
Η μέθοδος διαχωρίζει την τιμή του μηνύματος, αλλά αυτή η λειτουργία δεν αλλάζει το κλειδί του μηνύματος.Εάν το κλειδί για την εισαγωγή του μηνύματος είναιnull
, τότε σε αυτό το στάδιο το κλειδί του μηνύματος είναι ακόμαnull
。
stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
String[] valAry = value.split(" ");
return Arrays.asList(valAry);
})
Στο Kafka Streams, κατά τη χρήσηgroupBy
Όταν μια μέθοδος ομαδοποιεί μια ροή, στην πραγματικότητα καθορίζει ένα νέο κλειδί, το οποίο θα χρησιμοποιηθεί για επόμενες λειτουργίες παραθύρου και συνάθροισης.σε αυτήν την περίπτωσηgroupBy
Οι μέθοδοι χρησιμοποιούνται για την ομαδοποίηση μηνυμάτων κατά τιμή:
.groupBy((key, value) -> value)
Αυτό σημαίνει ότι μετά τη λειτουργία ομαδοποίησης, το κλειδί κάθε μηνύματος στη ροή ορίζεται στην τιμή του μηνύματος.Επομένως, όταν παρακολουθείτεmap
Βλέπεται στη μέθοδοkey
παράμετρος, αυτόkey
είναι στην πραγματικότητα η αρχική τιμή του μηνύματος επειδή ingroupBy
Μετά από αυτό, η τιμή του μηνύματος έχει γίνει το κλειδί.
Σε αυτό το στάδιο, τα μηνύματα παρατίθενται και καταμετρώνται, αλλά τα κλειδιά παραμένουν αμετάβλητα.
.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
Κατά τη μετατροπή του αποτελέσματος μέτρησης σε ροή, τα πλήκτρα εξακολουθούν να είναι τα ίδια όπως πριν κατά την ομαδοποίηση
.toStream()
υπάρχειmap
μέθοδο, βλέπετεkey
Η παράμετρος είναι στην πραγματικότητα το ομαδοποιημένο κλειδί, το οποίο είναι η αρχική τιμή του μηνύματος:
.map((key, value) -> {
System.out.println("key = " + key);
System.out.println("value = " + value);
return new KeyValue<>(key.key().toString(), value.toString());
})
map
στη μέθοδοkey.key().toString()
είναι να πάρετε την παράσταση συμβολοσειράς του κλειδιού, ενώvalue.toString()
είναι η μετατροπή της τιμής μέτρησης σε συμβολοσειρά.
.to("kafka-stream-topic-output");