밑바닥부터 시작하는 추천시스템 with Kubeflow

chrisjune
21 min readNov 10, 2023

--

백엔드 엔지니어로서 밑바닥부터 만든 추천시스템에 대한 경험을 나누고자 합니다.

많은 온라인 서비스에서 추천시스템은 중요한 요소가 되었습니다. 유튜브의 알고리즘, 넷플릭스의 영화 추천, 아마존의 상품 추천등은 이제는 너무나 익숙한 일상이 되었습니다.

저는 백엔드 엔지니어로서, 추천시스템을 구현하는 경험을 하게 되었습니다. 제 전문분야는 백엔드 서비스 아키텍쳐를 설계하고 개발하는 것이지만, 기술역량을 확장하기 위하여 추천 서비스를 개발에 도전하게 되었습니다.

이 글을 통해 바닥에서부터 추천시스템을 구축하기 위해 사용한 도구, 기술과 어려움을 소개하고 이를 해결한 방법에 대하여 소개합니다.

서비스 소개

제가 일하는 곳은 리셀 플랫폼입니다. 현재는 스니커즈 카테고리를 넘어 테크와 라이프로 카테고리를 확장하고 있습니다. 국내에 첫 리셀서비스를 오픈하였으며 현재 가장 높은 점유율을 차지하고 있습니다.

서비스에 지금까지는 없었던 함께 본 상품(View tegether구좌)최근 본 상품과 비슷한 상품(개인화 추천)구좌를 추가하였습니다.

프로젝트의 목적

이 프로젝트의 목적은 유저의 상품 탐색 경험을 높이는 것입니다. 기존까지 베스트 랭킹을 기준으로 상품을 추천해주었고 많은 구매전환을 일으켜왔습니다. 하지만, 상품카테고리가 확장되고 활성화 유저가 많아지면서 랭킹추천의 변화가 필요해졌습니다

주요 목표는 유저가 관심이 있을 만한 상품을 개인화 추천하고, 상품별 다양한 연관상품을 추천하는 것이었습니다. 이 프로젝트의 목적 구좌는 정해져있지만, 추후 상품이 노출되는 컬렉션의 대부분을 개인화된 데이터로 점진 대체할 예정입니다.

처음부터 완벽하고 성능좋은 추천시스템을 만드는 것은 프로젝트 목적이 아니었습니다. 인기순 상품 구좌를 상품 추천구좌로 전환하고 이를 점진적으로 고도화하기로 하였습니다.

사용한 기술스택

추천 서비스를 만들기 위하여 단순히 잘 짜여진 알고리즘과 모델보다 이를 제공하기 위한 기술과 인프라에 대한 고민을 많이 하게 되었습니다. 간략하게 사용한 기술스택과 이유에 대하여 설명드리겠습니다.

Kafka: 데이터 스트림 기반으로 실시간 데이터 스트림을 관리하고 처리하는데 중요한 역할을 합니다. 특히 하나의 유저 이벤트에 대하여 여러 컨슈머를 통해 각 서비스 프로바이더별로 데이터를 독립적으로 처리할 수 있습니다. 앱과 웹에서 유저 이벤트가 발생되면 이벤트 프록시를 거쳐 Kafka에 프로듀싱 되고, 이벤트 워커에서 멀티 컨슈밍을 통해 HBase, Amplitude 그리고 Braze에 데이터를 적재하고 있습니다.

HBase: NoSQL 데이터베이스인 HBase는 큰 데이터를 저장하고 검색하기에 적합하고, 빠른 읽기 쓰기 작업에도 이상적이었습니다. 유저 이벤트 데이터의 Raw데이터를 적재할 때 사용하였습니다.

Airflow: 데이터 워크플로우를 관리하고, 여러 작업을 예약, 모니터링 그리고 자동화 할 수 있습니다. 사내에서 제공중인 Airflow를 한번 더 추상화한 CQuery라는 서비스를 이용하여 HBase에 저장된 데이터를 전처리하여 Hadoop에 저장하는데 사용하였습니다.

Hadoop: MapReduce를 활용하여 대규모 데이터셋에서 데이터를 추출하는데 효율적입니다. Raw 데이터를 여러 형태로 가공된 데이터를 저장하고, 모델 학습시 하둡 볼륨을 Alluxio를 통해 직접 마운트하여 데이터셋으로 활용하였습니다.

