6. Jmeter 를 활용한 Kafka 병목 현상 테스트

6.1. prodcer/consumer 수정 및 주문 API 서비스 로직 수정

6.1.1. OrderProducer/OrderConsumer 수정

굳이 객체 전체를 넘기지 않아도 되므로 메세지 value 값도 string(unique 한 주문번호) 으로 수정

(필요에 따라 application.yml 의 기본 producer/consumer 설정 변경하거나, String 타입의 별도 kafka factory 설정 추가)

  • OrderProducer
private final KafkaTemplate<String, String> kafkaTemplate;

public void sendOrder(String orderNumber) {
    kafkaTemplate.send(TOPIC, orderNumber);
}
  • OrderConsumer
    private static final String TOPIC = "orders";

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrder(String orderNumber) {
        kafkaTemplate.send(TOPIC, orderNumber);
    }

 

6.1.2. OrderService 수정

Kafka 메세지 전송 전에 PROCESSING(진행중) 상태로 주문데이터 저장 로직 추가

 -> 상태 전이 관리하여 중복 요청 방지를 위해

디버깅 로그 추가

package com.toy.service;

import com.toy.dto.ApiResponseDto;
import com.toy.dto.OrderRequestDto;
import com.toy.kafka.OrderProducer;
import com.toy.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OrderProducer orderProducer;

    public ApiResponseDto order(OrderRequestDto orderRequestDto) {
        log.debug("Order Request !! [" + orderRequestDto.getOrderNumber() + "]");

        orderRepository.save(orderRequestDto.toEntity());
        orderProducer.sendOrder(orderRequestDto.getOrderNumber());

        return ApiResponseDto.builder().result(true).message("주문 등록 요청 성공").build();
    }
}

 

 

6.2. 주문 저장 로직에 병목 현상 구현

6.2.1. repository 에 조건부 상태 변경 추가

지정 한 상태일 때만 상태가 변경 될 수 있도록 JPQL 추가

package com.toy.repository;

import com.toy.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;


@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
    @Modifying
    @Query("UPDATE Order o SET o.status = :newStatus WHERE o.orderNumber = :orderNumber AND o.status = :expectedStatus")
    int updateStatusConditionally(@Param("orderNumber") String orderNumber,
                                  @Param("expectedStatus") Order.OrderStatus expectedStatus,
                                  @Param("newStatus") Order.OrderStatus newStatus);
}

 

 

6.2.2. 주문 저장 로직 변경

병목 현상 구현을 위해 2초 sleep 추가

주문 상태가 PROCESSING(진행중) 이면 DONE(완료) 처리하여 저장

 -> 주문 상태 완료 update 결과가 0인 경우 이미 처리 된 주문으로 가정하여 debug log 찍어두기

 

package com.toy.service;

import com.toy.entity.Order;
import com.toy.repository.OrderRepository;
import jakarta.transaction.Transactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderStorageService {
    private final OrderRepository orderRepository;

	@Transactional
    public void store(String orderNumber) {
        log.debug("Order Store Start !! [" + orderNumber + "]");

        try {
            // 비즈니스 처리 소요 시간 가정 2초
            Thread.sleep(2000);

            int result = orderRepository.updateStatusConditionally(orderNumber, Order.OrderStatus.PROCESSING, Order.OrderStatus.DONE);

            if(result == 0) {
                log.debug("Already Stored Order :( [" + orderNumber + "]");
                return;
            }

        } catch (Exception e) {
            orderRepository.updateStatusConditionally(orderNumber, Order.OrderStatus.PROCESSING, Order.OrderStatus.FAILED);
        }

        log.debug("Order Store End !! [" + orderNumber + "]");
    }
}

 

 

6.3. Jmeter 파일 다운로드

Jmeter 공식 다운로드 페이지 접속하여 압축파일 다운로드

https://jmeter.apache.org/download_jmeter.cgi

 

 

6.4. Jmeter 실행

