Recommendation Task Queue
작성: 0chilRecommendation Task Queue (명시적 실행 구조)
Section titled “Recommendation Task Queue (명시적 실행 구조)”product.recommendation.queue는 DB 기반 큐를 사용하지만, 현재는 범용 TaskHandler 프레임워크에 태워서 추상화하지 않고
추천 도메인에 맞는 구체적인 스케줄러 + 핸들러 구조로 운영한다.
배경:
- 기존
TaskHandler/TaskTopic/TaskQueuePoller조합은 범용성이 높지만, 현재 사용처가 사실상 recommendation 하나라서 과한 추상화가 됨 - 팀 유지보수 시 “실제로 언제 무엇을 어떻게 처리하는지”를 파악하는 허들이 생길 수 있음
- recommendation은 트리거 관리(방문 기반 재추천 스케줄)까지 포함되어 도메인 로직이 명시적으로 드러나는 편이 유리함
현재 구조 (요약)
Section titled “현재 구조 (요약)”MemberVisitedEvent └─ RecommendationTaskMemberVisitedEventListener (AFTER_COMMIT) └─ RecommendationTaskMemberVisitTrigger upsert - memberId (PK) - lastVisitedAt - triggerAt (다음 추천 예정 시각)
RecommendationTaskTriggerCheckJob (@Scheduled, 1분) └─ triggerAt <= now 인 row를 pick (임시 lease 적용, limit=100) ├─ RecommendationTaskQueueItem 생성 (PENDING) ├─ 다음 triggerAt 계산 후 갱신 (CAS) └─ 더 이상 다음 트리거가 없으면 trigger row 삭제 (CAS)
RecommendationTaskHandler (@Scheduled, 30초) └─ RecommendationTaskQueueItemRepository.pickRunnablePendingItems(limit=10) ├─ recommendationService.saveRecommendedProducts(...) ├─ 성공 → SUCCEEDED └─ 실패 → FAILED핵심 컴포넌트
Section titled “핵심 컴포넌트”1. 방문 이벤트 → trigger upsert
Section titled “1. 방문 이벤트 → trigger upsert”파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskMemberVisitedEventListener.kt
- 이벤트:
MemberVisitedEvent - 실행 시점:
@TransactionalEventListener(AFTER_COMMIT) - 동작:
lastVisitedAt = nowtriggerAt = now + 1시간(첫 추천 시점)sessionTimeout = 1시간upsertRefreshingAfterSessionTimeout(...)호출
의미:
- row가 없으면 insert
- row가 있고, 마지막 방문이 세션 타임아웃(1시간) 이전이면
lastVisitedAt,triggerAt갱신 - 세션 타임아웃 이내 재방문이면 no-op
2. trigger check job (방문 기반 재추천 스케줄링)
Section titled “2. trigger check job (방문 기반 재추천 스케줄링)”파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskTriggerCheckJob.kt
- 스케줄:
@Scheduled(fixedDelay = 1분, scheduler = "batchTaskScheduler") - 배치 크기:
triggerPickLimit = 100 - 임시 lease:
triggerLeaseDuration = 5분
동작 순서:
now,leasedTriggerAt = now + 5분계산pickByTriggerAtBefore(triggerAtBefore = now, leasedTriggerAt = leasedTriggerAt, limit = 100)- pick된 trigger들로
RecommendationTaskQueueItem(PENDING)일괄 삽입 - 각 trigger에 대해 다음 시각 계산
- 있으면
updateTriggerAtIfMatch(...) - 없으면
deleteIfMatch(...)
- 있으면
3. recommendation queue handler (명시적 poller)
Section titled “3. recommendation queue handler (명시적 poller)”파일: src/main/kotlin/dev/vingle/product/recommendation/queue/service/RecommendationTaskHandler.kt
- 스케줄:
@Scheduled(initialDelay = 30초, fixedDelay = 30초, scheduler = "batchTaskScheduler") - pick limit:
10 - 동시 처리:
5 - 실행기:
virtualThreadExecutor주입
동작 순서:
pickRunnablePendingItems(10)으로 실행 가능한 큐 아이템 가져오기5개씩 chunk로 나눠 병렬 실행 (CompletableFuture + Executor)- 각 아이템마다
execute->recommendationService.saveRecommendedProducts(...)- 성공 시
finish(..., succeeded = true) - 실패 시
finish(..., succeeded = false)
데이터 모델
Section titled “데이터 모델”RecommendationTaskMemberVisitTrigger
Section titled “RecommendationTaskMemberVisitTrigger”파일: src/main/kotlin/dev/vingle/product/recommendation/queue/domain/RecommendationTaskMemberVisitTrigger.kt
필드:
memberId: Long(PK)lastVisitedAt: LocalDateTimetriggerAt: LocalDateTimecreatedAt,updatedAt(BaseEntityKt)
인덱스:
ix_rtmvt_trigger_at(triggerAt)
설계 포인트:
memberId를 PK로 사용 (회원당 1 row)Persistable<Long>구현 (isNew() = createdAt == null)triggerAt가 다음 추천 예정 시각이며 trigger check job의 조회 기준
RecommendationTaskQueueItem
Section titled “RecommendationTaskQueueItem”파일: src/main/kotlin/dev/vingle/product/recommendation/queue/domain/RecommendationTaskQueueItem.kt
필드:
id: Long(PK, auto increment)memberId: LongcandidateGenerationType: G0 | G1 | G2status: PENDING | RUNNING | SUCCEEDED | FAILEDpickedAt: LocalDateTime?finishedAt: LocalDateTime?
인덱스:
ix_rtqi_status(status)
설계 포인트:
- 기존
runAfter는 제거됨 - 실행 시점 제어는
RecommendationTaskMemberVisitTrigger.triggerAt에서 담당 - queue item은 “즉시 실행 가능한 작업”만 표현
트리거 스케줄 규칙
Section titled “트리거 스케줄 규칙”RecommendationTaskTriggerCheckJob는 방문 시점(lastVisitedAt) 기준 오프셋 목록으로 다음 트리거를 계산한다.
오프셋 목록:
1h4h12h, 24h, 36h, 48h, 60h, 72h(3일까지 12시간 간격)96h, 120h, 144h, 168h(7일까지 24시간 간격)- 이후 없음 (
null-> trigger row 삭제)
다음 트리거 계산 방식:
recommendationDurationsAfterLastVisit를 순회lastVisitedAt + duration > now인 첫 시각을 다음triggerAt로 사용
장점:
- 분기문보다 스케줄 규칙이 직접 드러남
- 지연 실행되어도 과거 시각을 다시 쓰지 않음
Candidate Generation Type 규칙
Section titled “Candidate Generation Type 규칙”RecommendationTaskTriggerCheckJob.findCandidateGenerationType(...)
- 방문 후
<= 3일:G2 - 방문 후
<= 7일:G1 - 그 이후:
G0
주의:
- 현재는
now기준으로 계산한다. - 스케줄러 지연이 큰 경우 경계 시점(특히 3일/7일)에 기대 타입과 달라질 수 있으므로 필요 시
triggerAt기준 계산으로 변경 가능.
동시성 / 실패 처리 전략
Section titled “동시성 / 실패 처리 전략”trigger pick 중복 방지 (임시 lease)
Section titled “trigger pick 중복 방지 (임시 lease)”pickByTriggerAtBefore(...)는 내부에서 짧은 트랜잭션으로:
trigger_at <= now대상member_id후보 조회 (락 없이,LIMIT N)- 조회된
member_id후보를IN (...) FOR UPDATE SKIP LOCKED로 row lock - pick된 row의
trigger_at를leasedTriggerAt(예:now + 5분)으로 즉시 변경
효과:
- 동일한 due trigger row가 동시에 다시 pick되어 큐에 중복 삽입되는 문제를 크게 줄임
finalize 단계 CAS (조건부 갱신/삭제)
Section titled “finalize 단계 CAS (조건부 갱신/삭제)”큐 삽입 후 최종 처리 시:
updateTriggerAtIfMatch(...)deleteIfMatch(...)
두 메서드 모두 memberId + expectedLastVisitedAt + expectedTriggerAt를 조건으로 사용한다.
효과:
- trigger row가 그 사이에 다른 이벤트(예: 새 방문)로 갱신된 경우 덮어쓰지 않음
실패 시 동작 성질
Section titled “실패 시 동작 성질”- queue insert 후 프로세스가 죽으면
leasedTriggerAt가 남을 수 있음 - lease 시간(현재 5분) 이후 다시 pick되어 재시도 가능
- 즉, trigger 처리 자체는 at-least-once 성질을 가짐
운영 파라미터 (현재값)
Section titled “운영 파라미터 (현재값)”| 항목 | 값 | 위치 |
|---|---|---|
| trigger check 주기 | 1분 | RecommendationTaskTriggerCheckJob |
| trigger pick limit | 100 | RecommendationTaskTriggerCheckJob |
| trigger lease duration | 5분 | RecommendationTaskTriggerCheckJob |
| queue handler 주기 | 30초 | RecommendationTaskHandler |
| queue pick limit | 10 | RecommendationTaskHandler |
| queue 동시 처리 | 5 | RecommendationTaskHandler |
참고: legacy taskqueue 추상화
Section titled “참고: legacy taskqueue 추상화”패키지 dev.vingle.taskqueue (TaskHandler, TaskTopic, TaskQueuePoller, TaskQueuePollerRegistrar)는 여전히 코드베이스에 남아 있을 수 있다.
하지만 recommendation 처리 경로는 더 이상 이 추상화에 의존하지 않고, 위 문서의 명시적 스케줄러/핸들러 구조를 기준으로 이해하면 된다.