Matrix Factorization: 행렬분해는 추천 알고리즘중 대중적으로 많이 사용합니다. 확장성과 속도면에서 우수한 편입니다. 유저와 상품간 잠재적인 패턴을 학습하고, 이를 활용하여 상품의 연관상품과 유저별 개인화 상품을 추천하는데 사용하였습니다.

Kubernetes & Kubeflow: 확장성있는 서비스배포는 Kubernetes와 Kubeflow를 활용하였습니다. Kubernetes는 컨테이너를 안정적으로 서빙할 수 있도록 해줍니다. Kubeflow는 머신러닝과 파이프라인등의 Kubernetes상에서 실행되는 프로그램들의 ML 프레임워크입니다. Kubeflow의 notebook으로 스몰데이터 모델 학습과 검증, katib을 활용하여 hyperparameter tuning, pipeline을 활용하여 모델 학습과 서빙을 하였습니다.

Locust: kubernetes에 여러 POD로 배포하여 실제 유저들의 부하와 유사하게 부하테스트를 할 수 있습니다. 서비스 오픈 직전 라이브와 동일한 스펙으로 deploy하여 부하테스트를 하는데 사용하였습니다.

FastAPI: Uvicorn을 활용하여 비동기 웹서비스 요청을 처리할 수 있습니다. fastapi는 개발이 쉽고, 학습된 모델을 서빙하는데 사용하고 있습니다.

지금부터 사용한 기술스택을 데이터 전처리, 알고리즘, 배포, 서빙으로 나누어 자세히 설명드리겠습니다.

데이터 수집과 처리

플랫폼에서 발생한 유저 이벤트 데이터를 수집하고 이를 가공하여 Hadoop에 저장하였습니다.

유저의 이벤트가 발생하면 클라이언트에서는 이벤트 프록시 서버를 호출하고 해당서버에서 Kafka에 이벤트를 producing합니다. 이벤트 워커 서버는 이벤트를 멀티 컨슈밍하여 Amplitude, Hbase, Braze에 저장합니다. Kafka를 활용한 이벤트 수집과 처리는 이미 동료인 Key🔑가 만들어주셔서 쉽게 사용할 수 있었습니다.

HBase에 저장된 데이터는 Airflow를 통한 스케줄링으로 10분마다 Hadoop에 전처리하여 저장하도록 하였습니다. 전처리 과정중 불필요한 데이터는 제거하고 시간변환작업을 진행합니다. 실제로 Hadoop ecosystem을 깊이 있게 이해하기보단, 대용량 데이터를 저장할 곳을 찾는것이 목적이었습니다.

데이터 저장소로 Hadoop이 아니라 DB를 먼저 고려하습니다. 처음엔 DB에 저장하고 이를 쿼리로 가공하여 저장하게 하였습니다. 하지만 속도와 확장성 측면에서 부담이 많았습니다. 이후에는 확장성을 고려하여 Kubernetes에 배포되는 Mysql인 vitess를 사용하였습니다. 하지만 한번에 처리할 수 있는 데이터 수의 제약이 있었고, 복잡한 조인쿼리가 불가능한 케이스가 있었습니다.

Airflow는 직접운용하지 않고, 사내에서 제공하는 구현서비스인 Cquery를 활용하여 간단한 쿼리를 통해 데이터를 전처리할 수 있었고, 무엇보다 아주 쉽게 스케줄링할 수 있었습니다. 자세한 내용은 링크를 보시면 좋습니다.

학습 알고리즘: Matrix Factorization

Matrix factorization은 대중적으로 많이 사용하는 추천 알고리즘으로서, 비교적 높은 정확도와 병렬학습이 가능한 장점이 있습니다. 학습데이터로부터 유저와 상품간 선형적 잠재관계를 파악하여 추천시스템에 사용할 수 있습니다.

해당 모델을 사용한 이유는, MF 알고리즘은 잘 구현된 오픈소스 라이브러리들이 많이 있고 사용하기 쉬웠습니다. 따라서 베이스라인모델로 사용하기에 적합하다고 판단하였습니다.

