728x90
반응형
# pip install confluent-kafka
from confluent_kafka import Producer
import json
producer = Producer({'bootstrap.servers': '172.24.195.146:9092'})
topic_name = "quickstart-events"
message = {
"code": "A005930",
"code_name": "삼성전자",
"price": 53700
}
# message를 JSON 문자열로 변환하고 UTF-8로 인코딩
message_json = json.dumps(message).encode('utf-8')
producer.produce(topic=topic_name, value=message_json)
producer.flush()
c++ 프로그램 작성을 위한 라이브러리 설치
sudo apt-get update
sudo apt-get install openssl libssl-dev
sudo apt-get install libworkflow-dev librdkafka-dev
libssl-dev 가 설치되야, header 파일을 쓸수 있다. 그렇다면 openssl 는 설치 안해도 되나?
- 설치 확인
openssl version
설치된 파일들 확인법
dpkg -L openssl
참고)makefile에 아래 라이브러리 추가할것.
-lworkflow -lrdkafka -lpthread
//
// Created by mecen on 2024-12-26.
//
// group 이 있는 예제
#include <workflow/WFTaskFactory.h>
#include <workflow/WFOperator.h>
#include <workflow/WFFacilities.h>
#include <librdkafka/rdkafka.h>
#include <iostream>
#include <iconv.h>
#include <vector>
// UTF-8 디코딩 함수
std::wstring utf8_to_wstring(const std::string& str)
{
iconv_t cd = iconv_open("WCHAR_T", "UTF-8");
if (cd == (iconv_t)-1)
{
throw std::runtime_error("iconv_open failed");
}
size_t in_size = str.size();
size_t out_size = in_size * sizeof(wchar_t);
std::vector<wchar_t> out_buf(out_size);
char* in_buf = const_cast<char*>(str.data());
char* out_ptr = reinterpret_cast<char*>(out_buf.data());
size_t result = iconv(cd, &in_buf, &in_size, &out_ptr, &out_size);
if (result == (size_t)-1)
{
iconv_close(cd);
throw std::runtime_error("iconv conversion failed");
}
std::wstring wstr(out_buf.data(), out_buf.size() - out_size / sizeof(wchar_t));
iconv_close(cd);
return wstr;
}
class KafkaConsumer
{
public:
KafkaConsumer(const std::string& brokers, const std::string& group_id, const std::string& topic)
{
// Kafka 설정
conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", brokers.c_str(), errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "group.id", group_id.c_str(), errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
// Consumer 생성
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
// Topics 설정
topics = rd_kafka_topic_partition_list_new(1);
rd_kafka_topic_partition_list_add(topics, topic.c_str(), -1);
rd_kafka_subscribe(rk, topics);
}
void start()
{
// Workflow를 사용한 Kafka 메시지 수신 및 처리
while (true)
{
rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk, 1000); // 1초 동안 대기
if (msg)
{
if (!msg->err)
{
std::string message((char*)msg->payload, msg->len);
// UTF-8로 디코딩
try
{
std::wstring wmessage = utf8_to_wstring(message);
std::wcout << L"Received message: " << wmessage << std::endl;
}
catch (const std::exception& ex)
{
std::cerr << "Error decoding message: " << ex.what() << std::endl;
}
}
else
{
std::cerr << "Error: " << rd_kafka_message_errstr(msg) << std::endl;
}
rd_kafka_message_destroy(msg);
}
}
}
~KafkaConsumer()
{
rd_kafka_consumer_close(rk);
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(rk);
}
private:
rd_kafka_t* rk;
rd_kafka_conf_t* conf;
rd_kafka_topic_partition_list_t* topics;
char errstr[512];
};
int main()
{
const std::string brokers = "172.24.195.146:9092";
const std::string group_id = "my_group"; // 이 값은 임의로 지정할 수 있습니다.
const std::string topic = "quickstart-events";
KafkaConsumer consumer(brokers, group_id, topic);
consumer.start();
return 0;
}
728x90
반응형
'BIG DATA > KAFKA' 카테고리의 다른 글
WSL2 에서 KAFKA 설치및 테스트 (0) | 2024.12.24 |
---|---|
c++ comsumper 개발환경 구축 (0) | 2021.06.14 |
goLang consumer 예제 (0) | 2021.06.14 |
kafka 연동을 위한 python producer 소스/테스트 방법 (0) | 2021.06.14 |
kafka 실행및 테스트, 점검법 (0) | 2021.06.14 |
댓글