Order-service 요청된 주문의 수량 정보를 Catalog-service에 반영
Order-service에서 kafka Topic으로 메시지 전송 → Producer
Catalog-service에서 kafaka topic에 전송된 메시지 취득 → Consumer
Concurrent kafkaListenerContainerFactory : 토픽에 변경 사항이 있는지 계속 리스닝하고 있고 이벤트가 발생했을 때 그걸 캐치할 수 있는 리스너
데이터를 지정해 줄 때 토픽에 저장되는 값이 어떤 형태로 저장될지 지정할 수 있다. 그 중 하나가 key-value값이다.
그래서 key-value값으로 저장되었을 때, 역으로 해석해서 사용해야 한다.
그때 Deserializer 라는 타입을 지정하게 된다.
데이터를 하나 만들어서 다른 쪽으로 전달하기 위한 용도로서 압축하는 과정을 serializer라고 하고 원래 형태로 푸는 것을 Deserializer라고 한다.
전달하는 방식은 String 형태로 쓸 것이다.
카프카 토픽에 데이터를 보내기 위해서 사용되는 객체, KafkaTemplate을 이용해서 데이터 전달
caltalog-service
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafkaConsumerConfig
@EnableKafka
@Configuration
public class kafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() { // 접속 정보 설정
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // TODO 더 알아보기
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // Deserializer: KEY 풀어주기
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // VALUE 풀어주기
return new DefaultKafkaConsumerFactory<>(properties);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { // consumerFactory를 등록
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
KafkaConsumer
@Service
@Slf4j
public class KafkaConsumer {
CatalogRepository repository;
@Autowired
public KafkaConsumer(CatalogRepository repository) {
this.repository = repository;
}
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper(); // 👀 아래 설명..메시지를 직렬화해서 전달된다..ObjectMapper를 이용해 역직렬화해서 사용한다
// TODO 역직렬화
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
if (entity != null) {
entity.setStock(entity.getStock() - (Integer)map.get("qty")); //(Integer): object이기 때문에 integer로 바꿔줌..
repository.save(entity);
}
}
}
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
● readValue : 메서드를 사용하여 JSON 문자열을 Java 객체로 변환,
Kafka 메시지를 받아와서 해당 메시지를 JSON 형식으로 역직렬화
그 결과로 map변수에는 역직렬화된 JSON 데이터가 Java의 Map 형식으로 저장
첫 번째 인자로 JSON 문자열을, 두 번째 인자로는 어떤 타입으로 역직렬화할지에 대한 정보를 받는다.
● kafkaMessage : Kafka로부터 받아온 JSON 형식의 문자열
● TypeReference : Map<Object, Object> 형식으로 역직렬화하도록 지정
order-service
pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafkaProducerConfig
@EnableKafka
@Configuration
public class kafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() { // 접속 정보 설정
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // KEY 풀어주기
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // VALUE 풀어주기
return new DefaultKafkaProducerFactory<>(properties);
}
@Bean //데이터 전달하는 용도의 카프카템플릿
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer
@Service
@Slf4j
public class KafkaProducer {
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto); // 👀 아래 설명
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("kafka Producer sent data from the Order microservice:" + orderDto);
return orderDto;
}
}
jsonInString = mapper.writeValueAsString(orderDto);
ObjectMapper를 생성합니다. 이것은 Jackson 라이브러리의 일부로, Java 객체를 JSON 형식으로 직렬화하고, JSON 데이터를 객체로 역직렬화하는 데 사용
mapper.writeValueAsString(orderDto)를 사용하여 orderDto 객체를 JSON 형식의 문자열로 변환합니다.
writeValueAsString 메서드는 주어진 객체를 JSON 문자열로 변환하여 반환합니다.
이 메서드는 예외 처리를 위해 JsonProcessingException을 throw할 수 있습니다.
더 알아보기
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId"); // TODO 더 알아보기