5. Kafka 메세지 수신 용 Consumer 생성 및 메세지 처리

5.1. 저장 할 Entity 및 Repository 생성

  • 주문 저장 될 Entity
package com.toy.entity;

import jakarta.persistence.*;
import lombok.*;

import java.time.LocalDateTime;

@Entity
@Table(name = "orders")
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // 주문 고유 번호
    @Column(nullable = false, unique = true)
    private String orderNumber;

    // 상품 ID
    private Long productId;

    // 주문자 ID
    private String userId;

    // 주문 수량
    private int quantity;

    // 주문 상태 (PROCESSING, DONE, FAILED)
    @Enumerated(EnumType.STRING)
    private OrderStatus status;

    private LocalDateTime orderedAt;

    public enum OrderStatus {
        PROCESSING, DONE, FAILED
    }
}

 

  • Repository
package com.toy.repository;

import com.toy.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
    
}

 

5.2. Consumer 생성 (Kafka Listener)

  • topics : 수신 할 토픽 지정
  • groupId : 메세지 관리 그룹 지정
package com.toy.kafka;

import com.toy.dto.OrderRequestDto;
import com.toy.service.OrderStorageService;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderConsumer {
    private final OrderStorageService orderStorageService;

    @KafkaListener(
            topics = "orders",
            groupId = "order-consumer-group"
    )
    public void consume(OrderRequestDto orderDto) {
        orderStorageService.store(orderDto);
    }
}

 

 

728x90

 

5.3. 메세지 처리 서비스 생성

  • Kafka 메세지 객체 Class 에 entity 객체로 변환하는 메서드 추가
public class OrderRequestDto {

	... 생략

    public Order toEntity() {
        return Order.builder()
                .orderNumber(orderNumber)
                .productId(productId)
                .userId(userId)
                .quantity(quantity)
                .status(Order.OrderStatus.PROCESSING)
                .orderedAt(LocalDateTime.now())
                .build();
    }
}

 

  • 주문 데이터 저장 Service 생성
package com.toy.service;

import com.toy.dto.OrderRequestDto;
import com.toy.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderStorageService {
    private final OrderRepository orderRepository;

    public void store(OrderRequestDto order) {
        orderRepository.save(order.toEntity());
    }
}

 

 

5.4. API 호출

Swagger UI (혹은 Postman 등) 활용하여 API 호출

 

 

5.5. Database 저장 확인

DataBase 조회 및 정상 저장 확인

SELECT * FROM orders;

 

728x90
반응형

4. Kafka 메세지 전송 용 API 및 Producer 생성

4.1. API 에서 사용 될 Request / Response DTO 생성

  • API Request 와 Kafka 메세지로 사용 될 DTO
package com.toy.dto;

import com.toy.entity.Order;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;

import java.time.LocalDateTime;

@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Schema(description = "주문 등록 Request")
public class OrderRequestDto {

    @Schema(description = "주문 번호")
    private String orderNumber;

    @Schema(description = "상품 ID")
    private Long productId;

    @Schema(description = "사용자 ID")
    private String userId;

    @Schema(description = "수량")
    private int quantity;
}

 

  • API Response 에서 사용 될 DTO
package com.toy.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;

@Getter
@Builder
@Schema(description = "API 공통 Response")
public class ApiResponseDto {
    @Schema(description = "성공여부")
    private boolean result;
    @Schema(description = "메세지")
    private String message;
}

 

4.2. 주문 등록 API에 사용 될 Controller, Service 생성

  • Controller
package com.toy.controller;

import com.toy.dto.ApiResponseDto;
import com.toy.dto.OrderRequestDto;
import com.toy.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderService orderService;

    @Operation(summary = "주문")
    @PostMapping
    public ResponseEntity<ApiResponseDto> order(@RequestBody OrderRequestDto order) {
        return ResponseEntity.ok(orderService.order(order));
    }
}

 

  • Service Class
package com.toy.service;

