본문 바로가기

학습 기록 (Learning Logs)/Today I Learned

분산 시스템 장애 복구

분산 시스템에서 서비스 장애나 노드 장애가 발생했을 때, 복구 전략이나 재시작 로직을 설계하면서 고려했던 요소와 실제 경험이 있다면 설명해주세요.

 

장애 탐지 방법, 복구 전략(자동/수동), 데이터 정합성 유지 방법, 장애 전파 방지 등의 관점에서 설명해 보세요. 실제 경험이 없다면 학습한 내용이나 토이 프로젝트에서 고려했던 부분을 공유해도 좋습니다.

 


 

장애탐지 방법으로는 로그를 남기는 것과, 서버의 cpu상태 메모리 점유율 상태가 평소와 다르게 증가하는 경우 슬랙으로 알림 설정을 하여 모니터닝을 하도록 해야합니다. 

 

또한 정기적 ping test와 헬스체크를 통해 서버가 죽었다면 개발자에게 알림이 날려주는 alert도 구현해놔야합니다 

 

장애복구 방법으로는 이벤트 메세지의 경우 단일이 아닌 클러스터집단으로 사용하여 최소 3개 이상으로 등록하여 장애시 선출하여 진행하도록 합니다. 

 

평소에는 레플리케이션을 만들어놔야합니다. kafka를 사용하는 이벤트처리에서는 기록이 되어있기때문에 복구가 가능합니다. 하지만 레디스에 의존하는 경우 단일 레디스 환경에서는 레디스가 인메모리캐시이기때문에 저장이 되지않습니다. 

 

그래서 레디스 클러스터 환경을 구축해놓거나 wal설정으로 입력은 기록하도록하여 장애가 발생해도 데이터를 복구할 수 있도록 합니다.

 

 


 

  • 문제 상황:
    Kafka 기반 데이터 파이프라인에서 consumer가 중단되면 메시지를 처리하지 못해 데이터 누락 문제가 발생함.
  • 해결 방안:
    • 오프셋(offset)을 명시적으로 관리하여, consumer가 중단된 후 재시작 시 어느 지점부터 다시 데이터를 처리할지 결정함.
    • 실패 상황에서는 일정 시간 간격을 둔 재시도 로직을 추가하여 안정성을 높임.

 

Kafka에서 오프셋을 명시적으로 관리한다

 

메시지 소비 후에 어떤 시점까지 읽었는지(처리 완료한 메시지의 위치)를 자동으로 커밋하지 않고

애플리케이션 로직에서 직접 커밋하는 것을 의미

 

이를 통해 오류 발생 시 재처리나 중복 처리를 방지하고, 보다 정밀한 메시지 처리 제어가 가능해집니다.

1. 기본 설정 변경

기본적으로 Kafka 소비자는 enable.auto.commit 옵션이 true로 설정되어 있어 주기적으로 오프셋을 자동 커밋합니다.

이를 비활성화하여 명시적으로 관리하려면 다음과 같이 설정합니다.

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 자동 커밋 비활성화
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

 

 

 

2. 메시지 처리 후 오프셋 커밋

메시지를 처리한 후, 즉 작업이 성공적으로 완료된 후에 커밋을 수행합니다. 이때 두 가지 방식이 있습니다.

 

동기(Synchronous) 커밋:
커밋이 완료될 때까지 기다리므로, 처리 완료와 커밋이 보장됩니다. 단, 그만큼 처리 시간이 늘어날 수 있습니다.

try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 각 레코드 처리 로직
    }
    consumer.commitSync(); // 동기 커밋
} catch (Exception e) {
    // 예외 처리 로직
}

 

비동기(Asynchronous) 커밋:
커밋 요청을 보내고 바로 다음 작업으로 넘어가므로, 처리 시간은 단축되지만 커밋 실패에 대한 처리를 별도로 해주어야 합니다.

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // 커밋 실패에 대한 로그 기록 또는 재시도 로직 등
        }
    }
});

 

 

3. 커밋 시점 선택

명시적 오프셋 관리를 하는 주된 이유는, 애플리케이션 로직에서 오류 발생 시 다시 처리할 메시지 범위를 명확하게 관리할 수 있기 때문입니다. 예를 들어:

  • 정확한 처리 보장: 메시지를 완전히 처리한 후 커밋하면, 재시작 시 중복 처리하지 않습니다.
  • 실패 복구: 처리 도중 예외가 발생하면 커밋을 하지 않으므로, 재시작 시 같은 메시지를 재처리할 수 있습니다.
  • 배치 커밋: 특정 배치 단위로 처리 후 커밋하거나, 특정 수 이상의 메시지를 처리한 후 커밋하는 방식으로 성능과 안정성의 균형을 맞출 수 있습니다.

