GCP/원데이

[GCP 원데이] RabbitMQ

ozofweird 2021. 5. 3. 19:24

1. RabbitMq

1) RabbitMQ

글 작성 요청을 저장했다가 DB가 처리할 수 있는 만큼만 처리할 때 사용하고, 글 목록 캐시는 사용자의 글 목록 조회 요청을 매번 DB에서 조회하는 것이 아닌 애플리케이션에 저장을 했다가 그 결과를 반환하도록 한다.

 

5672 포트는 실제 메시지를 RabbitMQ와 주고받기 위한 포트이며, 15672 포트는 모니터링을 위한 포트이다.

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

 

모니터링 페이지의 접속 정보는 다음과 같다. 해당 부분은 Admin 항목에서 관리를 위한 계정을 생성해주는 것이 좋다.

 

  • Username - guest
  • Password - guest

 

2) 큐 추가

3) 큐 삽입

Public Message 항목은 메시지를 큐에다 넣기 위한 항목이다.

4) 큐 확인

Get Message를 이용하여 큐에 들어간 메시지를 확인할 수 있다.

큐 확인과 동시에 메시지를 큐에서 빼기 위해서는 Ack Node의 설정을 변경해준다.

5) build.gradle

Spring Boot 애플리케이션에서 RabbitMQ를 사용하기 위해 의존성을 추가해준다.

implementation group: 'org.springframework.boot', name: 'spring-boot-starter-amqp', version: '2.4.5'

6) application.yml

spring:
  config:
    activate:
      on-profile: local

  datasource:
    url: jdbc:postgresql://localhost:5432/postgresql
    username: postgresql
    password: postgrespassword
  jpa:
    show-sql: true
    hibernate:
      dialect: org.hibernate.dialect.PostgreSQLDialect
      ddl-auto: update

  rabbitmq:
    host: localhost
    username: guest
    password: guest
    port: 5672

7) Producer, Consumer

메시지를 큐에 넣는(글 작성) Producer와 메시지를 빼오는 Consumer를 생성해준다.

package com.example.io;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendTo(String message) {
        this.rabbitTemplate.convertAndSend("CREATE_POST_QUEUE", message);
    }
}
package com.example.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
public class PostController {

    private static Integer PAGE_SIZE = 20;

    @Autowired
    private PostRepository postRepository;

    @Autowired
    private Producer producer;

    @Autowired
    private ObjectMapper objectMapper;

    // 1. 글을 작성한다.
    @PostMapping("/post")
    public Post createPost(@RequestBody Post post) throws JsonProcessingException {
        String jsonPost = objectMapper.writeValueAsString(post);
        producer.sendTo(jsonPost);

        return post;
    }

    ...
}

Postman에서 글 작성 요청을 해보고, 모니터링 화면으로 돌아가면 메시지가 들어간 것을 확인할 수 있다.

Cunsumer는, 큐에 메시지가 들어오면 handler 메서드가 호출이 되며, 스트링을 읽어온다.

package com.example.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private PostRepository postRepository;

    @RabbitListener(queues = "CREATE_POST_QUEUE")
    public void handler(String message) throws JsonProcessingException {
        Post post = objectMapper.readValue(message, Post.class);
        postRepository.save(post);
    }
}

8) RabbitMQ 인스턴스 설정

  • 도쿄
  • e2-medium
  • CentOS 7
  • 5672, 15672 포트 방화벽 설정

 

sudo yum install -y docker
sudo systemctl start docker
sudo chmod 666 /var/run/docker.sock
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

※ application.yml 파일에는 RabbitMQ 인스턴스의 내부 IP로 설정해준다. 

9) 글 목록 캐싱

첫 번째 페이지로 요청이 들어왔을 때 캐싱이 되도록 설정해준다.

package com.example.io;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class PostCacheService {

    @Autowired
    private PostRepository postRepository;

    private Page<Post> firstPostPage;

    @Scheduled(cron = "* * * * * *")
    public void updateFirstPostPage() {
        firstPostPage = postRepository.findAll(
                PageRequest.of(0, 20, Sort.by("id").descending())
        );
    }

    public Page<Post> getFirstPostPage() {
        return this.firstPostPage;
    }
}
package com.example.io;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.web.bind.annotation.*;

import java.util.List;

@RestController
public class PostController {

    private static Integer PAGE_SIZE = 20;

    @Autowired
    private PostRepository postRepository;

    @Autowired
    private Producer producer;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private PostCacheService postCacheService;

    // 1. 글을 작성한다.
    @PostMapping("/post")
    public Post createPost(@RequestBody Post post) throws JsonProcessingException {
//        return postRepository.save(post);
        String jsonPost = objectMapper.writeValueAsString(post);
        producer.sendTo(jsonPost);

        return post;
    }

    // 2. 글 목록을 페이징하여 반환 (page 파라미터가 없을 경우 1번 page 요청)
    @GetMapping("/posts")
    public Page<Post> getPostList(@RequestParam(defaultValue = "1") Integer page) {
        if(page.equals(1)) {
            return postCacheService.getFirstPostPage();
        } else {
            return postRepository.findAll(
                    PageRequest.of(page - 1, PAGE_SIZE, Sort.by("id").descending())
            );
        }
    }