import com.toy.dto.ApiResponseDto;
import com.toy.dto.OrderRequestDto;
import com.toy.kafka.OrderProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderProducer orderProducer;

    public ApiResponseDto order(OrderRequestDto order) {
        orderProducer.sendOrder(order);

        return ApiResponseDto.builder()
                .result(true)
                .message("주문 등록 요청 성공")
                .build();
    }
}

 

 

728x90

 

4.3. Producer 생성

KafkaTemplate<K,V>.send(topic, value);

  • K: key 타입
  • V: value 타입
  • topic: 메세지 토픽 지정
  • value: 메세지 내용 전달 value
package com.toy.kafka;

import com.toy.dto.OrderRequestDto;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderProducer {
    private static final String TOPIC = "orders";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendOrder(OrderRequestDto order) {
        kafkaTemplate.send(TOPIC, order);
    }
}

 

 

4.4. API 호출

Swagger UI (혹은 Postman 등) 활용하여 API 호출

 

 

4.5. Kafka 메세지 확인

Docker 컨테이너의 Kafka CLI 를 활용하여 Kafka 메세지 정상 전송 확인

PS D:\workspace\toy> docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic orders --from-beginning

{"orderNumber":"order_001","productId":100,"userId":"haley","quantity":1}
Processed a total of 1 messages
728x90
반응형

2. DB(Mysql) + Kafka + Zookeeper 띄우기

2.1. docker_compose.yml 작성

version: '3.8'

services:
  # 🐘 Zookeeper
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  # 🦁 Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  # 🐬 MySQL
  mysql:
    image: mysql:8
    container_name: mysql-container
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: 루트계정 비밀번호 입력
      MYSQL_DATABASE: testdb
    restart: always
  • image : 이미지 + 버전
  • container_name : 컨테이너 이름 지정
  • ports : 로컬 포트와 컨테이너 포트 연결
  • environment : 설정

 

2.2. docker_compose.yml 파일이 있는 디렉토리에서 실행

PS D:\workspace\toy\docker> docker-compose up -d
time="2025-04-24T15:57:47+09:00" level=warning msg="D:\\workspace\\toy\\docker\\docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion"
[+] Running 4/4
 ✔ Network docker_default     Created                                                                                                                                                                                              0.1s 
 ✔ Container zookeeper        Started                                                                                                                                                                                              2.9s 
 ✔ Container mysql-container  Started                                                                                                                                                                                              2.8s 
 ✔ Container kafka            Started

 

 

728x90

 

2.3. 실행중인 컨테이너 확인

PS D:\workspace\toy\docker>docker ps
CONTAINER ID   IMAGE                         COMMAND                   CREATED         STATUS              PORTS                                                  NAMES
d53765c53c70   confluentinc/cp-kafka:7.5.1   "/etc/confluent/dock…"   2 minutes ago   Up About a minute   0.0.0.0:9092->9092/tcp                                 kafka
6a7a0061cafb   mysql:8                       "docker-entrypoint.s…"   2 minutes ago   Up About a minute   0.0.0.0:3306->3306/tcp, 33060/tcp                      mysql-container
d381ae94ed1b   bitnami/zookeeper:latest      "/opt/bitnami/script…"   2 minutes ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
728x90
반응형

 

Docker Desktop 정상 설치 완료 후 실행 시 꺼지는 현상 발생하는 경우

원인 및 해결 방법

 

 

원인1. WSL 미설치

1.1. cmd 창에서 WSL 설치 확인 

C:\Users\asdf>wsl --list --verbose
Linux용 Windows 하위 시스템에 배포가 설치되어 있지 않습니다.
아래의 Microsoft Store에서 배포를 설치할 수 있습니다.
https://aka.ms/wslstore

 

 

1.2. 위와 같이 설치되어 있지 않은 경우 powershell 로 wsl 설치 

PS C:\Windows\system32> wsl --install -d Ubuntu
설치 중: Ubuntu
Ubuntu이(가) 설치되었습니다.
Ubuntu 실행 중...

 

 

1.3. 아래와 같은 Ubuntu 창이 뜨면, 계정 생성 후 종료

 

