개요
- 개념
- 주의점
- 사용
개념
Kafka에서 consumer가 데이터를 받았다고 확인하는 용도로 offset을 사용합니다.
하지만 auto commit은 예외나 에러가 발생하여 comsume 과정에서 데이터를 못 가져오는 경우, 데이터 유실이 발생할 수 있습니다.
따라서 메시지를 읽었다는 offset commit 시기를 직접 조절하여 메시지 유실을 대비할 수 있습니다. (consume과정에서 에러 발생시, offset을 롤백하여 다음에 다시 읽어옵니다.)
❗주의점
일반적으로 auto commit을 비활성화하려면 enable.auto.commit=false를 설정하면 된다고 생각하기 쉽습니다.
하지만 enable.auto.commit=false를 설정해도 해당 토픽에 lag이 없는 것을 확인할 수 있는데, 이는 Spring이 자동으로 배치를 실행하면서 commit을 수행하기 때문입니다.
따라서 Spring 환경에서는 AckMode 도 함께 설정해야 합니다.
사용
@Bean
public ConsumerFactory<String, Object> batchConsumerFactory() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// auto commit 비활성화
// false로 설정하여도 spring이 자동으로 커밋함
// 완전 수동 커밋을 위해서는 AckMode를 MANUAL_IMMEDIATE로 설정해야함
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// consume 트랜젝션 수준 설정
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(name="kafkaKpiBatchListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaKpiBatchListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(batchConsumerFactory());
//DTL에 전달하기 위해 수동 커밋
//전달되면 커밋하여 데이터 받았다고 표시
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
//에러 핸들러 설정
// 2.7.1 버전에서는 consume 레벨에서는 batch 에러 핸들링에 한계가 있음(retry, DLT 처리 불가)
//factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(1000);
return factory;
}
📖 AckMode에서 사용 가능한 설정들은 아래와 같습니다
AcksMode
|
설명
|
RECORD
|
레코드 단위로 프로세싱 이후 커밋한다.
|
BATCH (default)
|
poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋한다.
|
TIME
|
특정 시간 이후에 커밋한다.이 옵션을 사용할 경우에는 시간 간격을 선언하는 `AckTime`옵션을 설정해야 한다.
|
COUNT
|
특정 개수만큼 레코드가 처리된 이후에 커밋한다.이 옵션을 사용할 경우에는 레코드 개수를 선언하는 `AckCount`옵션을 설정해야 한다.
|
COUNT_TIME
|
TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋한다.
|
MANUAL
|
Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll()때 커밋한다.매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다.이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.
|
MANUAL_IMMEDIATE
|
Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다.이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.
|
@KafkaListener(topics = "topic", containerFactory = "kafkaKpiBatchListenerContainerFactory")
public void listen(List<ConsumerRecord<String, Object>> records, Acknowledgment acknowledgment) throws Exception {
for (ConsumerRecord<String, Object> record : records) {
try {
Object value = record.value();
//헤더 정보 중 traceparent값 가져오기
//offset의 설정이 ealest이면 모든 데이터를 읽어오고, 해당 traceparent값을 이용하여 데이터를 가져왔는지 확인
//위 경우에는 해당 데이터를 consume했다는 상태를 저장할 DB 필요(redis가 가장 유력)
//현재는 latest이므로 주석 처리
//String traceparentValue = new String(record.headers().lastHeader("traceparent").value(), StandardCharsets.UTF_8);
//처리 로직 추가
} catch (Exception e) {
try{
//process하는 과정에서 실패한 메시지 DLT에 저장
sendToDLQ("Process Error Message", convertObjectToJsonNode(record.value()));
}catch (Exception ex){
//DLT에 저장 실패한 경우 데이터를 로그로 남김
logger.error("Error Sending Message to DLT : {}", record.value(), ex);
}
// 해당 throw를 사용하여 자동으로 DLT 생성 및 produce
// throw new BatchListenerFailedException("Failed to process", record);
}
}
try {
//이 부분에서 직접 offset commit
acknowledgment.acknowledge();
} catch (Exception e) {
logger.error("Error saving to OpenSearch or sending to DLQ", e);
//opensearch에 저장 실패한 데이터를 DLQ로 전송
for (ConsumerRecord<String, Object> record : records) {
try{
//process하는 과정에서 실패한 메시지 DLT에 저장
sendToDLQ("Error Message To Insert Opensearch", convertObjectToJsonNode(record.value()));
}catch (Exception ex){
//DLT에 저장 실패한 경우 데이터를 로그로 남김
logger.error("Error Message To Insert Opensearch : {}", record.value(), ex);
}
}
}
}
🙇♂️Ref: https://docs.spring.io/spring-kafka/docs/1.0.0.M1/reference/html/_reference.html
https://simsim231.tistory.com/293