MF 모델을 학습하기 위한 다양한 최적화 알고리즘으로는 SVD, ALS, BPR, SGD등 다양한 선택지가 있습니다. 그 중, 빠른 학습을 제공하는 ALS와 BPR을 선택하였고, 처음엔 일반적으로 성능이 좋은 BPR로 선택하였습니다. 하지만 학습 데이터가 누적됨에 따라 최신성이 떨어진 데이터에 Decay가 필요하였고, binary score로 학습하는 bpr에서 ordinal score를 계산할 수 있는 ALS를 선택하였습니다.

Kubernetes 및 Kubeflow를 통한 확장 및 배포

Kubeflow는 머신러닝 워크플로우를 제공하여 줍니다. Python 언어를 활용해 도커라이징된 컴포넌트를 실행할 수 있습니다. 이러한 컴포넌트들을 엮은 것이 파이프라인입니다. 그럼 파이프라인의 핵심인 컴포넌트를 만드는 예시를 공유하도록 하겠습니다.

워크플로우 구성요소

1. 실행할 코드가 담겨있는 도커컨테이너를 만들어야 합니다. 예를들어 데이터 학습과 서빙 두개의 컴포넌트를 만들어야 한다면, 각각의 도커컨테이너가 필요합니다. 아래 예시는 cuda 이미지에 python과 서드파티 라이브러리들을 설치하고 코드를 복사하는 예시입니다.

FROM nvidia/cuda:11.8.0-runtime-ubuntu18.04

# Install python
RUN apt-get install -y python3.9 python3-pip

# Copy source code
WORKDIR /app

COPY requirements.txt .
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r requirements.txt

COPY ./src /app/src
ENV PYTHONPATH /app/src

2. 컴포넌트의 배포를 위한 메니페스트를 yaml로 정의합니다.
1번에서 만든 컨테이너를 실행할 커멘드를 yaml파일로 정의해줍니다. 그리고 yaml파일을 “kfp.components.load_component_from_text” 명령어로 컴포넌트화 해줍니다. 실행할 때 사용할 PVC도 이때 마운트를 할 수 있습니다. 아래 예시는 실제 파이프라인 코드이며, “kfp.dsl.VolumeOp” 명령어로 볼륨을 생성하고 컴포넌트에 마운트하여 실행시키는 예시입니다.


import kfp
from kfp import dsl
from kfp.onprem import use_k8s_secret
from kubernetes import client as k8s_client

@dsl.pipeline(name="Pipeline", description="Train an model"):
recommend_train_yaml = """
name: Recommend Trainer
description: recommend train

inputs:
- {name: Dataset Path, type: String, default: '', description: 'Dataset Path'}
- {name: Preprocessed Dataset Path, type: String, default: '', description: 'Dataset Path'}
- {name: Model Inference Path, type: String, default: '', description: 'Model Inference Path'}
outputs:
- {name: Model Output Path, type: String, description: 'Model path'}

implementation:
container:
image: dockerhub.com/chrisjune/model-trainer:latest
command: [
python3, /app/src/model_trainer.py,
--dataset_path, {inputValue: Dataset Path},
--model_inference_path, {inputValue: Model Inference Path},
--model_output_path, {outputPath: Model Output Path},
]
"""
data_volume = kfp.dsl.VolumeOp(
name="Dataest",
resource_name="resource_name",
size="5Gi",
storage_class="alluxio",
modes=["ReadOnlyMany"],
annotations={
"hdfs/path": "/{hdfs_dataset_path}",
},
).add_pod_annotation(name="pipelines.kubeflow.org/max_cache_staleness", value="P0D")

recsys_train_op = kfp.components.load_component_from_text(recommend_train_yaml)
recsys_train_task = (
recsys_train_op(
dataset_path=recsys_dataset_path,
model_inference_path=model_inference_path,
)
.apply(use_k8s_secret(secret_name="k8s_secret"))
.add_pod_annotation("sidecar.istio.io/inject", "false")
.add_volume(
k8s_client.V1Volume(
name="volume_name",
persistent_volume_claim=k8s_client.V1PersistentVolumeClaimVolumeSource(
claim_name=data_volume.outputs["name"]
),
)
)
.add_volume_mount(
k8s_client.V1VolumeMount(
mount_path=recsys_dataset_path, name="dataset"
)
)
.set_cpu_limit("8")
.set_cpu_request("8")
.set_memory_limit("100Gi")
.set_memory_request("100Gi")
.set_gpu_limit(1)
)