1.4. WSL 버전 2 확인

C:\Users\CHAE>wsl --list --verbose
  NAME      STATE           VERSION
* Ubuntu    Stopped         2

 

1.4.1. 버전1 로 나오는 경우

C:\Users\asdf>wsl --list --verbose
  NAME      STATE           VERSION
* Ubuntu    Running         1

 

위와 같이 1 버전으로 나오는 경우 Microsoft 공식 사이트에서 WSL 2  커널 업데이트 설치 

https://learn.microsoft.com/ko-kr/windows/wsl/install-manual#step-4---download-the-linux-kernel-update-package

 

다운로드 받은 wsl_update_x64.msi 파일 실행 하여 설치 완료 후

cmd 에서 변환 명령어 입력

C:\Users\asdf>wsl --set-version Ubuntu 2
변환이 진행 중입니다. 몇 분 정도 걸릴 수 있습니다...
WSL 2와의 주요 차이점에 대한 자세한 내용은 https://aka.ms/wsl2를 참조하세요
변환이 완료되었습니다.

 

 

1.5. Docker Desktop 재실행

728x90

원인2. Hyper-V 비활성화

2.1. 시작 > Windows 기능 켜기 또는 끄기 실행

 

2.2. Hyper-V 비활성화 되어있는 경우 활성화로 변경 및 재부팅

 

2.3. Docker Desktop 재실행

728x90
반응형

1. Docker 설치

 

1.1. 공식 Docker 홈페이지에 접속

https://www.docker.com/products/docker-desktop/

 

 

1.2. 운영체제에 맞는 Docker Desktop 버전 선택하여 installer 설치

 

 

1.3. 설치 된 installer 실행하여 설치

 

 

설치 도중 사용중인 보안 프로그램에 따라 hosts 파일이 변경되었다는 경고를 받을 수 있으나

이는 Docker 로컬에서 컨테이너와 통신할 수 있도록 도와주는 설정일 뿐이므로

복원하지 않고 변경된 hosts 파일을 유지해야 함

 

 

1.4. 설치 완료 시 버튼 클릭하여 재부팅

 

 

1.5. 재부팅 후 터미널 창에서 docker -v 명령어를 입력하여 버전 정보가 뜬다면 정상 설치

 

 

1.6. Docker Desktop 실행하기

시작 메뉴 > Docker Desktop 검색하여 실행

 

docker 아이콘으로 확인하거나

docker run hello-world 커맨드로 정상 실행 확인 가능

 

C:\Users\asdf>docker run hello-world

.. 

Hello from Docker!
This message shows that your installation appears to be working correctly.

..

 

 

** 실행안되는 경우 아래 포스팅 참고

https://haleylog.tistory.com/24

 

Docker Desktop 설치 후 실행 안되는 경우

Docker Desktop 정상 설치 완료 후 실행 시 꺼지는 현상 발생하는 경우원인 및 해결 방법 원인1. WSL 미설치1.1. cmd 창에서 WSL 설치 확인 C:\Users\asdf>wsl --list --verboseLinux용 Windows 하위 시스템에 배포가 설

haleylog.tistory.com

728x90
반응형

 

대용량 트래픽 처리를 위한 환경을 구성하고 성능 테스트 하는 과정을 기록해 보려고 한다.

 

1. 환경 구성

  • Docker Desktop 설치
  • Kafka(Zookeeper) + MySQL 이미지 띄우기

 

2. Kafka 실습 프로젝트 기본 구조 만들기

  • Spring Boot3 + Java17 + JPA 프로젝트 생성
  • Producer, Consumer 간단한 메시지 처리 로직

 

3. 대용량 트래픽 시나리오 설정

  • 예: 주문 생성 → Kafka로 메시지 → Consumer가 DB 저장

 

4. JMeter 테스트 툴을 활용 한 성능 테스트 진행

  • TPS, 처리 지연 시간, 메시지 누락 여부, 리소스 사용량 분석
728x90

 

728x90
반응형

+ Recent posts