압축 해제 한 Jmeter 폴더\bin\jmeter.bat 클릭 하여 Jmeter GUI 실행

 

 

6.5. Jmeter 설정

6.5.1. Thread Group 추가

Test Plan 우클릭 > Add > Threads (Users) > Thread Group

1초 동안 사용자 10명 동시 요청 환경 설정

  • Number of Threads : Thread 수 지정 (동시 요청 사용자 수)
  • Ramp-up period : 요청 텀 지정
  • Loop Count : 반복 횟수 지정

 

6.5.2. Http Request 추가

Thread Group 우클릭 > Add > Sampler > Http Request

API Url, method 설정

Jmeter 함수 활용하여 Request Body 설정

  • __time(yyyyMMddHHmmss) : 현재 시간 문자열 변환
  • __threadNum : 스레드 번호 (각각 다른 번호 주기 위해 활용)
{
  "orderNumber": "order_${__time(yyyyMMddHHmmss)}_${__threadNum}",
  "productId": 1,
  "userId": "haley${__threadNum}",
  "quantity": 1
}

 

6.5.3. Http Header Manager 추가

Http Request 우클릭 > Add > Config Element > Http Header Manager

Content-Type : application/json 추가

 

6.5.4. View Results Tree 추가

Thread Group 우클릭 > Add > Listener > View Results Tree

 

6.6. Jmeter 테스트 실행

상단 실행 버튼 클릭하고 View Results Tree 클릭하여

Request 정상 호출 되었는지 확인

 

 

6.7. 병목 현상 확인

OrderService, OrderStorageService 에 찍어둔 debug log 확인

Order API 호출은 0.4초에 10회 모두 요청했지만,

실제 주문 저장 처리는 12분 00초 부터 12분 20초 까지 약 20초 소요

 -> consumer concurrency 기본값은 1 이므로, sleep 2초 * 10회 하여 총 20초

17:11:57.969+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_6]
17:11:57.969+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_2]
17:11:57.969+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_3]
17:11:57.970+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_4]
17:11:57.969+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_7]
17:11:57.969+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_5]
17:11:57.973+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_1]
17:11:58.002+09:00 DEBUG 19192 : Order Request !! [order_20250509171157_8]
17:11:58.185+09:00 DEBUG 19192 : Order Request !! [order_20250509171158_9]
17:11:58.380+09:00 DEBUG 19192 : Order Request !! [order_20250509171158_10]

17:12:00.232+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_4]
17:12:02.465+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_4]
17:12:02.486+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171158_10]
17:12:04.496+09:00 DEBUG 19192 : Order Store End !! [order_20250509171158_10]
17:12:04.519+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_8]
17:12:06.543+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_8]
17:12:06.605+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_7]
17:12:08.627+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_7]
17:12:08.643+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171158_9]
17:12:10.667+09:00 DEBUG 19192 : Order Store End !! [order_20250509171158_9]
17:12:10.699+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_6]
17:12:12.708+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_6]
17:12:12.733+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_2]
17:12:14.756+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_2]
17:12:14.789+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_1]
17:12:16.811+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_1]
17:12:16.840+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_3]
17:12:18.854+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_3]
17:12:18.887+09:00 DEBUG 19192 : Order Store Start !! [order_20250509171157_5]
17:12:20.898+09:00 DEBUG 19192 : Order Store End !! [order_20250509171157_5]
728x90
반응형

5. Kafka 메세지 수신 용 Consumer 생성 및 메세지 처리

5.1. 저장 할 Entity 및 Repository 생성

  • 주문 저장 될 Entity
package com.toy.entity;

import jakarta.persistence.*;
import lombok.*;

import java.time.LocalDateTime;

