प्रौद्योगिकी साझेदारी

काफ्का-स्ट्रीम सन्देशप्रवाहप्रक्रियायाः सिद्धान्तानां च संक्षिप्तं विश्लेषणम्

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

निम्नलिखितम् प्रकरणानाम् एकः संयोजनः अस्ति: काफ्का सन्देशप्रवाहप्रक्रियायाः निष्पादनप्रक्रियायाः परीक्षणार्थं दृष्टान्तार्थं च सन्देशेषु शब्दानां आवृत्तीनां संख्यायाः गणना

मवेन् आश्रयः

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
    </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

प्रेप्सति

प्रथमं क्रमशः सन्देशनिर्माता, सन्देशग्राहकः, धारासंसाधकः च इति त्रयः वर्गाः लिखित्वा रचयन्तु ।
KafkaStreamProducer:सन्देशनिर्माता

public class KafkaStreamProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kafka-stream-topic-input", "hello kafka");
            producer.send(producerRecord);
        }

        producer.close();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

सन्देशनिर्माता विषये प्रतिवेदनं करोतिkafka-stream-topic-inputपञ्चवारं प्रेषितम्hello kafka
KafkaStreamConsumer: सन्देश उपभोक्ता

public class KafkaStreamConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //手动提交偏移量
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //订阅主题
        consumer.subscribe(Collections.singletonList("kafka-stream-topic-output"));

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println("consumerRecord.key() = " + consumerRecord.key());
                    System.out.println("consumerRecord.value() = " + consumerRecord.value());
                }
                // 异步提交偏移量
                consumer.commitAsync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 同步提交偏移量
            consumer.commitSync();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

KafkaStreamQuickStart: स्ट्रीमिंग प्रोसेसिंग क्लास

public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.246.128:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);

        kafkaStreams.start();
    }

    /**
     * 消息格式:hello world hello world
     * 配置并处理流数据。
     * 使用StreamsBuilder创建并配置KStream,对输入的主题中的数据进行处理,然后将处理结果发送到输出主题。
     * 具体处理包括:分割每个消息的值,按值分组,对每个分组在10秒的时间窗口内进行计数,然后将结果转换为KeyValue对并发送到输出主题。
     *
     * @param streamsBuilder 用于构建KStream对象的StreamsBuilder。
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // 从"kafka-stream-topic-input"主题中读取数据流
        KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
        System.out.println("stream = " + stream);
        // 将每个值按空格分割成数组,并将数组转换为列表,以扩展单个消息的值
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                })
                // 按消息的值进行分组,为后续的窗口化计数操作做准备
                .groupBy((key, value) -> value)
                // 定义10秒的时间窗口,在每个窗口内对每个分组进行计数
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                .count()
                // 将计数结果转换为流,以便进行进一步的处理和转换
                .toStream()
                // 显示键值对的内容,并将键和值转换为字符串格式
                .map((key, value) -> {
                    System.out.println("key = " + key);
                    System.out.println("value = " + value);
                    return new KeyValue<>(key.key().toString(), value.toString());
                })
                // 将处理后的流数据发送到"kafka-stream-topic-output"主题
                .to("kafka-stream-topic-output");
    }
    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

अयं प्रसंस्करणवर्गः प्रथमं विषयात् आरभ्यतेkafka-stream-topic-inputतः सन्देशदत्तांशं प्राप्नुवन्तु, प्रक्रियां कृत्वा विषये प्रेषयन्तु चkafka-stream-topic-output, ततः सन्देशग्राहकःKafkaStreamConsumerप्लक्ष्

परिणामाः

अत्र चित्रविवरणं सम्मिलितं कुर्वन्तु
अत्र चित्रविवरणं सम्मिलितं कुर्वन्तु

स्ट्रीमिंग प्रोसेसिंग प्रक्रिया तथा सिद्धान्तव्याख्या

प्रारम्भिकः चरणः

तः विषयं प्रविष्टे सतिkafka-stream-topic-input दत्तांशप्रवाहं पठन् प्रत्येकं सन्देशः कील-मूल्ययुगलः भवति ।मानातु यत् निवेशसन्देशस्य कुञ्जी अस्तिnullअथवा विशिष्टं स्ट्रिंग्, सन्देशः कथं निवेशविषये प्रेषितः इति अवलम्ब्य ।

KStream<String, String> stream = streamsBuilder.stream("kafka-stream-topic-input");
  • 1

सन्देशमूल्यं विभक्तं कुर्वन्तु

उपयुञ्जताम्‌flatMapValues method इत्यनेन सन्देशस्य मूल्यं विभज्यते, परन्तु एतत् क्रिया सन्देशस्य कीलं न परिवर्तयति ।यदि सन्देशं प्रविष्टुं कुञ्जी अस्तिnull, तर्हि अस्मिन् स्तरे सन्देशस्य कुञ्जी निश्चलः अस्तिnull

stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> {
    String[] valAry = value.split(" ");
    return Arrays.asList(valAry);
})
  • 1
  • 2
  • 3
  • 4

सन्देशमूल्येन समूहयन्तु

काफ्का स्ट्रीम्स् इत्यस्मिन्, उपयोगं कुर्वन्groupBy यदा कश्चन विधिः धारासमूहं करोति तदा तत् वस्तुतः नूतनं कीलं निर्दिशति, यत् अनन्तरं विण्डोिंग्-क्रियाणां, समुच्चय-क्रियाणां च कृते उपयुज्यते ।एवं सतिgroupByसन्देशान् मूल्येन समूहयितुं पद्धतयः उपयुज्यन्ते :

.groupBy((key, value) -> value)
  • 1

अस्य अर्थः अस्ति यत् समूहीकरणक्रियायाः अनन्तरं धारायां प्रत्येकस्य सन्देशस्य कीलं सन्देशस्य मूल्ये सेट् भवति ।अतः यदा भवन्तः अनुवर्तनं कुर्वन्तिmapविधौ दृष्टम्keyपैरामीटर्, इदम्keyवस्तुतः सन्देशस्य मूलमूल्यं भवति यतः ingroupByतदनन्तरं सन्देशस्य मूल्यं कुञ्जी अभवत् ।

समयविण्डो परिभाष्य गणनां कुर्वन्तु

अस्मिन् स्तरे सन्देशाः विण्डो कृत्वा गण्यन्ते, परन्तु कीलानि अपरिवर्तितानि एव तिष्ठन्ति ।

.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
.count()
  • 1
  • 2

गणनाफलं धारायां परिवर्तयन्तु

गणनाफलं धारायां परिवर्तयन्ते सति, समूहीकरणे पूर्ववत् कीलानि अद्यापि भवन्ति

.toStream()
  • 1

परिणामान् प्रक्रियां कृत्वा परिवर्तनं कुर्वन्तु

अस्तिmapविधिः पश्यसिkeyपैरामीटर् वस्तुतः समूहीकृतं कीलम् अस्ति, यत् सन्देशस्य मूलमूल्यं भवति:

.map((key, value) -> {
    System.out.println("key = " + key);
    System.out.println("value = " + value);
    return new KeyValue<>(key.key().toString(), value.toString());
})
  • 1
  • 2
  • 3
  • 4
  • 5

mapविधियाम्key.key().toString()कीलस्य स्ट्रिंग् प्रतिनिधित्वं प्राप्तुं भवति, whilevalue.toString()गणनामूल्यं स्ट्रिंग् मध्ये परिवर्तयितुं भवति ।

संसाधितं दत्तांशं आउटपुट् विषयं प्रति प्रेषयन्तु

.to("kafka-stream-topic-output");
  • 1