설치과정이 모두 동일하기 때문에 WSL2가 특별히 중요한게 아니다.
=======================================================================================
01. 다운로드
https://kafka.apache.org/downloads
Scala 2.13 - kafka_2.13-3.9.0.tgz (asc, sha512)
02. 압축풀기-> root 계정으로 해야 함
/home/goldencrab/kafka_2.13-3.9.0
03. 사용할 수 있는 언어들
https://cwiki.apache.org/confluence/display/KAFKA/Clients
04. kafka 기동
===> 한글이 깨지지 않도록 하려고 하는데....kafka에서 제공하는 툴들에서는 안깨지나, 어플리케이션과 제공툴 사이에선 깨짐.
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
04.1. zookeeper 기동
./zookeeper-server-start.sh ../config/zookeeper.properties
04.2. kafka 기동 ===> 여기서 외부 연동을 위한 PORT 를 설정해주나??
./kafka-server-start.sh ../config/server.properties
listeners=PLAINTEXT://172.24.195.146:9092
05. Topic 생성
./kafka-topics.sh --create --topic quickstart-events --bootstrap-server 172.24.195.146:9092 --partitions 1
06. Topic 목록 확인
./kafka-topics.sh --bootstrap-server 172.24.195.146:9092 --list
07. Topic 정보 확인
./kafka-topics.sh --describe --topic quickstart-events --bootstrap-server 172.24.195.146:9092
08. 메시지 생산
./kafka-console-producer.sh --broker-list 172.24.195.146:9092 --topic quickstart-events
09. 메시지 소비
./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning
===> 한글이 깨지지 않도록 하려고 하는데....kafka에서 제공하는 툴들에서는 안깨지나, 어플리케이션과 제공툴 사이에선 깨짐.
./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning --property print.key=true --property key.separator=" - " --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer.encoding=UTF8
10. 테스트
8번 메시지 생산 콘솔에서 입력한다.
카프카 옵션이 잘 설명된것 같다.
https://waterfogsw.tistory.com/9
===========================
응용프로그램 python 으로 개발하기
root@goldencrab:~# hostname -I
172.24.195.146
Kafka의 server.properties 파일에서 advertised.listeners 설정을 확인하고,
WSL2의 IP 주소와 포트를 지정해야 할 수도 있습니다:
advertised.listeners=PLAINTEXT://172.28.32.1:9092
Python Producer 샘플코드
from confluent_kafka import Producer
import json
producer = Producer({'bootstrap.servers': '172.24.195.146:9092'})
topic_name = "quickstart-events"
message = {
"userid": 2,
"gender": "FEMALE",
"username": "옥순"
}
producer.produce(topic=topic_name, value=json.dumps(message))
producer.flush() # 토픽에 메시지가 들어갈 때까지 대기
root@goldencrab:/home/goldencrab/kafka_2.13-3.9.0/bin# ./kafka-console-consumer.sh --bootstrap-server 172.24.195.146:9092 --topic quickstart-events --from-beginning
Hello
hi
{"userid": 2, "gender": "FEMALE", "username": "\uc625\uc21c"}
{"userid": 3, "gender": "MAIL", "username": "Dennis"}
한글이 깨짐...copilot 을 통해서 이문제를 수정하도록 했다....
1. Producer 쪽 샘플
from confluent_kafka import Producer
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(**conf)
def delivery_report(err, msg):
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
message = '안녕하세요'.encode('utf-8')
producer.produce('your_topic', value=message, callback=delivery_report)
producer.flush()
2. Consumer 쪽 샘플
from confluent_kafka import Consumer, KafkaException
conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest'
}
consumer = Consumer(**conf)
consumer.subscribe(['your_topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
raise KafkaException(msg.error())
else:
# 메시지 수신 및 디코딩
print(msg.value().decode('utf-8'))
finally:
consumer.close()
'BIG DATA > KAFKA' 카테고리의 다른 글
kafka prosumer (Python), consumer(C++) 샘플소스 소스 (1) | 2024.12.26 |
---|---|
c++ comsumper 개발환경 구축 (0) | 2021.06.14 |
goLang consumer 예제 (0) | 2021.06.14 |
kafka 연동을 위한 python producer 소스/테스트 방법 (0) | 2021.06.14 |
kafka 실행및 테스트, 점검법 (0) | 2021.06.14 |
댓글