@Entity
@Table(name = "orders")
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // 주문 고유 번호
    @Column(nullable = false, unique = true)
    private String orderNumber;

    // 상품 ID
    private Long productId;

    // 주문자 ID
    private String userId;

    // 주문 수량
    private int quantity;

    // 주문 상태 (PROCESSING, DONE, FAILED)
    @Enumerated(EnumType.STRING)
    private OrderStatus status;

    private LocalDateTime orderedAt;

    public enum OrderStatus {
        PROCESSING, DONE, FAILED
    }
}

 

  • Repository
package com.toy.repository;

import com.toy.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
    
}

 

5.2. Consumer 생성 (Kafka Listener)

  • topics : 수신 할 토픽 지정
  • groupId : 메세지 관리 그룹 지정
package com.toy.kafka;

import com.toy.dto.OrderRequestDto;
import com.toy.service.OrderStorageService;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderConsumer {
    private final OrderStorageService orderStorageService;

    @KafkaListener(
            topics = "orders",
            groupId = "order-consumer-group"
    )
    public void consume(OrderRequestDto orderDto) {
        orderStorageService.store(orderDto);
    }
}

 

 

728x90

 

5.3. 메세지 처리 서비스 생성

  • Kafka 메세지 객체 Class 에 entity 객체로 변환하는 메서드 추가
public class OrderRequestDto {

	... 생략

    public Order toEntity() {
        return Order.builder()
                .orderNumber(orderNumber)
                .productId(productId)
                .userId(userId)
                .quantity(quantity)
                .status(Order.OrderStatus.PROCESSING)
                .orderedAt(LocalDateTime.now())
                .build();
    }
}

 

  • 주문 데이터 저장 Service 생성
package com.toy.service;

import com.toy.dto.OrderRequestDto;
import com.toy.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderStorageService {
    private final OrderRepository orderRepository;

    public void store(OrderRequestDto order) {
        orderRepository.save(order.toEntity());
    }
}

 

 

5.4. API 호출

Swagger UI (혹은 Postman 등) 활용하여 API 호출

 

 

5.5. Database 저장 확인

DataBase 조회 및 정상 저장 확인

SELECT * FROM orders;

 

728x90
반응형

4. Kafka 메세지 전송 용 API 및 Producer 생성

4.1. API 에서 사용 될 Request / Response DTO 생성

  • API Request 와 Kafka 메세지로 사용 될 DTO
package com.toy.dto;

import com.toy.entity.Order;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;

import java.time.LocalDateTime;

@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Schema(description = "주문 등록 Request")
public class OrderRequestDto {

    @Schema(description = "주문 번호")
    private String orderNumber;

    @Schema(description = "상품 ID")
    private Long productId;

    @Schema(description = "사용자 ID")
    private String userId;

    @Schema(description = "수량")
    private int quantity;
}

 

  • API Response 에서 사용 될 DTO
package com.toy.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;

@Getter
@Builder
@Schema(description = "API 공통 Response")
public class ApiResponseDto {
    @Schema(description = "성공여부")
    private boolean result;
    @Schema(description = "메세지")
    private String message;
}

 

4.2. 주문 등록 API에 사용 될 Controller, Service 생성

  • Controller
package com.toy.controller;

import com.toy.dto.ApiResponseDto;
import com.toy.dto.OrderRequestDto;
import com.toy.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/orders")
public class OrderController {

    private final OrderService orderService;

    @Operation(summary = "주문")
    @PostMapping
    public ResponseEntity<ApiResponseDto> order(@RequestBody OrderRequestDto order) {
        return ResponseEntity.ok(orderService.order(order));
    }
}

 

  • Service Class
package com.toy.service;

import com.toy.dto.ApiResponseDto;
import com.toy.dto.OrderRequestDto;
import com.toy.kafka.OrderProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderProducer orderProducer;

    public ApiResponseDto order(OrderRequestDto order) {
        orderProducer.sendOrder(order);

        return ApiResponseDto.builder()
                .result(true)
                .message("주문 등록 요청 성공")
                .build();
    }
}

 

 

728x90

 

4.3. Producer 생성