    ...
}
package com.example.io;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@EnableScheduling
@SpringBootApplication
public class IoApplication {

    public static void main(String[] args) {
        SpringApplication.run(IoApplication.class, args);
    }

}

10) 서버 반영

마지막으로 Jenkins를 이용하여 내용을 반영해준다. 캐싱은 애플리케이션 내부에 구현하는 것보다는 Redis와 같은 별도 서비스를 이용하는 것이 더 효율적이다.


RabbitMQ는 비동기적인 작업을 처리할 때 좋다. 특정 이벤트를 발생시킨 클라이언트가 응답을 받을 필요가 없거나 즉시 받을 필요가 없는 경우 사용된다. 

 

응답을 받을 필요가 없는 경우는 로그와 같이 여러 서버에서 발생한 내용을 쌓는 작업이 있이며, 즉시 받을 필요가 없는 경우는 작업에 오랜시간이 걸리지만 완료될 경우 클라이언트에게 어떤식으로든 알려주는 작업을 뜻한다.

 

Consumer는 작업이 완료되었음을 Producer에게 전달(Callback)한다. 이 때 중요한 점은 어떤 작업이 완료되었는지 알려주기 위해서는 처음부터 요청을 식별할 수 있는 키가 존재해야한다는 것이다. 이 키는 Producer에서 유니크한 값으로 발급되었다면 문제가 없으며, 해당 키는 메시지에 포함되어 Consumer에게 전달이된다. 결국 Producer는 작업 완료 즉시 완료 여부를 알 수 있게 된다. Producer에서 로드밸런서를 끼고 여러개가 있는 경우에는 동일한 데이터를 공유해야하기 때문에 키 별로 작업 처리 완료 여부를 보관하기 위해서 Redis 캐시를 사용하는 것을 권장한다고 한다.

 

그 다음으로 해결해야하는 작업으로는 사용자와 Producer간의 데이터를 주고 받는 방식이다. 사용자가 웹 브라우저라고 가정했을 때, 커넥션을 계속 가지고 있어야하는 웹 소켓과 요청을 한 직후부터 작업이 완료되었는지 계속 Producer에게 물어보는 폴링 방식이 존재한다. 폴링을 선택한다면, 작업이 완료되기 전까지는 Producer는 계속 완료되지 않았다고 응답ㅇ르 하게 된다. 그러다 작업이 완료되는 순간 브라우저에서는 작업 완료에 맞는 스크립트를 실행하여 화면에 보여주게 된다.

 

 

 

2. 성능 측정

1) Artillery 성능 측정

config:
  target: "http://[NginX IP]"
  phases:
    - duration: 60
      arrivalRate: 3
      name: Warm up
    - duration: 120
      arrivalRate: 3
      rampTo: 50
      name: Ramp up load
    - duration: 600
      arrivalRate: 50
      name: Sustained load
  payload:
    path: "ratings_test_10k.csv"
    fields:
      - "content" 
scenarios:
  - name: "just post content"
    flow:
      - post:
          url: "/post"
          json:
            content: "{{ content }}"
      - think: 1
      - get:
          url: "/posts"

artillery run --output report.json io-test.yaml

스트레스 테스트 결과에 에러가 발생한 경우에는 인스턴스의 리전을 고려해보아야한다. 각 인스턴스가 설치된 지리적인 위치에 따라 거리가 다르기 때문이다. 각 인스턴스에 접속하여 인스턴스끼리 Ping을 테스트해보면, 다른 리전에 있는 인스턴스 사이는 60-70ms가 걸리는 반면에 같은 리전에 있는 인스턴스들은 1ms 만 걸리는 것을 확인할 수 있다.

 

톰갯은 요청이 들어오면 기본적으로 생성되는 200개의 쓰레드가 요청을 처리하고, 모든 쓰레드가 사용중일 경우에는 길이가 100인 큐에 저장을 했다가 처리를 한다. 즉, 하나의 요청이 톰캣의 쓰레드를 오래 사용할 수록 쓰레드는 빠르게 고갈이 되며 큐는 가득 차버리게 된다. 그 외의 나머지 요청들은 500에러 혹은 타임아웃이 발생하게 된다.

 

요청을 늦게 응답할 수록 해당 요청이 톰캣의 쓰레드를 차지하고 있기 때문에 쓰레드가 금방 고갈된다. 이를 해결하기 위해서는 요청을 가능하면 빠르게 처리하고 응답을 주는 것이 중요하다.

2) 인스턴스 삭제 및 생성

요청을 보다 빠르게 처리하기 위해서 서울 리전에 있는 Worker 인스턴스 하나를 지우고 서울에 NginX 인스턴스를 새로 만들어준다. 만든 NginX 인스턴스로 스트레스 테스트를 진행해준다. Latency를 좀 더 줄여주기 위해 RabbitMQ도 새로만든 NginX 인스턴스에 같이 올려준다.

 