4. 추가적인 고려 사항

  • 트랜잭션: Kafka 프로듀서와 소비자 모두 트랜잭션을 지원하며, 복잡한 메시지 처리 시에 트랜잭션을 활용하면 여러 파티션에 걸쳐 원자적인 커밋을 할 수 있습니다.
  • 오프셋 저장 위치 변경: 기본적으로 Kafka는 내부 토픽(__consumer_offsets)에 오프셋을 저장하지만, 애플리케이션에 따라 별도의 외부 저장소(예: 데이터베이스)를 사용하기도 합니다.

 


idempotent

동일 메시지를 여러 번 처리해도 최종 결과가 중복 없이 일관되도록 설계.

1. 고유 식별자(Idempotency Key) 사용 및 체크

// 메시지 처리 전에 고유 ID 체크
if (processedMessageIds.contains(messageId)) {
    // 이미 처리된 메시지라면 로직을 스킵
    return;
}

// 메시지 처리 로직 실행

// 처리 완료 후 고유 ID 기록 저장
processedMessageIds.add(messageId);
  • 메시지에 고유 식별자 포함:
    각 메시지에 중복 처리를 피할 수 있는 고유한 ID(예: UUID)를 포함시킵니다.
  • 처리 이력 기록:
    메시지를 처리할 때, 먼저 해당 고유 ID가 이미 처리되었는지 데이터베이스나 캐시(예: Redis 등)를 통해 확인합니다.
    • 만약 이미 처리한 메시지라면, 처리를 스킵하거나 업데이트가 필요 없음을 판단합니다.
    • 만약 처리된 기록이 없다면, 메시지를 처리한 후 해당 ID를 저장합니다.

 

2. Upsert(Insert 또는 Update) 활용

INSERT INTO user_status (user_id, status, updated_at)
VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE status = VALUES(status), updated_at = VALUES(updated_at);

데이터베이스 작업의 경우, 동일한 결과를 만들어내도록 Upsert(Insert하거나 이미 존재하면 Update하는) 쿼리를 작성할 수 있습니다.

이런 식의 쿼리를 사용하면, 동일한 메시지가 여러 번 들어와도 최종 상태는 동일하게 유지됩니다.

 

 

3. Kafka Idempotent Producer 사용

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");  // idempotence 활성화

 

Kafka에서는 프로듀서 수준에서도 idempotence를 지원합니다. Kafka 0.11 이상부터 아래 설정을 통해 정확히 한 번 전송(exactly-once semantics)에 가까운 메시지 전송을 보장할 수 있습니다.

 

동일 메시지가 재전송되더라도, Kafka 브로커는 중복된 레코드를 걸러내고 유일한 결과만 기록하도록 보장합니다.

 

 


Redis 기반 상태 관리

애플리케이션의 상태(예, 현재 처리한 메시지의 오프셋, 노드의 헬스체크 결과 등)를 빠르게 저장하고 조회하기 위해

Redis의 인메모리 저장소를 활용하는 방법

 

 

1. 상태 정보 저장 및 조회

  • 키-밸류 구조 활용:
    Redis는 키-값 형태로 데이터를 저장하므로, 처리 중인 상태 정보를 식별할 수 있는 고유 키와 해당 상태 값을 저장합니다.
    예를 들어, Kafka 컨슈머의 경우 “consumer:offset:myTopic” 같은 키에 마지막으로 처리한 오프셋 값을 저장할 수 있습니다.
  • 메시지 처리 전후로 Redis에 상태 정보를 저장하고 조회할 수 있습니다.
// Redis 클라이언트 생성
Jedis jedis = new Jedis("localhost", 6379);

// 상태 조회: 예를 들어, 특정 토픽의 마지막 처리 오프셋
String key = "consumer:offset:myTopic";
String offsetString = jedis.get(key);
int offset = offsetString != null ? Integer.parseInt(offsetString) : 0;

// 메시지 처리 후 상태 업데이트:
int newOffset = offset + processedCount;
jedis.set(key, String.valueOf(newOffset));

 

2. 원자성(Atomicity)과 동시성 처리

  • 원자적 명령어:
    Redis는 단일 스레드로 동작하며 여러 클라이언트가 동시에 요청을 보내더라도 각 명령은 원자적으로 실행됩니다. 이를 활용해, 상태 업데이트 시 동시성 문제를 피할 수 있습니다.
  • 분산 락(Distributed Lock):
    여러 인스턴스가 동시에 같은 리소스를 업데이트할 가능성이 있는 경우, SET NX (Not eXists 옵션)와 만료시간을 설정하여 분산 락을 구현할 수 있습니다.
