Skip to content

Recommendation Task Queue

작성: 0chil

Recommendation Task Queue (명시적 실행 구조)

Section titled “Recommendation Task Queue (명시적 실행 구조)”

product.recommendation.queue는 DB 기반 큐를 사용하지만, 현재는 범용 TaskHandler 프레임워크에 태워서 추상화하지 않고 추천 도메인에 맞는 구체적인 스케줄러 + 핸들러 구조로 운영한다.

배경:

  • 기존 TaskHandler / TaskTopic / TaskQueuePoller 조합은 범용성이 높지만, 현재 사용처가 사실상 recommendation 하나라서 과한 추상화가 됨
  • 팀 유지보수 시 “실제로 언제 무엇을 어떻게 처리하는지”를 파악하는 허들이 생길 수 있음
  • recommendation은 트리거 관리(방문 기반 재추천 스케줄)까지 포함되어 도메인 로직이 명시적으로 드러나는 편이 유리함
MemberVisitedEvent
└─ RecommendationTaskMemberVisitedEventListener (AFTER_COMMIT)
└─ RecommendationTaskMemberVisitTrigger upsert
- memberId (PK)
- lastVisitedAt
- triggerAt (다음 추천 예정 시각)
RecommendationTaskTriggerCheckJob (@Scheduled, 1분)
└─ triggerAt <= now 인 row를 pick (임시 lease 적용, limit=100)
├─ RecommendationTaskQueueItem 생성 (PENDING)
├─ 다음 triggerAt 계산 후 갱신 (CAS)
└─ 더 이상 다음 트리거가 없으면 trigger row 삭제 (CAS)
RecommendationTaskHandler (@Scheduled, 30초)
└─ RecommendationTaskQueueItemRepository.pickRunnablePendingItems(limit=10)
├─ recommendationService.saveRecommendedProducts(...)
├─ 성공 → SUCCEEDED
└─ 실패 → FAILED

파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskMemberVisitedEventListener.kt

  • 이벤트: MemberVisitedEvent
  • 실행 시점: @TransactionalEventListener(AFTER_COMMIT)
  • 동작:
    • lastVisitedAt = now
    • triggerAt = now + 1시간 (첫 추천 시점)
    • sessionTimeout = 1시간
    • upsertRefreshingAfterSessionTimeout(...) 호출

의미:

  • row가 없으면 insert
  • row가 있고, 마지막 방문이 세션 타임아웃(1시간) 이전이면 lastVisitedAt, triggerAt 갱신
  • 세션 타임아웃 이내 재방문이면 no-op

2. trigger check job (방문 기반 재추천 스케줄링)

Section titled “2. trigger check job (방문 기반 재추천 스케줄링)”

파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskTriggerCheckJob.kt

  • 스케줄: @Scheduled(fixedDelay = 1분, scheduler = "batchTaskScheduler")
  • 배치 크기: triggerPickLimit = 100
  • 임시 lease: triggerLeaseDuration = 5분

동작 순서:

  1. now, leasedTriggerAt = now + 5분 계산
  2. pickByTriggerAtBefore(triggerAtBefore = now, leasedTriggerAt = leasedTriggerAt, limit = 100)
  3. pick된 trigger들로 RecommendationTaskQueueItem(PENDING) 일괄 삽입
  4. 각 trigger에 대해 다음 시각 계산
    • 있으면 updateTriggerAtIfMatch(...)
    • 없으면 deleteIfMatch(...)

3. recommendation queue handler (명시적 poller)

Section titled “3. recommendation queue handler (명시적 poller)”

파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskHandler.kt

  • 스케줄: @Scheduled(initialDelay = 30초, fixedDelay = 30초, scheduler = "batchTaskScheduler")
  • pick limit: 10
  • 동시 처리: 5
  • 실행기: virtualThreadExecutor 주입

동작 순서:

  1. pickRunnablePendingItems(10) 으로 실행 가능한 큐 아이템 가져오기
  2. 5개씩 chunk로 나눠 병렬 실행 (CompletableFuture + Executor)
  3. 각 아이템마다
    • execute -> recommendationService.saveRecommendedProducts(...)
    • 성공 시 finish(..., succeeded = true)
    • 실패 시 finish(..., succeeded = false)

파일: src/main/kotlin/dev/vingle/product/recommendation/queue/domain/RecommendationTaskMemberVisitTrigger.kt

