Spring/Toy

5. Kafka > Consumer 생성 및 메세지 처리

haleylog 2025. 4. 29. 17:09

5. Kafka 메세지 수신 용 Consumer 생성 및 메세지 처리

5.1. 저장 할 Entity 및 Repository 생성

  • 주문 저장 될 Entity
package com.toy.entity;

import jakarta.persistence.*;
import lombok.*;

import java.time.LocalDateTime;

@Entity
@Table(name = "orders")
@Getter
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Order {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // 주문 고유 번호
    @Column(nullable = false, unique = true)
    private String orderNumber;

    // 상품 ID
    private Long productId;

    // 주문자 ID
    private String userId;

    // 주문 수량
    private int quantity;

    // 주문 상태 (PROCESSING, DONE, FAILED)
    @Enumerated(EnumType.STRING)
    private OrderStatus status;

    private LocalDateTime orderedAt;

    public enum OrderStatus {
        PROCESSING, DONE, FAILED
    }
}

 

  • Repository
package com.toy.repository;

import com.toy.entity.Order;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
    
}

 

5.2. Consumer 생성 (Kafka Listener)

  • topics : 수신 할 토픽 지정
  • groupId : 메세지 관리 그룹 지정
package com.toy.kafka;

import com.toy.dto.OrderRequestDto;
import com.toy.service.OrderStorageService;
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class OrderConsumer {
    private final OrderStorageService orderStorageService;

    @KafkaListener(
            topics = "orders",
            groupId = "order-consumer-group"
    )
    public void consume(OrderRequestDto orderDto) {
        orderStorageService.store(orderDto);
    }
}

 

 

728x90

 

5.3. 메세지 처리 서비스 생성

  • Kafka 메세지 객체 Class 에 entity 객체로 변환하는 메서드 추가
public class OrderRequestDto {

	... 생략

    public Order toEntity() {
        return Order.builder()
                .orderNumber(orderNumber)
                .productId(productId)
                .userId(userId)
                .quantity(quantity)
                .status(Order.OrderStatus.PROCESSING)
                .orderedAt(LocalDateTime.now())
                .build();
    }
}

 

  • 주문 데이터 저장 Service 생성
package com.toy.service;

import com.toy.dto.OrderRequestDto;
import com.toy.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class OrderStorageService {
    private final OrderRepository orderRepository;

    public void store(OrderRequestDto order) {
        orderRepository.save(order.toEntity());
    }
}

 

 

5.4. API 호출

Swagger UI (혹은 Postman 등) 활용하여 API 호출

 

 

5.5. Database 저장 확인

DataBase 조회 및 정상 저장 확인

SELECT * FROM orders;

 

728x90
반응형