- Orders Service에 요청된 주문 정보를 DB가 아니라 Kafka Topic으로 전송
- Kafka Topic에 설정된 Kafka Sink Connect를 사용해 단일 DB에 저장 → 데이터 동기화
H2 DB → MariaDB 변경
step 1. Orders Service의 JPA데이터베이스 교체 (H2 DB → MariaDB)
- MariaDB에서 table 생성
show databases;
use mydb; //사용하고자 하는 database명 입력
show tables;
orders 테이블 생성(mariaDB에서 테이블 생성해두기)
매번 실행할 때 생성하지 말고 그냥 하나 생성해둠..
create table orders (
id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
);
- application.yml에서 데이터 소스 바꾸기 (H2 DB → MariaDB)
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
spring:
application:
name: order-service
h2:
console:
enabled: true
settings:
web-allow-others: true
path: /h2-console
datasource:
# 1 H2
# driver-class-name: org.h2.Driver
# url: jdbc:h2:mem:testDB
# 2 MariaDB ✔여기 수정
url: jdbc:mariadb://localhost:3306/mydb
driver-class-name: org.mariadb.jdbc.Driver
username: root
password: test1357
step2. order 넣기
mariaDB로 변경이 잘 되어 order가 들어간 것을 확인할 수 있다.
MariaDB [mydb]> select * from orders;
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
| id | user_id | product_id | order_id | qty | unit_price | total_price | created_at |
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
| 1 | e19e505e-f467-4c95-898e-e6dab828d86c | CATALOG-003 | 88807c53-1304-4fd7-90a5-6f91457edd9d | 10 | 100 | 1000 | 2024-02-16 16:14:21 |
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
1 row in set (0.002 sec)
Orders Service에서 주문을 하면 User Serivce에 반영이 되는지 테스트해보자.
Orders Service
OrderController
step3. kafka로 변경
@RestController
@RequestMapping("/order-service")
public class OrderController {
// .. 생략 ..
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable("userId") String userId, @RequestBody RequestOrder orderDetails) {
// .. 생략 ..
/* jpa */
// OrderDto createdOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createdOrder, ResponseOrder.class);
/* kafka ✔ 여기추가*/
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDto.getQty() * orderDto.getUnitPrice());
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto); // order와 catalog 테스트
kafkaProducer.send("orders", orderDto); // order와 uesr 테스트
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
// .. 생략 ..
}
메시지등록
이렇게 클래스를 만들어 놓으면 ObjectMapper같은 클래스를 API를 이용해서 가지고 있었던 Java의 오브젝트를 JSON으로 쉽게 변경할 수 있다.
step4. Schema, Field, Payload, KafkaDto 클래스 생성
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String tield;
}
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
@Service
@Slf4j
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(new Field("string", true, "order_id"), // asList : 배열을 리스트로 바꾸기
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
@Autowired
public OrderProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice:" + kafkaOrderDto);
return orderDto;
}
}
step5. Orders Service를 위한 kafka Sink connector 추가
{
"name":"my-order-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"orders"
}
}
connector 확인
테스트
- Orders Service 2개 기동
#1번 기동
#2번 기동
- 주문하기
- Order Serivce 2개 중 어떤 걸로 들어갔는지 콘솔 확인
#1번으로 들어온 것을 확인했다.
여러번 주문을 해보았다.
- 주문이 DB 1개로 동기화가 잘 되었을까?
주문한게 모두 한 orders 테이블에 들어가 있는 것을 확인할 수 있다.
MariaDB [mydb]> select * from orders;
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
| id | user_id | product_id | order_id | qty | unit_price | total_price | created_at |
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
| 1 | e19e505e-f467-4c95-898e-e6dab828d86c | CATALOG-003 | 88807c53-1304-4fd7-90a5-6f91457edd9d | 10 | 100 | 1000 | 2024-02-16 16:14:21 |
| 2 | e19e505e-f467-4c95-898e-e6dab828d86c | CATALOG-001 | 93bb30e5-3e09-4a33-b5e5-4f3c3e484267 | 12 | 1000 | 12000 | 2024-02-16 23:34:34 |
| 3 | e19e505e-f467-4c95-898e-e6dab828d86c | CATALOG-004 | 61f7cf0b-2330-43d2-a2a0-42b9f9224746 | 12 | 1000 | 12000 | 2024-02-16 23:34:59 |
| 4 | e19e505e-f467-4c95-898e-e6dab828d86c | CATALOG-005 | b9e6ad89-8849-4fa7-be0e-b2a5cb912450 | 13 | 1000 | 13000 | 2024-02-16 23:36:27 |
+----+--------------------------------------+-------------+--------------------------------------+------+------------+-------------+---------------------+
4 rows in set (0.002 sec)
- 한 유저의 주문내역도 볼 수 있다.
👀 헷갈리는 것/추가적으로 궁금한 것
헷갈리는 것/추가적으로 궁금한 것(정리해서 게시글로 정리해 뒀음)
👀 Connect와 Connector 차이
Apache Kafka에서는 Kafka Connect라는 이름으로 데이터 이동을 위한 도구를 제공합니다.
이 도구의 핵심 구성 요소는 Connectors(커넥터)입니다.
👀 Kafka 와 Kafka Connector 차이
Kafka
목적: 데이터를 안전하게 저장하고, 실시간으로 처리하며, 다양한 애플리케이션 간에 데이터를 전송하는 플랫폼입니다. 데이터의 Pub/Sub(발행/구독)을 중심으로 데이터를 관리합니다.
역할: 메시지 큐나 데이터 파이프라인의 역할을 수행하여 데이터를 안전하게 저장하고, 다양한 소비자에게 실시간으로 전달합니다.
사용 사례: 대규모 데이터 스트리밍, 로그 처리, 이벤트 기반 마이크로서비스 아키텍처, 데이터 수집 및 분석 등에 사용됩니다.
구성 요소: 브로커, 토픽, 파티션, 프로듀서, 컨슈머 등으로 구성됩니다.
확장성: 대규모 데이터 스트리밍에 대한 높은 확장성과 처리량을 제공합니다.
Kafka Connector
목적: Kafka와 외부 시스템 간에 데이터를 이동시키는 도구로, 데이터의 이전, 호환성 유지, 데이터 스트리밍 등을 담당합니다.
역할: 외부 시스템과의 연동을 담당하여 데이터를 Kafka로 가져오거나 Kafka에서 다른 시스템으로 보내는 작업을 처리합니다.
사용 사례: 다양한 데이터베이스, 파일 시스템, 메시징 시스템 등과의 통합, 데이터의 이전과 호환성 유지, 대량 데이터의 스트리밍 등에 사용됩니다.
구성 요소: Source Connector(데이터를 Kafka로 가져오는), Sink Connector(데이터를 Kafka에서 다른 시스템으로 보내는)로 구성됩니다.
확장성: 다양한 외부 시스템과의 통합을 지원하여 데이터 이전 및 호환성 유지를 편리하게 처리할 수 있습니다.
👩 나의 경험담..
kafka는 데이터만 전송(save따로 코드 구현 필요)하고
kafka connect는 데이터를 전송하면 db에 save까지 한번에 됐다.
👀 Kafka Connector 장단점
장점
1. DB가 달라도 싱크 가능(이기종 db) - Kafka Connector만이 코드없이 가능
2. 마이크로서비스환경에서 서로 다른 db의 싱크를 손쉽게(코드없이) 대부분 보장 가능
3. Event Sourcing, CQRS, Saga 패턴 등을 사용하지 않고도 어느정도 싱크가 보장됨
단점
1. 서버하나 더 돌려야 된다.
Kafka Connect서버가 내려갈 때 연동되어 있는 db 모두 global lock이 걸린다.(connect 재기동하려면 유저가 없는 새벽에 한다.)
2. 주기적으로 db가 변동이 있는지 확인하기 위한 쿼리가 계속 나가고 있음(jdbc Connector)
이거 막으려면 db debezium connector를 사용하면 쿼리 안나가는 방식으로 가능하긴 함
3. DB 커스텀이 어렵다.
A_DB와 B_DB가 Connect로 연결이 되어 있는데
A_DB에 save할 시, B_DB에 save하면서 다른 필드에는 -1을 하고 싶을 때 안됨.
왜냐 둘이 이미 싱크 맞아서 하나라고 봐도 무방하기 때문..