위의 예시를 활용하면 데이터셋 로드와 이를 학습하거나 가공하는 파이프라인을 만들 수 있으며, 아래에는 실제 학습파이프라인과 데이터 전처리 파이프라인입니다.

실제 학습 파이프라인

FastAPI를 활용한 API 개발

FastAPI는 Python 기반으로 작성된 비동기 웹프레임워크입니다. 따라서 고성능 API를 개발하기에 적합합니다.

Pydantic을 활용하여 request 데이터의 유효성 검증을 자동처리하고, API Document를 자동으로 생성해줍니다. 무엇보다 비동기를 지원하기 때문에 Django 3.x나 Flask보다 성능이 월등하였습니다.

처음에는 BentoML로 서빙하려고 하였지만, container빌드시 Bentoml이 미리 설치되어있어야 하고 무엇보다 디버깅이 어려운 단점이 있습니다. Bentoml은 두개의 프로세스로 되어있어 하나는 Http 리퀘스트 처리하고 다른 하나는 추론을 해줍니다. 이 추론 단계에서 에러가 발생할 때 디버깅하기 어려운 경우가 있습니다.

따라서, 무난하게 사용하기 좋은 FastAPI를 선택하였습니다.

from fastapi import FastAPI
from pydantic import BaseModel

### 모델 로드 #################################
model = None
async def init_model():
global model
with open(MODEL_PATH, "rb") as f:
model = pickle.load(f)

app = FastAPI()
@app.on_event("startup")
async def load_model_and_data():
await init_model()
#############################################


### API Endpoint ############################
class RecommendByUserId(BaseModel):
user_id: int
top_k: Optional[int] = TOP_K

async def get_recommendations_for_user(user_id, top_k):
pass

@app.get("/recommend/{user_id}")
async def get_recommendations(request: RecommendByUserId):
recommendations = await get_recommendations_for_user(request.user_id, request.top_k)
return {"recommendations": recommendations}
#############################################

성능테스트 및 하이퍼파라미터 튜닝

하이퍼파라미터 튜닝

평가지표로는 NDCG와 MAP 값으로 파라미터 튜닝을 진행하였습니다.

하이퍼 파라미터 튜닝은 Kubeflow에서 제공하는 Katib(AutoML)기능을 사용하였습니다. 해당 기능을 사용하기 위해서 하이퍼 파라라미터를 POD 기동시 외부변수로 받을 수 있도록 수정해주고 학습파이프라인을 만들어주고 최적의 파라미터를 선택해줍니다.

처음에는 튜닝을 위해 iteration을 그래프로 그리고 최적점을 찾아보려고했으나 고려해야할 메트릭이 두개 이상이다보니 단순 플로팅을 통해 찾는 것이 어려웠습니다.

1차 시도

단순한 하나의 파라미터를 플로팅

2차 시도

여러 파라미터를 하나로 플로팅

3차 시도

여러 파라미터를 변경하여 3개의 지표를 하나로 플로팅

마지막 Katib을 활용한 hyper parameter 튜닝.

성능테스트

성능테스트는 Locust를 사용하였습니다. 많이 사용하는 k6, jmeter는 로컬머신의 한계 이상으로 서버호출이 어렵기 때문에 여러개의 POD를 띄워 용이한 locust를 선택하였습니다.

cpu 80%이상이 될 경우 auto scaling한 설정이 잘 동작하는 걸 볼 수 있습니다.

모니터링 및 성능개선

모니터링

모니터링은 Grafana를 통하여 CPU 사용률, 메모리, Request, TPS, Throttling, Networking 정보를 확인할 수 있습니다.

성능개선

오프라인 추론을 최대한 활용하여 응답속도를 10배이상 빠르게 개선하였습니다.

기존에 API의 응답속도도 50ms 이하로 느리진 않았지만, 백엔드 서비스를 거쳐 상품정보를 DB에서 조회하여 응답이 되기 때문에 응답속도가 예상보다 더 빨랐었야 했습니다.

