카테고리 없음

Kafka Consumer Commit Config

자코린이 2024. 12. 11. 18:11

개요

  • 개념
    • 주의점
  • 사용

 

개념

Kafka에서 consumer가 데이터를 받았다고 확인하는 용도로 offset을 사용합니다.

하지만 auto commit은 예외나 에러가 발생하여 comsume 과정에서 데이터를 못 가져오는 경우, 데이터 유실이 발생할 수 있습니다.

따라서 메시지를 읽었다는 offset commit 시기를 직접 조절하여 메시지 유실을 대비할 수 있습니다. (consume과정에서 에러 발생시, offset을 롤백하여 다음에 다시 읽어옵니다.)

 

❗주의점

일반적으로 auto commit을 비활성화하려면 enable.auto.commit=false를 설정하면 된다고 생각하기 쉽습니다.

하지만 enable.auto.commit=false를 설정해도 해당 토픽에 lag이 없는 것을 확인할 수 있는데, 이는 Spring이 자동으로 배치를 실행하면서 commit을 수행하기 때문입니다.

따라서 Spring 환경에서는 AckMode 도 함께 설정해야 합니다.

 

사용

	@Bean
	public ConsumerFactory<String, Object> batchConsumerFactory() {
		Map<String, Object> props = new HashMap<>();
		...
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		// auto commit 비활성화
		// false로 설정하여도 spring이 자동으로 커밋함
		// 완전 수동 커밋을 위해서는 AckMode를 MANUAL_IMMEDIATE로 설정해야함
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

		// consume 트랜젝션 수준 설정
		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
		return new DefaultKafkaConsumerFactory<>(props);
	}

	@Bean(name="kafkaKpiBatchListenerContainerFactory")
	public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaKpiBatchListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<String, Object> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(batchConsumerFactory());
		//DTL에 전달하기 위해 수동 커밋
		//전달되면 커밋하여 데이터 받았다고 표시
		factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
		//에러 핸들러 설정
		// 2.7.1 버전에서는 consume 레벨에서는 batch 에러 핸들링에 한계가 있음(retry, DLT 처리 불가)
		//factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
		factory.setBatchListener(true);
		factory.getContainerProperties().setPollTimeout(1000);
		return factory;
	}

📖 AckMode에서 사용 가능한 설정들은 아래와 같습니다

AcksMode
설명
RECORD
레코드 단위로 프로세싱 이후 커밋한다.
BATCH (default)
poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋한다.
TIME
특정 시간 이후에 커밋한다.이 옵션을 사용할 경우에는 시간 간격을 선언하는 `AckTime`옵션을 설정해야 한다.
COUNT
특정 개수만큼 레코드가 처리된 이후에 커밋한다.이 옵션을 사용할 경우에는 레코드 개수를 선언하는 `AckCount`옵션을 설정해야 한다.
COUNT_TIME
TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋한다.
MANUAL
Acknowledgement.acknowledge() 메서드가 호출되면 다음번 poll()때 커밋한다.매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다.이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.
MANUAL_IMMEDIATE
Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다.이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.

 

@KafkaListener(topics = "topic", containerFactory = "kafkaKpiBatchListenerContainerFactory")
    public void listen(List<ConsumerRecord<String, Object>> records, Acknowledgment acknowledgment) throws Exception {

        for (ConsumerRecord<String, Object> record : records) {
            try {

                Object value = record.value();
                //헤더 정보 중 traceparent값 가져오기
                //offset의 설정이 ealest이면 모든 데이터를 읽어오고, 해당 traceparent값을 이용하여 데이터를 가져왔는지 확인
                //위 경우에는 해당 데이터를 consume했다는 상태를 저장할 DB 필요(redis가 가장 유력)
                //현재는 latest이므로 주석 처리
                //String traceparentValue = new String(record.headers().lastHeader("traceparent").value(), StandardCharsets.UTF_8);

                //처리 로직 추가
            } catch (Exception e) {
                try{
                    //process하는 과정에서 실패한 메시지 DLT에 저장
                    sendToDLQ("Process Error Message", convertObjectToJsonNode(record.value()));
                }catch (Exception ex){
                    //DLT에 저장 실패한 경우 데이터를 로그로 남김
                    logger.error("Error Sending Message to DLT : {}", record.value(), ex);
                }
                // 해당 throw를 사용하여 자동으로 DLT 생성 및 produce
                // throw new BatchListenerFailedException("Failed to process", record);
            }
        }

        try {
						//이 부분에서 직접 offset commit
            acknowledgment.acknowledge();
        } catch (Exception e) {
            logger.error("Error saving to OpenSearch or sending to DLQ", e);

            //opensearch에 저장 실패한 데이터를 DLQ로 전송
            for (ConsumerRecord<String, Object> record : records) {
                try{
                    //process하는 과정에서 실패한 메시지 DLT에 저장
                    sendToDLQ("Error Message To Insert Opensearch", convertObjectToJsonNode(record.value()));
                }catch (Exception ex){
                    //DLT에 저장 실패한 경우 데이터를 로그로 남김
                    logger.error("Error Message To Insert Opensearch : {}", record.value(), ex);
                }

            }
        }
    }

🙇‍♂️Ref: https://docs.spring.io/spring-kafka/docs/1.0.0.M1/reference/html/_reference.html

https://simsim231.tistory.com/293

https://dkswnkk.tistory.com/744

https://mycup.tistory.com/437