3 개의 Worker 인스턴스중에서 하나를 삭제했기 때문에 실제 트래픽을 처리하는 주체가 줄어들게 된다. 전체 톰캣의 쓰레드 수도 600에서 400으로 줄어들게 된다. 하지만 네트워크 지연(Latency)는 줄어들게 된다. CPU 바운드 애플리케이션이었다면 성능이 안좋아질 수 도 있지만, I/O 바운드 애플리케이션은 컴퓨팅 파워가 아닌 네트워크 지연이 병목되고 있기 때문에 효과가 있다.

 

기존의 NginX 인스턴스를 이용하여 머신이미지를 만들고, 만든 이미지로 서울 리전에 새로운 NginX 인스턴스를 생성해준다. 그리고 새로만든 NginX에 접속하여 NginX를 실행시켜준다. (단, 기존의 Worker 인스턴스가 삭제되었기 때문에 NginX 설정파일과 Jenkins에서 배포 설정에서 삭제된 서버의 정보를 삭제해준다.)

sudo vi /etc/nginx/nginx.conf
sudo systemctl start nginx

sudo yum install -y docker
sudo systemctl start docker
sudo chmod 666 /var/run/docker.sock

docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

 

새로 생성한 NginX 인스턴스 생성이 완료가 되면 해당 IP로 RabbitMQ에 접속하여 큐를 생성해주고 프로젝트의 RabbitMQ 설정을 수정해준다.

  rabbitmq:
    host: 10.178.0.18
    username: guest
    password: guest
    port: 5672

3) 서버 반영 및 테스트

깃 허브에 커밋을 하고 Jenkins에서 빌드를 하여 반영을 해준다. 반영이 완료가 되었다면, 확인을 위해 새로 생성한 NginX 인스턴스의 외부 IP로 접속해본다.

다시 새로 생성한 NginX 인스턴스 외부 IP로 Artillery 스크립트를 수정해준뒤 테스트를 진행해본다. 그럼에도 실패했다면, NginX의 에러일 확률이 높다. NginX에 문제가 있는지 알기 위한 방법은 간단하다. 직접 애플리케이션에 스트레스 테스트를 했을 경우와 NginX로 스트레스를 테스트를 각각 시도하여 어느 부분에서 문제가 발생을 했는지 확인할 수 있다.

 

그 중 'Too many open files' 에러의 경우 각 프로세스의 nofile limit을 올려주어야 해결이 가능하다. NginX의 프로세스를 확인했을 때, 각 Worker 프로세스의 PID를 이용하여 nofile limit 설정을 해준다.

ps -aux | grep nginx

// 현재 설정된 nofile limit 확인
sudo prlimit --nofile --output RESOURCE,SOFT,HARD --pid [NginX PID]

// 설정가능한 최대 nofile limit 확인
sudo cat /proc/sys/fs/file-max

// nofile limit 10만으로 수정
sudo prlimit --nofile=100000 --pid=[NginX PID 1]
sudo prlimit --nofile=100000 --pid=[NginX PID 2]

※ 로그의 경우 관리해야할 서버의 수가 적다면 직접 실행중인 서버에 접속하여 확인을 할 수 있지만, 관리해야할 서버가 많다면 로그들을 통합해서 봐야할 필요가 생긴다. 따라서 여러 서버들의 로그를 합치기 위한 솔루션으로는 ELK 스택을 통해 ElasticSearch로 로그가 쌓이도록 하고, 이를 Kibana로 조회하는 방법이 있다.

 

 

 

3. RabbitMQ 인프라 설정

현재까지 구현된 인프라의 구조를 본다면, 큐에 집어 넣는 것과 DB에 INSERT 하는 것이 하나의 애플리케이션으로 이루어져 있다. 통상적으로 RabbitMQ에 메시지를 넣는 과정이 DB에 데이터를 삽입하는 것보다 더 빠르다는 장점을 가지고 있다.

 

하지만 만약 실제 서비스라면, 간단히 DB에 데이터를 넣는 것이 아닌 다양한 로직이 함께 동작한다. 이 경우, 처리 시간은 길어지게 된다. 이러한 문제는 지금까지 구현된 구조를 각각의 애플리케이션으로 나누어서 해결을 할 수 있다. (지금까지는 Producer와 Consumer가 하나의 애플리케이션에 구현이 된 형태)

이 구조에서 뒤쪽의 애플리케이션은 사용자의 트래픽을 직접 받지 않게 되기에 뒤쪽 서비스가 잠시 전부 내려가 있어도 사용자는 체감하기 힘들다. 또한 배포하는 과정도 무중단으로 진행하기 수월해진다. (실제 서비스들도 이와 같은 구조로 구성된 경우가 많다.)

 

RabbitMQ 외에도 Kafka 솔루션도 많이 이용하기에 두 솔루션을 비교하여 서비스에 알맞는 것을 선택해야한다.


[참고] ellune.tistory.com/29

728x90