KafkaTemplate<K,V>.send(topic, value);

  • K: key 타입
  • V: value 타입
  • topic: 메세지 토픽 지정
  • value: 메세지 내용 전달 value
package com.toy.kafka;

import com.toy.dto.OrderRequestDto;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderProducer {
    private static final String TOPIC = "orders";

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void sendOrder(OrderRequestDto order) {
        kafkaTemplate.send(TOPIC, order);
    }
}

 

 

4.4. API 호출

Swagger UI (혹은 Postman 등) 활용하여 API 호출

 

 

4.5. Kafka 메세지 확인

Docker 컨테이너의 Kafka CLI 를 활용하여 Kafka 메세지 정상 전송 확인

PS D:\workspace\toy> docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic orders --from-beginning

{"orderNumber":"order_001","productId":100,"userId":"haley","quantity":1}
Processed a total of 1 messages
728x90
반응형

2. DB(Mysql) + Kafka + Zookeeper 띄우기

2.1. docker_compose.yml 작성

version: '3.8'

services:
  # 🐘 Zookeeper
  zookeeper:
    image: bitnami/zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  # 🦁 Kafka
  kafka:
    image: confluentinc/cp-kafka:7.5.1
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper

  # 🐬 MySQL
  mysql:
    image: mysql:8
    container_name: mysql-container
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: 루트계정 비밀번호 입력
      MYSQL_DATABASE: testdb
    restart: always
  • image : 이미지 + 버전
  • container_name : 컨테이너 이름 지정
  • ports : 로컬 포트와 컨테이너 포트 연결
  • environment : 설정

 

2.2. docker_compose.yml 파일이 있는 디렉토리에서 실행

PS D:\workspace\toy\docker> docker-compose up -d
time="2025-04-24T15:57:47+09:00" level=warning msg="D:\\workspace\\toy\\docker\\docker-compose.yml: the attribute `version` is obsolete, it will be ignored, please remove it to avoid potential confusion"
[+] Running 4/4
 ✔ Network docker_default     Created                                                                                                                                                                                              0.1s 
 ✔ Container zookeeper        Started                                                                                                                                                                                              2.9s 
 ✔ Container mysql-container  Started                                                                                                                                                                                              2.8s 
 ✔ Container kafka            Started

 

 

728x90

 

2.3. 실행중인 컨테이너 확인

PS D:\workspace\toy\docker>docker ps
CONTAINER ID   IMAGE                         COMMAND                   CREATED         STATUS              PORTS                                                  NAMES
d53765c53c70   confluentinc/cp-kafka:7.5.1   "/etc/confluent/dock…"   2 minutes ago   Up About a minute   0.0.0.0:9092->9092/tcp                                 kafka
6a7a0061cafb   mysql:8                       "docker-entrypoint.s…"   2 minutes ago   Up About a minute   0.0.0.0:3306->3306/tcp, 33060/tcp                      mysql-container
d381ae94ed1b   bitnami/zookeeper:latest      "/opt/bitnami/script…"   2 minutes ago   Up About a minute   2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   zookeeper
728x90
반응형

 

대용량 트래픽 처리를 위한 환경을 구성하고 성능 테스트 하는 과정을 기록해 보려고 한다.

 

1. 환경 구성

  • Docker Desktop 설치
  • Kafka(Zookeeper) + MySQL 이미지 띄우기

 

2. Kafka 실습 프로젝트 기본 구조 만들기

  • Spring Boot3 + Java17 + JPA 프로젝트 생성
  • Producer, Consumer 간단한 메시지 처리 로직

 

3. 대용량 트래픽 시나리오 설정

  • 예: 주문 생성 → Kafka로 메시지 → Consumer가 DB 저장

 

4. JMeter 테스트 툴을 활용 한 성능 테스트 진행

  • TPS, 처리 지연 시간, 메시지 누락 여부, 리소스 사용량 분석
728x90

 

728x90
반응형

+ Recent posts