필드:

  • memberId: Long (PK)
  • lastVisitedAt: LocalDateTime
  • triggerAt: LocalDateTime
  • createdAt, updatedAt (BaseEntityKt)

인덱스:

  • ix_rtmvt_trigger_at (triggerAt)

설계 포인트:

  • memberId를 PK로 사용 (회원당 1 row)
  • Persistable<Long> 구현 (isNew() = createdAt == null)
  • triggerAt가 다음 추천 예정 시각이며 trigger check job의 조회 기준

파일: src/main/kotlin/dev/vingle/product/recommendation/queue/domain/RecommendationTaskQueueItem.kt

필드:

  • id: Long (PK, auto increment)
  • memberId: Long
  • candidateGenerationType: G0 | G1 | G2
  • status: PENDING | RUNNING | SUCCEEDED | FAILED
  • pickedAt: LocalDateTime?
  • finishedAt: LocalDateTime?

인덱스:

  • ix_rtqi_status (status)

설계 포인트:

  • 기존 runAfter는 제거됨
  • 실행 시점 제어는 RecommendationTaskMemberVisitTrigger.triggerAt에서 담당
  • queue item은 “즉시 실행 가능한 작업”만 표현

RecommendationTaskTriggerCheckJob는 방문 시점(lastVisitedAt) 기준 오프셋 목록으로 다음 트리거를 계산한다.

오프셋 목록:

  • 1h
  • 4h
  • 12h, 24h, 36h, 48h, 60h, 72h (3일까지 12시간 간격)
  • 96h, 120h, 144h, 168h (7일까지 24시간 간격)
  • 이후 없음 (null -> trigger row 삭제)

다음 트리거 계산 방식:

  • recommendationDurationsAfterLastVisit를 순회
  • lastVisitedAt + duration > now 인 첫 시각을 다음 triggerAt로 사용

장점:

  • 분기문보다 스케줄 규칙이 직접 드러남
  • 지연 실행되어도 과거 시각을 다시 쓰지 않음

RecommendationTaskTriggerCheckJob.findCandidateGenerationType(...)

  • 방문 후 <= 3일: G2
  • 방문 후 <= 7일: G1
  • 그 이후: G0

주의:

  • 현재는 now 기준으로 계산한다.
  • 스케줄러 지연이 큰 경우 경계 시점(특히 3일/7일)에 기대 타입과 달라질 수 있으므로 필요 시 triggerAt 기준 계산으로 변경 가능.

pickByTriggerAtBefore(...)는 내부에서 짧은 트랜잭션으로:

  1. trigger_at <= now 대상 member_id 후보 조회 (락 없이, LIMIT N)
  2. 조회된 member_id 후보를 IN (...) FOR UPDATE SKIP LOCKED로 row lock
  3. pick된 row의 trigger_atleasedTriggerAt(예: now + 5분)으로 즉시 변경

효과:

  • 동일한 due trigger row가 동시에 다시 pick되어 큐에 중복 삽입되는 문제를 크게 줄임

finalize 단계 CAS (조건부 갱신/삭제)

Section titled “finalize 단계 CAS (조건부 갱신/삭제)”

큐 삽입 후 최종 처리 시:

  • updateTriggerAtIfMatch(...)
  • deleteIfMatch(...)

두 메서드 모두 memberId + expectedLastVisitedAt + expectedTriggerAt를 조건으로 사용한다.

효과:

  • trigger row가 그 사이에 다른 이벤트(예: 새 방문)로 갱신된 경우 덮어쓰지 않음
  • queue insert 후 프로세스가 죽으면 leasedTriggerAt가 남을 수 있음
  • lease 시간(현재 5분) 이후 다시 pick되어 재시도 가능
  • 즉, trigger 처리 자체는 at-least-once 성질을 가짐
항목위치
trigger check 주기1분RecommendationTaskTriggerCheckJob
trigger pick limit100RecommendationTaskTriggerCheckJob
trigger lease duration5분RecommendationTaskTriggerCheckJob
queue handler 주기30초RecommendationTaskHandler
queue pick limit10RecommendationTaskHandler
queue 동시 처리5RecommendationTaskHandler

패키지 dev.vingle.taskqueue (TaskHandler, TaskTopic, TaskQueuePoller, TaskQueuePollerRegistrar)는 여전히 코드베이스에 남아 있을 수 있다.

하지만 recommendation 처리 경로는 더 이상 이 추상화에 의존하지 않고, 위 문서의 명시적 스케줄러/핸들러 구조를 기준으로 이해하면 된다.