본문 바로가기
BIG DATA/KAFKA

WSL2 에서 KAFKA 설치및 테스트

by 골든크랩 2024. 12. 24.
728x90
반응형

설치과정이 모두 동일하기 때문에 WSL2가 특별히 중요한게 아니다.

=======================================================================================


01. 다운로드
https://kafka.apache.org/downloads

Scala 2.13  - kafka_2.13-3.9.0.tgz (asc, sha512)

02. 압축풀기-> root 계정으로 해야 함

/home/goldencrab/kafka_2.13-3.9.0


03. 사용할 수 있는 언어들
https://cwiki.apache.org/confluence/display/KAFKA/Clients

04. kafka 기동 

===> 한글이 깨지지 않도록 하려고 하는데....kafka에서 제공하는 툴들에서는 안깨지나, 어플리케이션과 제공툴 사이에선 깨짐.
export KAFKA_OPTS="-Dfile.encoding=UTF-8"


04.1. zookeeper 기동
./zookeeper-server-start.sh ../config/zookeeper.properties

04.2. kafka 기동  ===> 여기서 외부 연동을 위한 PORT 를 설정해주나??
./kafka-server-start.sh ../config/server.properties

listeners=PLAINTEXT://172.24.195.146:9092


05. Topic 생성
 ./kafka-topics.sh --create --topic quickstart-events --bootstrap-server 172.24.195.146:9092 --partitions 1
 
06. Topic 목록 확인
 ./kafka-topics.sh --bootstrap-server 172.24.195.146:9092 --list
 
07. Topic 정보 확인
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 172.24.195.146:9092

08. 메시지 생산
./kafka-console-producer.sh --broker-list  172.24.195.146:9092 --topic quickstart-events

09. 메시지 소비
 ./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning

===> 한글이 깨지지 않도록 하려고 하는데....kafka에서 제공하는 툴들에서는 안깨지나, 어플리케이션과 제공툴 사이에선 깨짐.
 ./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning --property print.key=true --property key.separator=" - " --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer.encoding=UTF8

 10. 테스트
 8번 메시지 생산 콘솔에서 입력한다.


카프카 옵션이 잘 설명된것 같다.

https://waterfogsw.tistory.com/9

 ===========================

 

[Kafka] Kafka 커맨드라인

토픽 생성 kafka-topics.sh를 통해 토픽 관련 명령을 실행할 수 있다. $ bin/kafka-topics.sh \ --create \ --bootstrap-server :9092 \ --partitions 3 \ --replication-factor 1 \ --config retention.ms=172800000 \ --topic hello.kafka create 토

waterfogsw.tistory.com

 

 

응용프로그램 python 으로 개발하기


 pip install confluent-kafka
 
 https://developer.confluent.io/get-started/python/?utm_medium=sem&utm_source=google&utm_campaign=ch.sem_br.nonbrand_tp.prs_tgt.dsa_mt.dsa_rgn.apac_lng.eng_dv.all_con.confluent-developer&utm_term=&creative=&device=c&placement=&gad_source=1&gclid=Cj0KCQiA1Km7BhC9ARIsAFZfEIv5I54rtFW9ecy9WRsP_Q1PfcCNLrlqqFmmEtO3r-u_BneFTgO6NHkaAtC0EALw_wcB#build-producer

 

Apache Kafka and Python - Getting Started Tutorial

How to run a Kafka client application written in Python that produces to and consumes messages from a Kafka cluster, complete with step-by-step instructions and examples.

developer.confluent.io

 

root@goldencrab:~# hostname -I
172.24.195.146

Kafka의 server.properties 파일에서 advertised.listeners 설정을 확인하고, 
WSL2의 IP 주소와 포트를 지정해야 할 수도 있습니다:
advertised.listeners=PLAINTEXT://172.28.32.1:9092

 

 

Python Producer 샘플코드

from confluent_kafka import Producer
import json

producer = Producer({'bootstrap.servers': '172.24.195.146:9092'})

topic_name = "quickstart-events"
message = {
    "userid": 2,
    "gender": "FEMALE",
    "username": "옥순"
}

producer.produce(topic=topic_name, value=json.dumps(message))
producer.flush()  # 토픽에 메시지가 들어갈 때까지 대기

 

 

root@goldencrab:/home/goldencrab/kafka_2.13-3.9.0/bin#  ./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning
Hello
hi
{"userid": 2, "gender": "FEMALE", "username": "\uc625\uc21c"}
{"userid": 3, "gender": "MAIL", "username": "Dennis"}

한글이 깨짐...copilot 을 통해서 이문제를 수정하도록 했다....

 

1. Producer 쪽 샘플

from confluent_kafka import Producer

conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(**conf)

def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

message = '안녕하세요'.encode('utf-8')
producer.produce('your_topic', value=message, callback=delivery_report)
producer.flush()

 

 

2. Consumer 쪽 샘플

from confluent_kafka import Consumer, KafkaException

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(**conf)
consumer.subscribe(['your_topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                raise KafkaException(msg.error())
        else:
            # 메시지 수신 및 디코딩
            print(msg.value().decode('utf-8'))
finally:
    consumer.close()

 

 

 

 

728x90
반응형

댓글