JpaPagingItemReader와 Dirty Checking

4 min read

이번 글에서는 JpaPagingItemReader로 엔티티를 조회한 뒤, 값을 변경할 경우 발생하는 문제점을 살펴보고, 해결 방법에 대해서 알아보겠습니다.

이 글에서 사용된 예제는 Github에서 확인할 수 있습니다.

JpaPagingItemReader

JpaPagingItemReader는 다음과 같이 동작합니다.

  1. EntityManagerFactory를 사용해서 별도의 EntityManager 인스턴스를 생성해서 사용한다.
protected void doOpen() throws Exception {
  // ...
  entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);
  // ...
}
  1. 페이지를 읽을때 트랜잭션을 시작하고, EntityManagerflush() 메소드를 호출한다.
protected void doReadPage() {

  EntityTransaction tx = null;

  if (transacted) {
    tx = entityManager.getTransaction();
    tx.begin();

    entityManager.flush();
    entityManager.clear();
  }
  // ...
}
  1. 페이지를 조회한 뒤 트랜잭션을 커밋한다.

이로 인해서 다음과 같은 특성을 갖습니다.

  1. JpaPagingItemReader를 사용해서 조회한 엔티티와, JpaRepository를 사용해서 조회한 엔티티는 동일하지 않다.
  2. 마지막 페이지에 속한 엔티티의 변경사항은 데이터베이스에 반영되지 않는다.
  3. Chunk 단위의 트랜잭션과 다르게 동작한다.

문제점

실제로 발생할 수 있는 상황을 예로 들어보겠습니다. 다음과 같이 JpaPagingItemReader를 정의합니다.

@Bean
public ItemReader<Product> jpaPagingItemReader(EntityManagerFactory emf) {
  return new JpaPagingItemReaderBuilder<Product>()
      .queryString("SELECT p FROM Product p")
      .pageSize(PAGE_SIZE)
      .entityManagerFactory(emf)
      .name("product-reader")
      .build();
}

스텝을 다음과 같이 구성합니다.

  • 데이터베이스에 상태값이 NEW인 상품이 7개 존재한다.
  • 스텝의 chunkSize는 5로 지정한다.
  • ItemReader : JpaPagingItemReader를 사용해서 5개씩 상품을 읽는다.
  • ItemProcessor : 상품의 상태값을 PROCESS로 변경한다.
  • ItemWriter : 상품의 상태값을 DONE으로 변경한다.

Chunk 단위의 트랜잭션이 커밋되는 시점에, Dirty Checking을 통해서 상품의 변경사항이 DB에 반영되기를 의도했습니다.

스텝을 실행하면 다음과 같은 순서로 실행됩니다.

Executing step: [jpa-paging-item-reader-step]

// 첫번째 Chunk
↓ Chunk start
// 첫번째 페이지 조회
Process : Product(id=1, state=NEW) > PROCESS
Process : Product(id=2, state=NEW) > PROCESS
Process : Product(id=3, state=NEW) > PROCESS
Process : Product(id=4, state=NEW) > PROCESS
Process : Product(id=5, state=NEW) > PROCESS
Write   : Product(id=1, state=PROCESS) > DONE
Write   : Product(id=2, state=PROCESS) > DONE
Write   : Product(id=3, state=PROCESS) > DONE
Write   : Product(id=4, state=PROCESS) > DONE
Write   : Product(id=5, state=PROCESS) > DONE
↑ Chunk end

// 두번째 Chunk
↓ Chunk start
// 두번째 페이지를 읽기 전에 flush 실행되면서 변경사항이 DB에 반영됨
Updated : Product(id=1, state=DONE)
Updated : Product(id=2, state=DONE)
Updated : Product(id=3, state=DONE)
Updated : Product(id=4, state=DONE)
Updated : Product(id=5, state=DONE)

Process : Product(id=6, state=NEW) > PROCESS
Process : Product(id=7, state=NEW) > PROCESS
Write   : Product(id=6, state=PROCESS) > DONE
Write   : Product(id=7, state=PROCESS) > DONE
↑ Chunk end

