Kafka
- 자신들이 전송하는 데이터가 어떤 시스템에 저장되는지 관계하지 않고 무조건 Kafka 하나만 상대하면 된다.
- 카프카에 쌓였던 데이터 메시지들도 하둡이라던가 모니터링 시스템, 서치 엔진, 다른 어플리케이션, 이메일 쪽으로 데이터를 전달해줄 때도 카프카로부터 받아오면 되기 때문에 단일 포맷을 사용할 수 있다.
- 그럼 보내는 쪽과 받는 쪽이 누가 보냈고 누가 받았는지 전혀 신경쓰지 않는 상태에서 메시지를 주고 받는게 가능해진다.
- 메시지 보내는 쪽을 Producer
- 메시지 받는 쪽을 Concumer라고 한다.
- 특징
- 프로듀서, 컨슈머 분리해서 작업한다.
- 하나의 메시지에 여러 컨슈머 가능
- 다양한 형태의 여러 컨슈머에 전달할 수 있다.
- 높은 처리량을 위한 메시지 최적화 시켜서 내부적으로 보관하고 있다
- 카프카 자체가 클러스터링 구조를 해서 여러 개의 서버로서 구성하여 작동할 수 있다.
- 스케일링 기능 가능
- 다양한 형태의 에코 시스템을 연동해줌으로써 카프카 하나만 가지고 프로듀서, 컨슈머와 같이 보내고 받는 용도만 사용되는 것이 아니라 데이터 스트리밍 서비스를 해준다.
- 아니면 관계형 데이터베이스처럼 SQL과 같은 문법을 제공해주면 쉽게 썼던 스토리징 서비스와 연결이 된다던가 하는 식의 시나리오는 생각해볼 수 있다.
Kafka Boker
- 브로커는 카프카의 서버
- 일반적으로 카프카는 3개 이상의 Broker Cluster을 구성하는 것을 권장한다.
- 그래야 여러 개의 브로커들이 서로 밀접하게 연결되면서 한 곳에 저장되어 있던 어떤 메시지를 다른 쪽에 있는 메시지들도 같이 공유 해줌으로써 하나의 서버에서 문제가 생겼을 때 그걸 대신할 수 있는 또 다른 브로커를 사용할 수 있음으로써 안전하게 메시지를 계속적으로 사용할 수 있다.
- 이렇게 어플리케이션 서버의 상태, 서버의 리더, 문제가 생겼을 때 시스템을 연동해서 사용하는데 카프카에선느 코디네이터로서 아파치의 Zookeeper를 연동해서 사용하는 것이 일반적이다.
- 따라서 메시지를 주고 받을 때 저장되는 공간은 브로커.
- 이러한 브로커들을 중재해주는, 컨트롤 해주는 역할로서 Zookeeper라는 시스템을 사용한다.
Zookeeper
- 주키퍼는 브로커가 어떤 브로커가 있는지, 일반적인 워커 브로커, 그리고 그걸로 컨트롤할 수 있는 컨트롤러 브로커 등의 정보를 가지고 있다.
- 1대는 리더역할을 한다.
- 그런 리더를 선출하고 문제가 생겼을 때 그 리더를 잠시 중지시킨 다음에 정상적인 워커 브로커들을 다시리더로 승격시키는 과정을 담당한다.
Connect
- 아파치 카프카는 독립적으로 그 자체만으로도 메시지 보내는 것과 받는 것을 사용할 수 있지만
- 로고를 연동하는 시스템, 다른 쪽에 있는 리소스로부터 가지고 와서 데이터를 보관하고 있다가 다른 쪽에 있는 타켓 리소스에 전달시켜 줄 수있는 소스와 싱크라는 기능을 이용해서 연결할 수 있는 기능도 제공한다.
- 이러한 모든 설정파일은 config폴더(mac: etc파일)에 들어있다.
server.properties 카프카 설정파일
zookeeper.properties 주키퍼 설정 파일
/bin폴더 : 실행파일(.sh)
windows는 따로 폴더가 있다.
.bat 파일이 windows용이다.
Kafka 사용 시나리오
기능 1) 컨슈머에 메시지 전달
기능 2) 변경된 메시지를 가져오고 그 값을 다른 쪽에 있는 데이터베이스 등 다른쪽 서비스에 전달해주는 카프카 커텍트
시나리오 1 - Kafka
압축파일풀기
tar xvf kafka_2.13-3.6.1.tgz
step1) Zookeeper 구동(윈도우라서 .bat으로 실행)
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
step 2) Kafka 서버 구동
./bin/windows/kafka-server-start.bat ./config/server.properties
step 3) topic 생성
./bin/windows/kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 1
topic 목록 확인
./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
topic 정보 확인
./bin/windows/kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092
step 4) producer 생성
./bin/windows/kafka-console-producer.bat --broker-list localhost:9092 --topic quickstart-events
step 5) consumer 생성
--from-beginning : 처음꺼부터 다 받겠다.
./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic quickstart-events \
--from-beginning
producer에서 입력하면 consumer에서 보이는 것을 확인할 수 있다.
mariaDB 설치
step 1) mariaDB 설치
- MacOs
- Windows
- PackageType MSI로 설치
이게 패스워드만 신경써두면 돼서 편하다.
password : test1357로 설정하겠다.
- .zip파일로 설치할 경우
등록이 잘 되었는지 확인
step 2) 종속성
order-service
pom.xml
<dependency>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
</dependency>
step 3) 실행
- cmd
mariaDB 서버 실행
mysql.server start
or
- MySQL Client
step 4) 데이터베이스 생성
show databases;
create database mydb;
step 5) 데이터 베이스 확인 및 테이블 생성
프로젝트 실행 후
order-service 포트/h2-consle
테이블 생성
create table users(
id int auto_increment primary key,
user_id varchar(20),
pwd varchar(20),
name varchar(20),
created_at datetime default NOW()
);
Kafka Connect를 통해서 유저의 어떤 데이터가 insert될 때 그 데이터를 감지했다가 데이터의 내용을 새로운 데이터 베이스에 또는 새로운 데이터에 옮기는 작업을 해보자
시나리오 2 - Kafka Connect
1. Kafka Connect
- Kafka Connect를 통해 Data를 Imort/Export 가능
- 코드없이 Configuration으로 데이터를 이동
- Standalone mode, Distribution(분산) mode 지원
- RESTful API를 통해 지원
- Stream 또는 Batch 형태로 데이터 전송 가능
- 커스텀 Connector를 통한 다양한 Plugin 제공(File, S3, Hive, Mysql, etc ...)
가져오는 쪽 : ConnectSource
보내는 쪽 : ConnectSync
Kafka Connect 설치
참고링크 - windows kafka 정상 작동하는 버전 제공문의
https://drive.google.com/file/d/1gCAelonqPAL5rVPReBMk-b0fb7StqwTa/view?usp=sharing
https://www.inflearn.com/questions/601451
kafka 설정 관련해서 올려주신 goolge drive 링크가 만료되었습니다 - 인프런 | 질문 & 답변 (inflearn.com)
step 1. Windows 버전의 Kafka + Kafka Connect 파일 다운
설치 링크(추천X..강의에서 설치링크를 주었지만..)
직접 사이트에서 설치를 추천한다..
Apache Kafka 설치링크 이거만 다운 ㄱㄱ
step 2. zookeeper 실행
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
step 3. kafka 실행
.\bin\windows\kafka-server-start.bat .\config\server.properties
step 4. kafka connect 실행
C:\Work\confluent>.\bin\windows\connect-distributed.bat .\etc\kafka\connect-distributed.properties
/c/kafka_demo/confluent-6.1.0
$ ./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
step 5. topic 목록 확인
Kafka Connect를 기동하게 되면 자동으로 생성되는 토픽이 있다.
./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
step 6. kafka connect 플러그인 확인
2. JDBC Connector 설치
Kafka Connect를 통해 데이터를 한쪽에서 읽어와서 다른 쪽으로 전달하기 위해서 관계형 데이터베이스를 사용할 것이다.
java에서 관계형 데이터베이스를 사용하려면 JDBC라이브러리를 이용한다.
그래서 Kafka Connect에서도 사용하려고 하는 소스, 사용하려는 타겟에 맞는 JDBC 커넥터를 설치해야 한다.
step 1.JDBC Connect 설치
- 직접 설치(⭐추천!!!)
다운 링크 https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
- 터미널 설치(windows의 경우, 직접 설치를 추천한다.. 에러가 많이 떠서 손이 정말 많이 간다...(경험담.. ☠ ☠ ))
kafka랑 connect가 같이 설치되기 때문에
이렇게 설치할 경우에는 zookeeper나 kafka나 여기서 다운된 걸로 사용해야 한다.
가장 주의해야 할 점은 터미널로 설치 했을 경우, 설정파일이 config폴더가 아닌 etc폴더에 있다는 것이다!!
이거 때문에 나중에 connect 서버를 기동할 때 문제가 발생하여 config가 아닌 etc라고 따로 지정해 줘야 한다.
아래 자세히 쓰여 있다.(관련 링크)
curl -O http://packages.confluent.io/archive/5.5/confluent-community-5.5.2-2.12.tar.gz
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
압축 풀기
tar xvf confluent-community-7.3.1.tar.gz
cd $KAFKA_CONNECT_HOME
Kafka Connect 실행
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
이거 추가하면 된다.
step 2. JDBC Connector 설정 - windows
etc/kafka/connect-distributed.properties 파일 마지막에 아래 plugin 정보 추가
- plugin.path=[confluentinc-kafka-connect-jdbc-10.7.3 폴더]
kafka에서 connector링크는 jdbc를 사용하겠다.
다양한 connector를 사용할 수 있다.
*Kafka
klafka_2.12-3.4.0.tgz (https://kafka.apache.org/downloads)
* Kafka-connect
confluent-community-7.3.1 (https://www.confluent.io/installation/)
* Kafka-connect-jdbc
confluentinc-kafka-connect-jdnc-10.6.3.zip (https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc)
step 3. MariaDB 적용
JdbcSourceConnector에서 MariaDB를 사용하기 위해 mariadb 드라이버 복사
${USER.HOME}\.m2 폴더에서 mariadb-java-client-2.7.1.jar파일을 ./share/java/kafka/로 복사
이제 터미널에서 connect를 실행한다(올린다).
직접설치한 파일로 실행할 경우에는 설정파일이 etc가 아니라 config에 있다!!!
/c/kafka_demo/confluent-6.1.0
$ ./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
터미널에서 connect를 다운로드 했을 경우, 에러가 뜬다.
해결
config폴더에서 실행하는 것을 etc폴더로 바꿔준다.
connect-distributed.bat에서 아래 코드를 추가한다.
3. Kafka Sorce Connect 추가 (MariaDB)
postman
jdbc source(보내는 쪽) connector를 생성(POST요청)
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
토픽에 데이터가 제대로 들어갔는지, 실제 소스가 만들어졌는지
MySQL client(MariaDB)
1. 전에 만들어둔 db사용
MariaDB [(none)]> use mydb
Database changed
2. 테이블 조회
MariaDB [mydb]> show tables;
+----------------+
| Tables_in_mydb |
+----------------+
| users |
+----------------+
MariaDB [mydb]> desc users;
+------------+-------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------+-------------+------+-----+---------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| user_id | varchar(20) | YES | | NULL | |
| pwd | varchar(20) | YES | | NULL | |
| name | varchar(20) | YES | | NULL | |
| created_at | datetime | YES | | current_timestamp() | |
+------------+-------------+------+-----+---------------------+----------------+
3. db에 변화를 줌.. user insert
insert 2번 했다
MariaDB [mydb]> insert into users(user_id, pwd, name) values('user1','test1111','User name');
Query OK, 1 row affected (0.002 sec)
4. topic조회
jdbc source connector 서버 post요청할 때 만들어둔 topic이 생성된 걸 볼 수 있다.
lsi66@DESKTOP-MNP0GKS MINGW64 /c/kafka_demo/confluent-6.1.0
$ ./bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
connect-configs
connect-offsets
connect-status
my_topic_users //✔ 여기
quickstart-events
5. consumer 실행
lsi66@DESKTOP-MNP0GKS MINGW64 /c/kafka_demo/confluent-6.1.0
$ ./bin/windows/kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my_topic_users --from-beginning
insert했던 값이 나온다. 이 값을 잘 펼쳐보면.. 이렇게 나온다.
{"schema":{
"type":"struct",
"fields":[
{"type":"int32","optional":false,"field":"id"},
{"type":"string","optional":true,"field":"user_id"},
{"type":"string","optional":true,"field":"pwd"},
{"type":"string","optional":true,"field":"name"},
{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"created_at"}
],
"optional":false,
"name":"users"},
"payload":{ // payload : 실제 전달된 데이터
"id":2,
"user_id":"admin",
"pwd":"admin1111",
"name":"Adiministrator",
"created_at":1706887570000}
}
4. Kafka Sink connect 추가(MariaDB)
1. sink 생성
postaman
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"auto.create":"true", // ⭐⭐⭐ topic과 같은 테이블을 생성하겠다.
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
2. 생성된 커넥터 조회
정상적으로 sink connector가 만들어졌다는 말은 데이터베이스에 topic이름과 같은 형태의 테이블이 생성되었다는 것을 의미한다.
user insert 해보자.
MariaDB [mydb]> insert into users(user_id, pwd, name) values('user2','user2222','username2');
Query OK, 1 row affected (0.003 sec)
MariaDB [mydb]> insert into users(user_id, pwd, name) values('user3','user3333','username3');
Query OK, 1 row affected (0.004 sec)
users 테이블에 user2, user3가 추가 되었다.
source conntector로 받은 데이터를 sink connector로 전달도니 데이터가
sink connector와 연결된 my_topic_users 테이블에도 추가 되었다.
💡
이 작업을 잘 사용하면 ITC쓰면서 말하는 데이터 이관기, ETL(Extract, Transform, Load) 라고 해서 데이터를 추출해서 데이터를 가공이라던가 변형을 해서 다른 쪽에 데이터를 저장하거나 보내주는 역할을 할 수 있다.
물론 별도의 ETL 도구를 사용하실 수도 있겠지만 지금 하고 있는 카프카 메시징 서비스를 이용해서 카프카 커넥트만 가지고도 싱크 커넥트와 소스 커넥트 잘 조합하시게 되면 한쪽의 데이터 소스에서 다른 쪽에 있는 데이터 타겟까지 원하시는 데이터를 변형해서 전달하는 게 가능해지겠죠.
➕ Kafka Producer를 이용하여 Kafka Topic에 데이터 직접 전송
- Kafka-console-producer에서 데이터 전송 → Topic에 추가 → MariaDB에 추가를 해보자
Kafka-console-producer 실행
source connctor에서 만든 topic에 데이터를 직접 넣었기 때문에 해당 topic과 연결된 테이블에만 추가 되었다.
Kafka Connect 순서 정리
step 1. 설치
step 2. zookeeper 실행
./bin/windows/zookeeper-server-start.bat ./config/zookeeper.properties
step 3. kafka 실행
./bin/windows/kafka-server-start.bat ./config/server.properties
step 4. kafka connect 실행
/c/kafka_demo/confluent-6.1.0
$ ./bin/windows/connect-distributed.bat ./etc/kafka/connect-distributed.properties
// etc가 아니라 config일 수도 있음..
step 5. kafka source connector 실행
jdbc source(보내는 쪽) connector를 생성
여기서 JSON설정으로 기동하고 나서 해당 DB에 insert 하고 topic을 살펴보면 데이터가 와있을것임)
http://127.0.0.1:8083/connectors (POST요청)
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
step 6. insert 등 DB에 변화주기
MariaDB [mydb]> insert into users(user_id, pwd, name) values('user1','test1111','User name');
Query OK, 1 row affected (0.002 sec)
step 7. kafka sink connector 실행
jdbc sink connector를 생성
(여기서 JSON설정으로 기동하고 나서 DB에 insert를 하면 sink DB쪽에도 추가가될것임)
http://127.0.0.1:8083/connectors (POST요청)
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://localhost:3306/mydb",
"connection.user":"root",
"connection.password":"test1357",
"mode": "incrementing",
"incrementing.column.name" : "id",
"table.whitelist":"users",
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}