본문 바로가기

java/스프링인액션

[8.3] 카프카 사용하기

카프카란?

아파치 재단의 카프카는 pub-sub모델의 메세지 큐이고, 분산환경에 특화되어 설계되어 있다는 특징을 가짐으로써, 기존의 RabbitMQ와 같은 다른 메세지큐와의 성능 차이가 난다(훨씬 빠르게 처리한다). 그 외에도 클러스터 구성, fail-over, replication와 같은 여러 가지 특징들을 가지고 있다.

 

서버가 꺼진다. 갑작스럽게 전원이 내려가는 현상에서도

데이터를 손실없이 복구 할 수 있다.

데이터를 효과적으로 많이 처리할 수 있다.

https://www.youtube.com/watch?v=waw0XXNX-uQ

데이터 전송          데이터를 가져옴

프로듀서: 데이터를 넣는다

컨슈머: 데이터를 가져간다

 

 

메세지 브로커다

특유의 아키텍처를 가진다.

 

클러스터: 높은 확장성 제공

토픽을 파티션으로 분할하여 메시지를 관리한다

 

RabbitMQ가 거래소,를 사용 --> 메시지 처리

카프카 토픽 사용

 

카프카 토픽 : 클러스터의 모든 브로커에 걸쳐 복제된다 replicated

클러스터의 각 노드:

     토픽에 대한 리더로 동작,

     토픽 데이터를 관리,

     클러스터의 다른 노드로 데이터 복제

 

각 토픽은 여러 개의 파티션으로 분할

클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 된다.

 

 

스프링을 사용해서 카프카로부터 메시지 전송, 받는 법

 

 

1. 카프카 사용을 위해 스프링 설정하기

pom.xml

의존성 추가

 

 

kafakaTemplate는

카프카 브로커: localhost:9092를 리스닝 한다.

실무: 다른 호스트와 다른 포트 사용해라

 

spring.kafka.bootstrap-servers 속성

카프카 클러스터로 초기 연결에 사용되는 카프카 서버들의 위치를 설정한다.

 

클러스터 카프카 서버 중 하나가 

kafka.tacocloud.com:9092 YAML 파일에 적으라. 리스닝함

kafka.tacocloud.com:9093

kafka.tacocloud.com:9094

 

메시지 전송, 받을 준비가 됨

 

 

2. kafkaTemplate을 사용해서 메시지 전송하기 

kafkaTemplate메서드

제네릭 타입을 사용한다.

메시지를 전송할 때 직접 도메인 타입을 처리할 수 있다.

모든 send 메서드가 convertAndSend() 기능을 갖고 있다.

 

매개변수

메시지가 전송되는 방법

- 메시지가 전송될 토픽

- 토픽 데이터를 쓰는 파티션

- 레코드 전송 키

- 타임스탬프

- 페이로드

 

 

파티션 저장소 안에 분리되어진 공간이다. 파티션의 목적은 데이터를 더 많이 더 빨리 보내고 처리할 수 있는 것을 위해서 만들어진 것으로 파티션이 없다면 마치 1차선으로 된 도로를 달리는 것과 같지만, 파티션을 늘리고 프로듀서를 늘리게 될 경우 N차선의 도로를 달리는 것과 같은 효과를 얻는다.

출처: https://needjarvis.tistory.com/603 [자비스가 필요해]
 
토픽 저장소 이름

카프카(Kafka)는 데이터를 최종적으로 토픽(Topic)이라는 곳에 저장을 하며 데이터를 구분하기 위한 분류값 혹은 구분된 저장소


출처: https://needjarvis.tistory.com/603 [자비스가 필요해]

큐라고 생각하면 쉽다

페이로드  
ProducerRecord 모든 성행 매개변수들을 하나의 객체에 담은 타입

 

send() 메서드 사용

주문 데이터 전송

OrderMessagingService

 

 

kafkaTemplate.send("tacocloud.orders.topic", order); // tacocloud.orders.topic 토픽 이름으로 Order객체를 전송한다.

 

 

spring.kafka.template.default-topic 속성에

tacocloud.orders.topic을 기본 토픽으로 설정한다.// 토픽 이름

 

 

kafkaTemplate.send("tacocloud.orders.topic", order);

kafkaTemplate.sendDefault(order);

 

 

 

 

 

3. 카프카 리스너 작성하기

메시지를 수신하는 메서드 없음

메시지 리스너:  카프카 토픽 메시지를 가져오는 유일한 방법

@KafakaListner 어노테이션이 지정된 메서드 == 메시지 리스너

@KafakaListner  @JmsListner @RabbitListner
     

 

 

@KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order){ // 페이로드: Order객체
 ui.displayOrder(order);
 }


@KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order, ConsumerRecord<Order> record){ //추가 메타데이터
// 받은 메세지의 파티션, 타임스탬프 로깅하기 위해
 log.info("메세지 받음 from 파티션: {}, 시간:{}", record.partition(), record.timestamp() );
 ui.displayOrder(order);
 }
 
 @KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order, Message<Order> message){ //추가 메타데이터
// 받은 메세지의 파티션, 타임스탬프 로깅하기 위해
 MessageHeaders headers = message.getHeaders();
 log.info("메세지 받음 from 파티션: {}, 시간:{}", message.get(KafkaHeaders.받은_파티션_아이디), message.get(KafkaHeaders.받은_타임스탬프) );
 ui.displayOrder(order);
 }
 
 //메서지 페이로드
 ConsumerRecord.value()
 Message.getPayload()

 

4. 비동기 메세지 전송과 수신 기능 추가 -> 타코 클라우드 어플리케이션 빌드, 실행.

'java > 스프링인액션' 카테고리의 다른 글

10. 리액터  (0) 2021.12.12
8. 비동기 메시지 전송  (0) 2021.12.05
[6.3] 스프링 데이터 REST  (0) 2021.11.27
[6.3]  (0) 2021.11.27
챕터4 스프링 시큐리티  (0) 2021.11.20