처음 서빙 API를 만들때 model에서 추론하여 응답하였지만, 예상 외로 타임아웃 현상이 너무 많이 발생하였습니다. 어떻게 개선할지 고민하다가 “차루 아가르왈”이 얘기했던 가능한 오프라인 연산을 미리하여 온라인 서빙에서는 최대한의 성능을 제공하라는 것을 떠올렸습니다. 따라서, 모델 학습시 상품별 추론결과를 미리하고 dictionary형태로 만들고 pickling하여 저장하게 하였습니다. 그랬더니 당연히 성능은 10배 이상 좋아졌습니다.

배운 교훈과 인사이트

하다보면 어떻게든 된다

처음 프로덕션 레벨의 추천시스템을 만들며 가졌던 여러 두려움들이 있었습니다. 제대로된 결과를 만들 수 있을지, 잘 모르는데 동료들이 우습게 생각하면 어떡하지 등등

하지만 혹자가 했던 “어려운 것이 아니라 익숙하지 않은 것 뿐이다”라는 말처럼 풀어야할 문제가 있고, 여러 방향을 찾다보면 정답이 아닐 지라도 어느정도 괜찮은 길을 찾게 되는 것 같습니다.

ML서비스는 백엔드 서비스처럼 깔끔한 아키텍쳐로 만드는 것이 어려웠습니다.

구글에서 발표한 머신러닝 시스템의 숨겨진 기술부채(Hidden Technical Debt in Machine Learning Systems) 라는 논문에서 아래와 같은 다이어그램으로 구성요소와 아키텍쳐를 보여줍니다.

https://proceedings.neurips.cc/paper_files/paper/2015/file/86df7dcfd896fcaf2674f757a2463eba-Paper.pdf

머신러닝 시스템의 어려움은 대부분 코드보단 시스템 레벨에 존재합니다. ML핵심 코드는 정말 일부분이라고합니다. 따라서 외부 파이프라인의 의존도가 높기 때문에 코드만 격리하여 따로 추상화하기 어려웠습니다. 그리고 순차처리로 인하여 코드들이 서로 tightly-coupled 되어있습니다. 따라서 하나의 코드, 하나의 하이퍼 파라미터만 바뀌면 결과가 완전히 달라지게 됩니다.

백엔드 서비스에선 의존성 주입과 단일 책임 원칙등을 고려하여 이쁜 코드개발과 테스트코드로 빠른 주기의 테스트가 가능했지만, ML서비스에서는 테스트를 돌리기 위해서는 작은 데이터셋으로 처음부터 끝까지 코드의 유기적인 Full test가 필요하기 때문에 그로 인한 어려움이 있었습니다.

처음 시작하는 ML 서비스를 준비하는 개발자를 위한 Tips

해결하려는 문제가 꼭 ML, DL 프레임워크를 써야하는지 고민해보면 좋을 것 같습니다. 예를 들어 상품추천 서비스를 만들기 위해선 꼭 딥러닝 프레임워크를 써야할 필요는 없습니다. 상품 SKU가 적은 경우엔 데이터베이스 쿼리로도 충분히 추천서비스를 만들 수 있습니다.

user_id, item_id, 이벤트 수 count 컬럼이 있는 event 테이블이 있다고 가정

-- 이벤트 데이터를 normalize
create table normalized_event
select user_id, item_id, w / sqrt(sum(w * w) over partition by user_id) w
from (select user_id, item_id, ln(count) + 1 w from event) t;

-- Item based cosine 유사도를 계산한 결과를 테이블에 저장
create table event_cosine_sim as
select a_item, b_item, sim
from (select a_item, b_item, sim, row_number() over (partition by a_item order by sim desc) rownum
from (select a.item_id a_item, b.item_id b_item, sum(a.w * b.w) sim
from normalized_event a
inner join normalized_event b on a.user_id = b.user_id and a.item_id != b.item_id
group by a.item_id, b.item_id) t) t2
where rownum < 10;

-- 연관상품 추천
select b_item "추천된 상품"
from event_cosine_sim
where a_item = "{상품번호}"
order by sim desc;

결론

  • 지금까지 데이터 파이프라인과 학습파이프라인에 대하여 소개해드렸고, 배포 전 부하테스트 그리고 배포 후 모니터링과 성능 개선에 대하여 설명드렸습니다.
  • 틀린 부분이나 개선이 필요한 부분이 있으면 가감없이 조언부탁드립니다. 긴글 읽어 주셔서 감사합니다.

참고 자료와 자원

--

--