// 락 획득 시도: 5초의 TTL을 가진 분산 락
String lockKey = "lock:consumer:myTopic";
String lockResult = jedis.set(lockKey, "locked", "NX", "EX", 5);
if ("OK".equals(lockResult)) {
    // 락을 획득한 경우 상태 업데이트 로직 실행
    // 작업 완료 후 락 해제
    jedis.del(lockKey);
} else {
    // 락 획득 실패: 다른 인스턴스가 처리 중이므로 재시도 혹은 대기
}

 

 

3. TTL(Time-To-Live) 설정 및 만료 관리

  • 만료 시간 사용:
    상태 값이 오랜 시간 동안 유효하지 않다면, 자동으로 만료되도록 설정할 수 있습니다. 예를 들어 노드의 헬스체크 값은 일정 시간마다 업데이트되며, 만료된 경우 해당 노드가 비정상적인 것으로 판단할 수 있습니다.
String healthKey = "node:health:node1";
jedis.setex(healthKey, 10, "alive");  // 10초 후 자동 만료

 


서킷 브레이커(Circuit Breaker) 패턴

시스템에서 외부 서비스나 네트워크 호출 시 연속적인 실패가 발생하는 경우, 그 호출을 잠시 중단하여 전체 시스템의 장애 전파를 방지하고, 복구 여부를 점검하기 위한 디자인 패턴.

 

1. 서킷 브레이커의 주요 상태

  • Closed (정상 상태):
    • 모든 호출이 정상적으로 수행됩니다.
    • 호출 실패가 임계치를 넘지 않는 한 계속해서 외부 서비스에 요청을 보냅니다.
  • Open (차단 상태):
    • 실패가 임계치에 도달하면 서킷 브레이커가 열려서 외부 서비스에 요청하지 않습니다.
    • 이 상태에서는 즉시 예외를 반환하거나 대체 로직을 실행합니다.
  • Half-Open (시험 상태):
    • 일정 시간 후 서킷 브레이커는 제한적인 호출을 허용하여 외부 서비스 복구 여부를 확인합니다.
    • 만약 해당 호출이 성공하면 서킷 브레이커는 다시 Closed 상태로 돌아가고, 실패하면 다시 Open 상태로 전환됩니다.

 

2. 구현 단계

  1. 실패 카운터 및 임계치 설정:
    • 호출 실패 횟수를 카운트하고, 사전에 정의한 임계치(threshold) 이상 실패하면 브레이커를 Open 상태로 전환합니다.
  2. 타이머 설정:
    • 브레이커가 Open 상태에 머무르는 시간을 설정합니다. 이 시간 동안에는 외부 호출을 차단합니다.
  3. 상태 전환 로직:
    • Open 상태 후 일정 시간이 경과하면 Half-Open 상태로 전환하여 제한된 테스트 호출을 수행합니다.
    • 테스트 호출 결과에 따라, 성공하면 Closed 상태로, 실패하면 다시 Open 상태로 전환합니다.
  4. 예외 처리 및 폴백(Fallback) 로직:
    • 외부 서비스 호출 실패 시, 대체 로직(예: 캐시된 결과 반환 혹은 디폴트 값 반환)을 수행하도록 설계합니다.
public class CircuitBreaker {
    // 상태 정의 (0: Closed, 1: Open, 2: Half-Open)
    private enum State { CLOSED, OPEN, HALF_OPEN }
    
    private State state = State.CLOSED;
    private int failureCount = 0;
    private final int failureThreshold = 5;  // 예: 5회 이상 실패시 Open 상태
    private final long timeout = 10000;      // Open 상태 유지 시간 (밀리초)
    private long lastFailureTime = 0;

    public synchronized <T> T call(Callable<T> callable) throws Exception {
        // Open 상태일 경우 타임아웃 여부 체크
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime > timeout) {
                state = State.HALF_OPEN;
            } else {
                throw new Exception("Circuit breaker is OPEN");
            }
        }

        try {
            T result = callable.call();  // 외부 서비스 호출
            // 성공했으면 상태를 재설정
            if(state == State.HALF_OPEN) {
                // Half-Open 상태에서 성공하면 Closed 상태로 전환
                reset();
            }
            return result;
        } catch(Exception e) {
            recordFailure();
            throw e;
        }
    }

    private void recordFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (failureCount >= failureThreshold) {
            state = State.OPEN;
        }
    }

    private void reset() {
        failureCount = 0;
        state = State.CLOSED;
    }
}