본문 바로가기
BIG DATA/KAFKA

Kafka producer (Python), consumer (C++) 샘플 소스

by 골든크랩 2024. 12. 26.
728x90
반응형

 


# pip install confluent-kafka
from confluent_kafka import Producer
import json


def delivery_report(err, msg):
    """Print the outcome of one produced message.

    Registered as the produce() callback; the client invokes it while
    flush() (or poll()) is serving delivery events.
    """
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


producer = Producer({'bootstrap.servers': '172.24.195.146:9092'})

topic_name = "quickstart-events"
message = {
    "code": "A005930",
    "code_name": "삼성전자",
    "price": 53700
}

# Serialize the message to JSON and encode it as UTF-8.
# ensure_ascii=False keeps non-ASCII text (the Korean name) as real UTF-8
# characters; the default would emit \uXXXX escapes, which a consumer that
# prints the raw payload shows literally instead of as Korean text.
message_json = json.dumps(message, ensure_ascii=False).encode('utf-8')

# Without a callback a failed delivery would go unnoticed.
producer.produce(topic=topic_name, value=message_json, callback=delivery_report)
# Block until the queued message has been delivered or has failed.
producer.flush()

 

 


c++ 프로그램 작성을 위한 라이브러리 설치
sudo apt-get update
sudo apt-get install openssl libssl-dev
sudo apt-get install libworkflow-dev librdkafka-dev

libssl-dev가 설치돼야 header 파일을 쓸 수 있다. 그렇다면 openssl은 설치 안 해도 되나?

- 설치 확인

openssl version

 

설치된 파일들 확인법

dpkg -L openssl

 

 

참고) makefile에 아래 라이브러리를 추가할 것.

-lworkflow -lrdkafka -lpthread
//
// Created by mecen on 2024-12-26.
//
// group 이 있는 예제
#include <iconv.h>

#include <clocale>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

#include <librdkafka/rdkafka.h>
#include <workflow/WFFacilities.h>
#include <workflow/WFOperator.h>
#include <workflow/WFTaskFactory.h>

/**
 * @brief Decodes a UTF-8 encoded byte string into a wide string via iconv.
 *
 * @param str UTF-8 encoded input. An empty input yields an empty result.
 * @return The decoded text, containing exactly the converted wide characters
 *         (no trailing padding).
 * @throws std::runtime_error if the iconv converter cannot be opened or the
 *         input cannot be converted.
 */
std::wstring utf8_to_wstring(const std::string& str)
{
    if (str.empty())
    {
        return std::wstring();
    }

    // iconv_open signals failure with the sentinel (iconv_t)-1.
    iconv_t cd = iconv_open("WCHAR_T", "UTF-8");
    if (cd == (iconv_t)-1)
    {
        throw std::runtime_error("iconv_open failed");
    }

    // Closes the descriptor on every exit path, including exceptions thrown
    // by the allocation below.
    struct DescriptorCloser
    {
        iconv_t handle;
        ~DescriptorCloser() { iconv_close(handle); }
    } closer{cd};

    // A decoded character consumes at least one input byte, so str.size()
    // wide characters are always enough room for the output.
    std::wstring decoded(str.size(), L'\0');
    const size_t capacity_bytes = decoded.size() * sizeof(wchar_t);

    // iconv takes non-const input pointers but does not modify the input data.
    char* in_ptr = const_cast<char*>(str.data());
    size_t in_left = str.size();
    char* out_ptr = reinterpret_cast<char*>(&decoded[0]);
    size_t out_left = capacity_bytes;

    if (iconv(cd, &in_ptr, &in_left, &out_ptr, &out_left) == (size_t)-1)
    {
        throw std::runtime_error("iconv conversion failed");
    }

    // out_left is the number of BYTES still unused; convert the used byte
    // count into a wide-character count before trimming.
    decoded.resize((capacity_bytes - out_left) / sizeof(wchar_t));
    return decoded;
}

class KafkaConsumer
{
public:
    KafkaConsumer(const std::string& brokers, const std::string& group_id, const std::string& topic)
    {
        // Kafka 설정
        conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "group.id", group_id.c_str(), errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));

        // Consumer 생성
        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

        // Topics 설정
        topics = rd_kafka_topic_partition_list_new(1);
        rd_kafka_topic_partition_list_add(topics, topic.c_str(), -1);
        rd_kafka_subscribe(rk, topics);
    }

    void start()
    {
        // Workflow를 사용한 Kafka 메시지 수신 및 처리
        while (true)
        {
            rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk, 1000); // 1초 동안 대기
            if (msg)
            {
                if (!msg->err)
                {
                    std::string message((char*)msg->payload, msg->len);
                    // UTF-8로 디코딩
                    try
                    {
                        std::wstring wmessage = utf8_to_wstring(message);
                        std::wcout << L"Received message: " << wmessage << std::endl;
                    }
                    catch (const std::exception& ex)
                    {
                        std::cerr << "Error decoding message: " << ex.what() << std::endl;
                    }
                }
                else
                {
                    std::cerr << "Error: " << rd_kafka_message_errstr(msg) << std::endl;
                }
                rd_kafka_message_destroy(msg);
            }
        }
    }

    ~KafkaConsumer()
    {
        rd_kafka_consumer_close(rk);
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(rk);
    }

private:
    rd_kafka_t* rk;
    rd_kafka_conf_t* conf;
    rd_kafka_topic_partition_list_t* topics;
    char errstr[512];
};

/**
 * @brief Entry point: consumes the "quickstart-events" topic and prints it.
 *
 * @return 0 on normal exit, 1 if an exception escapes the consumer.
 */
int main()
{
    // Adopt the locale from the environment. Under the default "C" locale,
    // wide-character output of non-ASCII text (e.g. Korean) fails and leaves
    // std::wcout in an error state (observed with glibc/libstdc++).
    std::setlocale(LC_ALL, "");

    const std::string brokers = "172.24.195.146:9092";
    const std::string group_id = "my_group";  // This value can be chosen freely.
    const std::string topic = "quickstart-events";

    try
    {
        KafkaConsumer consumer(brokers, group_id, topic);
        consumer.start();
    }
    catch (const std::exception& ex)
    {
        // Report the failure instead of terminating on an uncaught exception.
        std::cerr << "Fatal error: " << ex.what() << std::endl;
        return 1;
    }

    return 0;
}

 

728x90
반응형

댓글