저번에 개념만 정리했던 worker thread pool을 실제 프로젝트에 구현해보겠다.
현재 코드는 아래와 같다.

사용자가 받은 전체 letter들의 단어를 모두 불러와 letter에 쓰인 단어 통계를 내는 기능이다.
성능 비교를 위해 해당 코드 놔두고 새로 worker사용하는 걸로 구현해보겠다.
코드 길어져서 service디렉터리 만들어주고 WordRank 서비스 클래스로 빼서 구현하였다.
(src/letters/service/letter.wordRankService.ts)
이전 개념 설명할 때 구현했던 worker pool은 새로 만들기만 하고 pool관리에 관한 내용이 빠져있어다.
이를 추가한 전체 코드 보며 하나씩 정리하겠다.
변수 선언 및 초기화

먼저 사용할 변수들이다.
poolCount : 현재 생성된 워커스레드의 수를 추적하는 변수
readyPool : 사용가능한(대기중인) 스레드를 저장하는 배열
workingPool : 현재 작업 중인 스레드를 저장하는 set(중복 방지, 작업 효율)
isReady : worker 스레드 사용 여부를 추적하는 이벤트 발생기
이벤트 발생기 (EventEmitter)
Nodejs 에서 이벤트를 발생시키고 처리하는 기능으로 비동기 작업의 상태를 추적할 때 사용된다.
이벤트 발생 : emit('evnetname', arg1, arg2, ..)
리스너 등록 : on('evnetname', () => {처리로직})
한 번만 실행되는 리스너 등록 : once('evnetname', () => {처리로직})
generateWordRankByPostBoxWorker() 메서드

컨트롤러에서 호출하는 메서드로 기본이 되는 메서드이다.
검색할 postboxid를 매개변수로 받고 dto 리스트를 반환해 준다.
getworker로 worker값을 전달하고
assignWorker로 result 값 전달한다.
getworker() 메서드

사용 가능한 워커스레드가 있을 경우(readyPool에 워커 있음) popWorker() 메서드 호출
사용가능한 워커스레드 없으면
next 이벤트를 감지하는 리스너 생성, 이벤트 감지되면 popWorker() 메서드 호출
전체 worker수가 10개 미만일 경우 새로운 worker를 만들어준다.
아직 아무 worker 없는 상태라 하면 이벤트 리스너 만들어져서 이벤트 기다리고 있고
코드는 아래 if 문으로 내려가 creatWorker를 호출, 여기서 next이벤트 발생
resolve에 popWorker로 스레드 전달. 이런 식으로 실행된다.
(Promise를 반환하기에 promise객체 resolve가 호출되어야 작업 완료)
createWorker() 메서드

workerPath를 통해 새로운 Worker객체 생성
exit이벤트를 감지하는 리스너 만들어주고 감지하면 에러를 출력한다.
readyPool에 worker를 저장하고 poolCOunt를 증가시킨다.
그리고 next 이벤트를 발생시킨다 -> 위의 getWorker()에서 만든 리스너가 이를 감지한다.
popWorker() 메서드

readyPool에 있는 worker를 추출하여 workingPool에 넣어준다.
이 과정을 거치면 generateWordRankByPostBoxWorker()에 worker가 들어온다.
그다음은 이렇게 가져온 worker와 추출한 letters을 매개변수로 assignWorker를 호출한다.
assignWorker() 메서드

worker에 작업을 할당하기 위해 postMessage로 처리할 정보인 letters를 보낸다.
반환값은 Promise((resolve, reject))로 두 결과를 받아야 반환한다.
next()는 worker가 작업 완료를 알리는 메서드로
완료된 worker를 readyPool에 넣어주고, workingPool에서 삭제한다.
그리고 next 이벤트를 발생시킨다.
이 Next 이벤트는 getWorker()의 리스너에 감지된다.
그리고 assignWorker는 message, error이벤트를 감지하는 리스너인
messageHadler, errorHandler를 생성한다.
각각의 핸들러는 promise에 들어갈 반환값을 넣어주는데
성공했을 경우는 messageHandler, 에러 발생하면 errorHandler가 실행된다.
이때 나머지 하나의 핸들러는 필요 없기에 removeListener로 삭제해준다.
worker코드

실제 로직을 동작하는 worker코드는 위와 같다.
현재 스레드가 main이면 에러를 출력하고 메인 스레드와 통신하는 parentPort를 확인한다.
parentPort.on으로 메인 스레드로부터 메세지를 수신하는 핸들러 등록한다
결과를 message 이베트에 담아 발생시켜 다시 메인스레드로 보낸다.

ranker메서드는 이전 로직 코드 가져와서 사용해 주면 된다.
응답속도 확인
그럼 과연 worker thread pool이 효과가 있을까?
worker thread pool 적용한 메서드랑 안 한 메서드 속도를 전에 구현한 logger 통해 확인해 보겠다.

각 메서드 5번씩 호출한 결과이다.
우선 맨 처음 thread pool을 만들 때 제외하고는 조금 빠른 느낌이긴 하다
병렬처리 하기엔 데이터 양도 적고 엄청 복잡한 로직도 아니어서 많은 효과가 없는 거 같다
nestjs Bull 사용
위에서는 pool작업을 직접 코드로 구현해주었는데
이 작업을 nestjs에서 Redis기반의 작업 큐인 bull을 사용하여 할 수 있다.
이에 대해선 다음에 따로 정리해 보도록 하겠다.
[Nestjs] Worker Thread와 Bull을 사용한 큐의 병렬 처리
어떤 리스트를 불러와 랭킹 알고리즘을 바탕으로 정렬을 할 때, 보통 이 정렬하는 로직은 동기적으로 이루어진 코드 비중이 높을 것이며 리스트의 길이가 길어질수록 cpu의 연산량도 많아진다.
velog.io
https://inpa.tistory.com/entry/NODE-%F0%9F%93%9A-workerthreads-%EB%AA%A8%EB%93%88
[NODE] 📚 Worker_Threads 모듈 (멀티 쓰레드 구현)
Worker Thread 이해하기 Node.js가 시작되면, 다음이 실행됩니다. 하나의 프로세스 : 어디서든 접근 가능한 전역 객체이자 그 순간 실행되고 있는 것들의 정보를 가지고 있는 프로세스 하나의 스레드 :
inpa.tistory.com
[Nestjs] Nestjs에서의 Worker Thread 사용
Nestjs 프로젝트에서 nest 커맨드를 이용해 프로젝트를 시작할 경우 항상 컴파일이 이루어지기 때문에 ts-node만을 사용할 때처럼 타입스크립트 워커 파일을 읽기 위한 js파일이나 allowJS 옵션에 대해
velog.io
'Nest' 카테고리의 다른 글
| [Nest] nest winston 로그 관리 (0) | 2024.04.20 |
|---|---|
| [Nest] Bull사용한 병렬처리 (0) | 2024.04.19 |
| [Nest] Nestjs logger와 middleware, intercepter (0) | 2024.04.16 |
| [Nest] Nestjs 멀티 스레드 (Worker) (0) | 2024.04.13 |
| [Nest] NestFactory 동작 (0) | 2024.03.30 |