// 두번째 Chunk 종료 이후에 flush 실행되지 않음
// 6번, 7번 상품의 상태 변경이 누락됨

// 상품 상태 출력
Step Finished
Product(id=1, state=DONE)
Product(id=2, state=DONE)
Product(id=3, state=DONE)
Product(id=4, state=DONE)
Product(id=5, state=DONE)
Product(id=6, state=NEW)
Product(id=7, state=NEW)

Step: [jpa-paging-item-reader-step] executed in 11ms

상품의 최종 상태를 보면, 6번과 7번 상품의 상태를 DONE으로 변경했지만, DB에 반영되지 않은 것을 확인할 수 있습니다.

해결 방법

스프링 배치에서 제공하는 RepositoryItemReader를 사용합니다.

@Bean
public ItemReader<Product> repositoryItemReader() {
  return new RepositoryItemReaderBuilder<Product>()
      .repository(productRepository)
      .methodName("findAll")
      .pageSize(PAGE_SIZE)
      .saveState(false)
      .sorts(Collections.singletonMap("id", Sort.Direction.ASC))
      .name("repository-item-reader")
      .build();
}

ProductRepositoryJpaRepository를 상속받아서 정의합니다.

public interface ProductRepository extends JpaRepository<Product, Integer> {
}

JpaRepositoryPagingAndSortingRepository를 상속받기 때문에 RepositoryItemReader 에서 사용할 수 있습니다.

public interface JpaRepository<T, ID> extends PagingAndSortingRepository<T, ID>, QueryByExampleExecutor<T> {
  // ...
}

RepositoryItemReader를 사용해서 동일한 스텝을 실행하면 다음과 같은 순서로 실행이 됩니다.

Executing step: [repository-item-reader-step]

// 첫번째 Chunk
↓ Chunk start
Process : Product(id=1, state=NEW) > PROCESS
Process : Product(id=2, state=NEW) > PROCESS
Process : Product(id=3, state=NEW) > PROCESS
Process : Product(id=4, state=NEW) > PROCESS
Process : Product(id=5, state=NEW) > PROCESS
Write   : Product(id=1, state=PROCESS) > DONE
Write   : Product(id=2, state=PROCESS) > DONE
Write   : Product(id=3, state=PROCESS) > DONE
Write   : Product(id=4, state=PROCESS) > DONE
Write   : Product(id=5, state=PROCESS) > DONE
Updated : Product(id=1, state=DONE)
Updated : Product(id=2, state=DONE)
Updated : Product(id=3, state=DONE)
Updated : Product(id=4, state=DONE)
Updated : Product(id=5, state=DONE)
↑ Chunk end

// 두번째 Chunk
↓ Chunk start
Process : Product(id=6, state=NEW) > PROCESS
Process : Product(id=7, state=NEW) > PROCESS
Write   : Product(id=6, state=PROCESS) > DONE
Write   : Product(id=7, state=PROCESS) > DONE
// 6번, 7번 상품의 상태도 DB에 반영됨
Updated : Product(id=6, state=DONE)
Updated : Product(id=7, state=DONE)
↑ Chunk end

// 상품 상태 출력
Step Finished
Product(id=1, state=DONE)
Product(id=2, state=DONE)
Product(id=3, state=DONE)
Product(id=4, state=DONE)
Product(id=5, state=DONE)
Product(id=6, state=DONE)
Product(id=7, state=DONE)
Step: [repository-item-reader-step] executed in 13ms

전체 상품의 상태값이 DONE으로 변경된 것을 확인할 수 있습니다.

결론

JpaPagingItemReader는 엔티티를 조회할 경우만 사용합니다. 조회한 엔티티의 값을 변경할 필요가 있을 경우 RepositoryItemReader를 사용합니다.

주의! RepositoryItemReader를 사용할 때, 엔티티의 조회의 조건문에 사용되는 필드의 값이 변경되는 경우, 전체 데이터가 처리되지 않는 문제가 발생하므로 주의해서 사용해야합니다.

참고

© 2022 Raegon Kim