카프카란?
아파치 재단의 카프카는 pub-sub모델의 메세지 큐이고, 분산환경에 특화되어 설계되어 있다는 특징을 가짐으로써, 기존의 RabbitMQ와 같은 다른 메세지큐와의 성능 차이가 난다(훨씬 빠르게 처리한다). 그 외에도 클러스터 구성, fail-over, replication와 같은 여러 가지 특징들을 가지고 있다.
서버가 꺼진다. 갑작스럽게 전원이 내려가는 현상에서도
데이터를 손실없이 복구 할 수 있다.
데이터를 효과적으로 많이 처리할 수 있다.
데이터 전송 데이터를 가져옴
프로듀서: 데이터를 넣는다
컨슈머: 데이터를 가져간다
메세지 브로커다
특유의 아키텍처를 가진다.
클러스터: 높은 확장성 제공
토픽을 파티션으로 분할하여 메시지를 관리한다
RabbitMQ가 거래소, 큐를 사용 --> 메시지 처리
카프카 토픽 사용
카프카 토픽 : 클러스터의 모든 브로커에 걸쳐 복제된다 replicated
클러스터의 각 노드:
토픽에 대한 리더로 동작,
토픽 데이터를 관리,
클러스터의 다른 노드로 데이터 복제
각 토픽은 여러 개의 파티션으로 분할
클러스터의 각 노드는 한 토픽의 하나 이상의 파티션의 리더가 된다.
스프링을 사용해서 카프카로부터 메시지 전송, 받는 법
1. 카프카 사용을 위해 스프링 설정하기
pom.xml
의존성 추가
kafakaTemplate는
카프카 브로커: localhost:9092를 리스닝 한다.
실무: 다른 호스트와 다른 포트 사용해라
spring.kafka.bootstrap-servers 속성
카프카 클러스터로 초기 연결에 사용되는 카프카 서버들의 위치를 설정한다.
클러스터 카프카 서버 중 하나가
kafka.tacocloud.com:9092 YAML 파일에 적으라. 리스닝함
kafka.tacocloud.com:9093
kafka.tacocloud.com:9094
메시지 전송, 받을 준비가 됨
2. kafkaTemplate을 사용해서 메시지 전송하기
kafkaTemplate메서드
제네릭 타입을 사용한다.
메시지를 전송할 때 직접 도메인 타입을 처리할 수 있다.
모든 send 메서드가 convertAndSend() 기능을 갖고 있다.
매개변수
메시지가 전송되는 방법
- 메시지가 전송될 토픽
- 토픽 데이터를 쓰는 파티션
- 레코드 전송 키
- 타임스탬프
- 페이로드
파티션 | 저장소 안에 분리되어진 공간이다. 파티션의 목적은 데이터를 더 많이 더 빨리 보내고 처리할 수 있는 것을 위해서 만들어진 것으로 파티션이 없다면 마치 1차선으로 된 도로를 달리는 것과 같지만, 파티션을 늘리고 프로듀서를 늘리게 될 경우 N차선의 도로를 달리는 것과 같은 효과를 얻는다. 출처: https://needjarvis.tistory.com/603 [자비스가 필요해] |
키 | |
토픽 | 저장소 이름 카프카(Kafka)는 데이터를 최종적으로 토픽(Topic)이라는 곳에 저장을 하며 데이터를 구분하기 위한 분류값 혹은 구분된 저장소 출처: https://needjarvis.tistory.com/603 [자비스가 필요해] 큐라고 생각하면 쉽다 |
페이로드 | |
ProducerRecord | 모든 성행 매개변수들을 하나의 객체에 담은 타입 |
send() 메서드 사용
주문 데이터 전송
OrderMessagingService
kafkaTemplate.send("tacocloud.orders.topic", order); // tacocloud.orders.topic 토픽 이름으로 Order객체를 전송한다.
spring.kafka.template.default-topic 속성에
tacocloud.orders.topic을 기본 토픽으로 설정한다.// 토픽 이름
kafkaTemplate.send("tacocloud.orders.topic", order);
kafkaTemplate.sendDefault(order);
3. 카프카 리스너 작성하기
메시지를 수신하는 메서드 없음
메시지 리스너: 카프카 토픽 메시지를 가져오는 유일한 방법
@KafakaListner 어노테이션이 지정된 메서드 == 메시지 리스너
@KafakaListner | @JmsListner | @RabbitListner |
@KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order){ // 페이로드: Order객체
ui.displayOrder(order);
}
@KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order, ConsumerRecord<Order> record){ //추가 메타데이터
// 받은 메세지의 파티션, 타임스탬프 로깅하기 위해
log.info("메세지 받음 from 파티션: {}, 시간:{}", record.partition(), record.timestamp() );
ui.displayOrder(order);
}
@KafakaListner(topics="tacocloud.orders.topic") // 토픽이름에 메시지가 도착할때
public void handle(Order order, Message<Order> message){ //추가 메타데이터
// 받은 메세지의 파티션, 타임스탬프 로깅하기 위해
MessageHeaders headers = message.getHeaders();
log.info("메세지 받음 from 파티션: {}, 시간:{}", message.get(KafkaHeaders.받은_파티션_아이디), message.get(KafkaHeaders.받은_타임스탬프) );
ui.displayOrder(order);
}
//메서지 페이로드
ConsumerRecord.value()
Message.getPayload()
4. 비동기 메세지 전송과 수신 기능 추가 -> 타코 클라우드 어플리케이션 빌드, 실행.
'java > 스프링인액션' 카테고리의 다른 글
10. 리액터 (0) | 2021.12.12 |
---|---|
8. 비동기 메시지 전송 (0) | 2021.12.05 |
[6.3] 스프링 데이터 REST (0) | 2021.11.27 |
[6.3] (0) | 2021.11.27 |
챕터4 스프링 시큐리티 (0) | 2021.11.20 |