03-29 08:50
Notice
Recent Posts
Recent Comments
관리 메뉴

Scientific Computing & Data Science

[Scientific Computing / Image Processing] R에서 병렬처리 하기 본문

Scientific Computing/Image Processing

[Scientific Computing / Image Processing] R에서 병렬처리 하기

cinema4dr12 2017. 1. 15. 20:38

이번 글에서는 R에서 병렬처리를 하는 방법에 대하여 소개하도록 한다.

R-bloggers의 글, How-to go parallel in R - basics + tips의 내용을 최대한 참고하여 정리해 보았다.


[목차]

1. lapply에 대하여

2. parallel 패키지

3. foreach 패키지

4. 디버깅

5. 캐싱(Caching)

6. 계산 부하 밸런싱

7. 이미지 프로세싱 예제


1. lapply에 대하여

R이 기본적으로 제공하는 함수들 중 가장 많이 사용되는 함수가 lapply일 것이다.

이와 유사한 함수로 apply, sapply, tapply 등이 있는데 각자의 쓰임새가 있으며, 자신이 R유저라고 자신한다면 이 함수들이 어느 상황에 적절히 쓰이는지 이해하고 자유자재로 다룰 수 있어야 한다고 생각한다.


다음 코드를 실행해 보자:

> base::lapply(X = 1:3, FUN = function(x) c(x, x^2, x^3))
[[1]]
[1] 1 1 1

[[2]]
[1] 2 4 8

[[3]]
[1]  3  9 27

주어지는 X의 요소를 순차적으로 불러내 1제곱, 2제곱, 3제곱한 계산 결과를 보여준다.


또한 함수의 파라미터 값을 설정할 수 있다:

> base::lapply(X = (1:3)/3, FUN = round, digits = 3)
[[1]]
[1] 0.333

[[2]]
[1] 0.667

[[3]]
[1] 1

R의 빌트인(Built-in) 함수인 round의 소수점 이하 3자리에서 반올림한 결과를 보여준다.

주목할 점은, 각 계산은 독립적으로 이루어지는데, 즉, 첫번째 연산, 두번째 연산, 세번째 연산이 서로 계산 결과에 영향을 미치지 않는다는 것이다.

참고로 lapply의 Parallel Version으로 mcapply("mc"는 Multi-Core를 의미)가 있었으나, 이를 제공하던 multicore 패키지는 CRAN에서 삭제되고 이후 섹션에서 살펴볼 parallel 패키지에 흡수되었다.


2. parallel 패키지

parallel는 섹션 1에 언급된 계산을 병렬로 처리할 수 있도록 하는 R 패키지이다.


parallel 패키지를 이용하여 병렬로 계산하는데에는 기본적인 라이프 사이클(Life Cycle)이 있다.

(1) parallel 패키지를 로드한다.

(2) CPU 코어 개수를 획득한다.

(3) 획득된 CPU 코어개수만큼 클러스터를 등록한다.

(4) 병렬 연산을 수행한다.
(5) 클러스터를 중지한다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)

# CPU 병렬처리
parallel::parLapply(cl = myCluster, X = 2:4, fun = function(x) {2^x})

# 클러스터 중지
parallel::stopCluster(myCluster)

위의 코드를 실행하면 다음과 같이 결과가 출력된다.

[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16

변수 스코프(Scope)

변수를 일종의 전역변수처럼 선언하여 parApply() 함수에서 사용할 수 있다고 생각할지 모르지만 CPU 병렬처리에 있어서 변수를 미리 등록하지 않으면 parApply() 함수 밖에서 선언하였다고 하더라도 사용할 수 없다.

가령, 다음 코드와 같이 전역변수처럼 base를 선언했다고 가정한다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)

# 변수 등록
base <- 2

# CPU 병렬처리
parallel::parLapply(cl = myCluster,
                    X = 2:4,
                    fun = function(x) {
                      base^x
                    })

# 클러스터 중지
parallel::stopCluster(myCluster)


이 코드를 실행하면 다음의 에러 메시지가 나올 것이다.


Error in checkForRemoteErrors(val) : 
  3 nodes produced errors; first error: 객체 'base'를 찾을 수 없습니다


병렬 코드에서는 모든 CPU 코어가 변수를 인식할 수 있도록 다음과 같이 변수를 등록한다:


# 변수 등록
base <- 2
parallel::clusterExport(myCluster, "base")


clusterExport() 함수에서 변수 이름은 반드시 따옴표 안에 정의해야 함에 유념하도록 한다.

또한 위의 두 줄이 서로 순서가 바뀌어도 안 된다.

parSapply

parSapply() 함수는 Sapply() 함수의 병렬계산 버전이며, Vector/Matrix 형태로 처리가 가능하다.

다음 코드를 살펴보도록 하자:


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)

# 변수 등록
base <- 2
parallel::clusterExport(myCluster, "base")

# CPU 병렬처리
parSapply(myCluster, as.character(2:4), 
          function(exponent){
            x <- as.numeric(exponent)
            c(base = base^x, self = x^x)
          })

# 클러스터 중지
parallel::stopCluster(myCluster)

c(base = base^x, self = x^x) 처럼 계산결과를 2-by-3 Matrix 형태로 리턴할 수 있다.

실행 결과는 다음과 같다:


     2  3   4
base 4  8  16
self 4 27 256


3. foreach 패키지

foreach 패키지는 루프(Loop)와 lapply() 함수를 융합한 것으로 R의 병렬처리에 있어 매우 인기가 많다.

foreach() 함수의 .combine 속성은 결과를 어떻게 결합시킬 것인가를 정의한다.

가령, Vector 형태의 결과를 출력하려면


# load packages
if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(myCluster)

# 변수 등록
base <- 2
parallel::clusterExport(myCluster, "base")

# CPU 병렬처리
foreach::foreach(exponent = 2:4, .combine = c)  %dopar% {
  base^exponent
}

# 클러스터 중지
parallel::stopCluster(myCluster)

와 같이 .combine 속성을 c로 한다. 실행결과는 다음과 같다:


[1]  4  8 16


만약 Matrix 형태의 출력 결과를 원한다면 .combine 속성을 rbind로 한다.


# CPU 병렬처리
foreach::foreach(exponent = 2:4, .combine = rbind)  %dopar% {
  base^exponent
}


실행한 출력결과는


         [,1]
result.1    4
result.2    8
result.3   16


의 형태이며, .combine 속성을 cbind로 하는 경우


     result.1 result.2 result.3
[1,]        4        8       16


로 출력된다.

List로 출력을 원한다면 .combine 속성을 list로 한다.


# CPU 병렬처리
foreach::foreach(exponent = 2:4,
                 .combine = list,
                 .multicombine = TRUE) %dopar% {
                   base^exponent
                 }


이에 대한 실행 결과는 다음과 같다:


[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16


위의 코드에서 정의된 속성 중 .multicombine은 Nested List에 대한 옵션이며, 정의하지 않으면 기본값으로 FALSE로 지정된다.

이를 FALSE로 지정하여 실행해 보면 다음과 같이 다중 Nested List 형태로 결과가 출력됨을 확인할 수 있다.


[[1]]
[[1]][[1]]
[1] 4

[[1]][[2]]
[1] 8

[[2]]
[1] 16

변수 스코프(Scope)

foreach() 함수는 parallel 패키지가 제공하는 함수와는 달리 foreach() 외부에 정의된 변수들을 인정한다는 것이다.

다음 코드를 살펴 보도록 한다.


# load packages
if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(myCluster)

# 변수 등록
base <- 2

# CPU 병렬처리
foreach::foreach(exponent = 2:4,
                 .combine = c)  %dopar% {
                   base^exponent
                 }

# 클러스터 중지
parallel::stopCluster(myCluster)


실행결과는 다음과 같고,


[1]  4  8 16


base라는 변수에 값을 할당 후 별다른 등록 과정을 하지 않았음에도 foreach() 함수가 이 값을 이용하여 계산을 하였음을 알 수 있다.

하지만, foreach() 함수를 별도의 외부 함수로 정의할 경우 오류가 난다. 다음 코드를 살펴보자.


# CPU 병렬처리
test <- function (exponent) {
  foreach::foreach(exponent = 2:4,
                   .combine = c)  %dopar% {
                     base^exponent
  }
}

test()

이에 대한 실행결과는


Error in { : task 1 failed - "객체 'base'를 찾을 수 없습니다"


와 같이 객체 base를 찾을 수 없다는 에러 메시지가 출력된다.

이러한 외부변수를 사용하는데 있어 불편함을 해소하기 위하여 foreach() 함수는 다음과 같은 옵션을 제공한다.


# CPU 병렬처리
test <- function (exponent) {
  foreach::foreach(exponent = 2:4, 
                   .combine = c,
                   .export = "base") %dopar% {
                      base^exponent
                     }
}

test()

메모리 다루기

다중 컴퓨터를 사용하거나 Windows를 사용하거나 Windows Machine을 사용하여 코드를 다른 사람과 공유할 계획이 없다면 parallel 패키지의 makeCluter() 함수의 type 옵션으로 FORK를 사용할 것을 권장한다. 이 옵션은 동일한 메모리 주소로 연결되도록 한다.

makeCluter() 함수의 type 옵션의 기본값은 PSOCK인데 이를 사용하는 경우, PSOCK으로 내보내진 변수에 대한 메모리 주소가 동일하지 않음을 알 수 있다. 다음 코드를 살펴보도록 한다:


# load packages if(!require(parallel)) { install.packages("parallel") } library(parallel) if(!require(pryr)) { install.packages("pryr"); } library(pryr) # Used for memory analyses # 코어 개수 획득 numCores <- parallel::detectCores() - 1 # 클러스터 초기화 myCluster <- parallel::makeCluster(numCores, type = "PSOCK") # 변수 등록 base <- 2 parallel::clusterExport(myCluster, "base") # 클러스터 평가 parallel::clusterEvalQ(myCluster, library(pryr)) # CPU 병렬처리 parallel::parSapply(myCluster, X = 1:10, function(x) {pryr::address(a)}) == pryr::address(a)

# 클러스터 중지 parallel::stopCluster(myCluster)

pryr 패키지는 메모리 분석을 위해 로딩하였다.

실행 결과를 출력해 보면, 다음과 같이 메모리 주소가 상이함을 알 수 있다.


[1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE


type을 FORK로 지정하면 FORK 클러스터로 실행되며


# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores, type = "FORK")

결과는


[1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE


와 같이 동일한 메모리 주소가 링크되어 있음을 알 수 있다. 그러나, Windows에서는 FORK 클러스터는 지원되지 않으며,

Windows에서 실행한 경우 다음과 같은 에러 메시지가 출력된다.


> myCluster <- parallel::makeCluster(numCores, type = "FORK")
Error in makeForkCluster(spec, ...) : 
  포크 클러스터들은 윈도우즈에서 지원되지 않습니다


만약 변수를 처리한 값이 계속 메모리에 저장되도록 하려면 어떻게 해야 하는지 살펴보도록 하자.

다음 코드를 실행해 보도록 한다.


# load packages if(!require(parallel)) { install.packages("parallel") } library(parallel) # 코어 개수 획득 numCores <- parallel::detectCores() - 1 # 클러스터 초기화 myCluster <- parallel::makeCluster(numCores, type = "PSOCK") # 변수 등록 b <- 0 parallel::clusterExport(myCluster, "b") # CPU 병렬처리 parallel::parSapply(myCluster, X = 1:10, function(x) {b <- b + 1; b}) # 클러스터 중지 parallel::stopCluster(myCluster)

변수 b를 이 값에 1을 더하는 연산을 수행하였는데 결과는 다음과 같이 출력된다.


[1] 1 1 1 1 1 1 1 1 1 1


만약 다음과 같이 코드를 수정하면


parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})

다음과 같은 결과를 얻는다.


[1] 1 2 3 1 2 3 4 1 2 3


위의 코드를 반복적으로 실행하면


> parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})
 [1] 1 2 3 1 2 3 4 1 2 3
> parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})
 [1] 4 5 6 5 6 7 8 4 5 6
> parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})
 [1]  7  8  9  9 10 11 12  7  8  9
> parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})
 [1] 10 11 12 13 14 15 16 10 11 12
> parallel::parSapply(myCluster, X = 1:10, function(x) {b <<- b + 1; b})
 [1] 13 14 15 17 18 19 20 13 14 15


위의 출력 결과와 같이 변수 b가 정적 변수와 같이 계속 업데이트 됨을 알 수 있다.


4. 디버깅

병렬처리에 있어 디버깅은 쉽지 않은 작업이다. 보통 R에서 값을 확인하기 위해 사용하는 print(), cat() 등의 함수가 동작하지 않기 때문이다.

tryCatch - list 방법

하나의 클러스터가 Fail이 났다고 해서 전체 연산을 중지시키는 것은 좋은 아이디어가 아니다.

따라서, try를 이용하여 에러를 catch하고 에러의 발생원인을 설명하는 텍스트를 반환하도록 한다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores, type = "PSOCK")

# CPU 병렬처리
foreach(x=list(1, 2, "a"))  %dopar% {
  tryCatch({
    c(1/x, x, 2^x)
  }, error = function(e) {
      return(paste0("The variable '", x, "'", " caused the error: '", e, "'"))
    })
}

# 클러스터 중지
parallel::stopCluster(myCluster)

위의 코드에서 tryCatch() 함수를 이용하여 에러의 원인을 출력하도록 한다.

실행한 결과, 다음과 같이 에러 메시지가 출력되었다.


[[1]]
[1] 1 1 2

[[2]]
[1] 0.5 2.0 4.0

[[3]]
[1] "The variable 'a' caused the error: 'Error in 1/x: 이항연산자에 수치가 아닌 인수입니다\n'"

공통 출력 파일 생성하기

각 Worker에 대한 콘솔을 설정할 수 없지만 출력 결과를 쓸 수 있는 공유 파일을 만들 수 있다.

다음 코드를 살펴보도록 한다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores, outfile = "./debug.txt")
doParallel::registerDoParallel(myCluster)

# CPU 병렬처리
foreach(x=list(1, 2, "a"))  %dopar%  
{
  print(x)
}

# 클러스터 중지
parallel::stopCluster(myCluster)

코드를 실행하면 Working Directory 내에 debug.txt 파일이 생성된다.


[debug.txt]

starting worker pid=10740 on localhost:11360 at 14:05:43.092
starting worker pid=11152 on localhost:11360 at 14:05:43.269
starting worker pid=12120 on localhost:11360 at 14:05:43.505
[1] 2
[1] 1
[1] "a"

노드 별 디버깅 파일 생성하기

공통의 디버깅 파일을 생성하는 것보다 노드 별로 디버깅 파일을 생성하는 것이 좀 더 나을 것이다.

다음과 같이 cat() 함수를 이용하여 노드 별로 디버깅 파일을 생성할 수 있다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores, outfile = "./debug.txt")
doParallel::registerDoParallel(myCluster)

# CPU 병렬처리
foreach::foreach(x=list(1, 2, "a")) %dopar% {
  base::cat(base::dput(x), file = paste0("debug_file_", x, ".txt"))
}

# 클러스터 중지
parallel::stopCluster(myCluster)


이 코드를 실행하면 입력된 변수만큼 debug 파일이 생성된다:


debug_file_1.txt
debug_file2.txt
debug_file3.txt


각 파일에는 각 노드에서 계산된 결과가 저장되어 있다.


5. 캐싱(Caching)

대규모 연산을 처리할 경우 캐싱을 활용할 것을 적극 권장한다. 캐싱을 할 이유가 많이 있지만 특히 시간 절약을 할 수 있다는 이유가 가장 크다.

digest 패키지를 통해 캐싱할 수 있는 코드를 작성할 수 있다.


# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

if(!require(digest)) {
  install.packages("digest")
}

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(myCluster)

# CPU 병렬처리 정의
cacheParallel <- function(){
  vars <- 1:2
  tmp <- parallel::clusterEvalQ(myCluster, library(digest))
  
  parallel::parSapply(myCluster, vars, function(var){
    fn <- function(a) {a^2}
    dg <- digest(list(fn, var))
    cache_fn <- base::sprintf("Cache_%s.Rdata", dg)
    
    if (base::file.exists(cache_fn)){
      base::load(cache_fn)
    }else{
      var <- fn(var); 
      base::Sys.sleep(5)
      base::save(var, file = cache_fn)
    }
    return(var)
  })
}

# cacheParallel 함수 처음 실행
base::system.time(out <- cacheParallel())
base::cat(out)

# caching 된 cacheParallel 함수 실행
base::system.time(out <- cacheParallel())
base::cat(out)

# To clean up the files just do:
base::file.remove(base::list.files(pattern = "Cache.+\\.Rdata"))

# 클러스터 중지
parallel::stopCluster(myCluster)

cacheParallel() 함수를 처음 실행하였을 때에는 5초 간 정지하였다가 File을 캐싱한다.

그 다음 실행하였을 때에는 캐싱된 파일을 불러와서 실행한다.

이렇게해서 연산의 시간을 단축할 수 있는데 실행된 결과를 살펴보면 다음과 같다.


> # cacheParallel 함수 처음 실행
> base::system.time(out <- cacheParallel())
 사용자  시스템 elapsed 
   0.00    0.00    5.02 
> base::cat(out)


5.02초가 걸렸는데 실제로는 5초를 제외하면 0.02초가 걸린 것이지만 캐싱을 설명하기 위해 5초 간 휴지 시간을 넣은 것이다.

캐싱된 이후 한번더 실행하면


> base::system.time(out <- cacheParallel())
 사용자  시스템 elapsed 
      0       0       0 
> base::cat(out)
1 4


순식간에 결과가 출력된다.

계산이 단순하여 큰 효과를 보기는 어려우나 대규모 데이터를 처리한다면 확실한 효과를 경험할 수 있을 것이다.


6. 계산 부하 밸런싱

코어 간 유사하게 계산을 밸런싱하고 메모리 자원 확보를 위해 서로 경쟁하지 않도록 하는 것은 성공적인 병렬화를 위한 핵심이다.

작업 부하(Work Load)

parLapply()foreach()는  모두 랩핑 함수(Wrapper Function)이다. 즉, 이 함수들이 실제 병렬계산을 직접 관여하고 있지는 않으며 다만 병렬처리를 하는 다른 함수에 의존하고 있다는 의미이다.

parLapply() 함수는 다음과 같이 정의되어 있다:


parLapply <- function (cl = NULL, X, fun, ...) 
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl, x = splitList(X, length(cl)), fun = lapply, fun, ...), quote = TRUE)
}


splitList(X, length(cl))는 작업을 동등하게 분할하여 작업자에게 전달한다. 많은 부분들이 캐싱되어 있거나 작업 간에 연산량의 차이가 클 경우 다른 작업자들은 놀고 단 하나의 작업자만 일하는 사태가 발생할 수 있다. 이 사태를 방지하려면 캐싱할 때 모든 작업을 균등하게 분할하도록 해야 한다.

가령, 신경망에서 최적의 뉴런 개수를 찾으려고 한다면 다음과 같이 코드를 수정할 수 있다.


# From the nnet example
parLapply(myCluster, c(10, 20, 30, 40, 50), function(neurons) {
	nnet(ir[samp,], targets[samp,], size = neurons)
})


을 다음과 같이 수정한다:


# From the nnet example
parLapply(myCluster, c(10, 50, 30, 40, 20), function(neurons) {
	nnet(ir[samp,], targets[samp,], size = neurons)
})

메모리 부하(Memory Load)

대규모 데이터셋(Dataset)을 병렬로 처리하는 것은 꽤나 골치아픈 일이다. 메모리가 부족해지면 시스템이 크래쉬나거나 성능이 매우 저하될 수 있다. 일반적으로 전자는 Linux에서 자주 발생하고 후자는 Windows에서 더 자주 발생한다. 따라서 이를 방지하려면 메모리가 한계에 이르지 않도록 모니터링하는 것이 필요하다.

FORK를 사용하는 것은 메모리 상한을 다루는 데 있어 중요한 도구이다. 메모리 상한이 원래의 변수 주소에 연결될 때 FORK는 변수를 내보내는데 대한 별도의 시간이 필요하지 않으며 추가로 메모리 공간을 차지하지도 않는다. 성능에 대한 영향은 매우 크다.


[메모리 팁]

  • rm()을 자주 사용하여 사용되지 않는 변수들을 제거한다.
  • 가비지 콜렉터(Garbage Collector)인 gc()를 자주 호출한다.
  • 초기화 비용 때문에 대규모로 병렬화하는 것이 유리하다고 해도 In-Memory 상황에서는 소규모로 병렬화하는 것이 바람직하다.
  • 이따금 코드를 병렬로 실행하고 결과를 캐싱하되 한계에 이르면 직렬로 전환한다.
  • 메모리가 충분치 않은 상황이라면 모든 코어를 사용하는 대신 코어 수를 수동으로 제한한다. 다음과 같은 식으로 코어 수를 계산한다.
    memory.limit()/memory.size() = max cores


이외에도 다른 팁이 있다:

[기타 팁]

  • 동일 반복 결과를 얻으려고 할 때 set.seed() 대신 clusterSetRNGStream()을 사용한다.
  • NVIDIA GPU 카드를 가지고 있다면 gputools 패키지를 활용하여 거대규모의 병렬 계산을 할 수 있다. 하지만 개발환경을 갖추는 것이 매우 까다롭다.
  • Cluster 설정 시 다양한 옵션에 대하여 알고 싶다면 여기를 참고한다.


7. 이미지 프로세싱 예제

이미지 프로세싱은 병렬처리 예제 중의 가장 좋은 예 중 하나라고 생각된다.

우선 이미지 픽셀의 개수(Dimension)가 매우 많으며 이미지 픽셀을 독립적으로 처리하는 경우가 많기 때문이다.

예제는 아래 이미지에서 특정 색 범위를 추출하는 것이며, 중간의 보라색 등고선을 추출하도록 하겠다.

참고로 이미지의 해상도는 740×739이며 각 픽셀은 (R, G, B)로 이루어져 있다.



보라색 등고선의 컬러는 대략 R = 155, G = 42, B = 202인데, [0, 1] 범위로 변환하면 (0.6078431, 0.1647059, 0.7921569)이다. 각 성분의 값은 각각 255(8 bit-depth)로 나눈 값이다.

이미지 처리를 위해 사용한 패키지는 imager이며, Main Idea는 다음과 같다:


(1) 각 코어는 이미지의 각 이미지 행(Row)의 픽셀에 대하여 처리하도록 한다.

(2) 각 이미지 행의 픽셀에 대하여 모든 열(Column)에 대하여 Reference 컬러(위의 이미지의 보라색 등고선)와 비교하되 R, G, B에 대한 각 차이의 절대값의 합이 0.5를 넘지 않는 경우 Reference 컬러(0.6078431, 0.1647059, 0.7921569)를 저장하고, 그렇지 않은 경우 검정색(0.0, 0.0, 0.0)을 저장시킨다.

(3) 각 코어에서 계산이 완료되면 이를 Matrix 형태로 저장하여 반환하고, 이 값을 순차적으로 R, G, B 값으로 나눈 후 이미지를 디스플레이한다.


이에 대한 이해를 돕기 위하여 아래 그림을 첨부한다.



결과는 out이라는 변수에 저장되며, Matrix는 2220 × 739의 크기를 갖는데, 아래 그림과 같이 R, G, B 컬러 성분별로 740 × 739의 크기를 갖는다. 각 컬러 성분의 Sub-matrix라고 생각하면 된다.

이미지를 디스플레이하기 위하여 img를 다른 변수인 resultImg에 복사하고 각 컬러 성분에 out의 각 컬러 성분(Sub-matrix)을 순차적으로 저장하면 된다.




전체 코드는 다음과 같다:


전체 R 소스 코드:

base::gc()
base::rm(list = ls())

# load packages
if(!require(parallel)) {
  install.packages("parallel")
}
library(parallel)

if(!require(foreach)) {
  install.packages("foreach")
}
library(foreach)

if(!require(doParallel)) {
  install.packages("doParallel")
}
library(doParallel)

if(!require(imager)) {
  install.packages("imager")
}
library(imager)

# 코어 개수 획득
numCores <- parallel::detectCores() - 1

# 클러스터 초기화
myCluster <- parallel::makeCluster(numCores)
doParallel::registerDoParallel(myCluster)

# 변수 등록
img <- imager::load.image("./test.png")
parallel::clusterExport(myCluster, "img")

# CPU 병렬처리 정의
cacheParallel <- function() {
  ref <- c(0.6078431, 0.1647059, 0.7921569)
  vars <- 1:dim(img)[2]

  parallel::parSapply(myCluster,
                      vars,
                      FUN = function(x){
                        imgCol <- img[ , x, , ]
                        
                        for(i in 1:dim(img)[1]) {
                          tmp <- imgCol[i,]
                          
                          if(sum(abs(tmp - ref)) < 0.5) {
                            imgCol[i, ] <- ref
                          } else {
                            imgCol[i, ] <- 0.0
                          }
                        }
                        
                        return(imgCol)
                        
                      })
}

# cacheParallel 함수 실행
elapse <- base::system.time(out <- cacheParallel())
print(elapse)

# 클러스터 중지
parallel::stopCluster(myCluster)

# 컬러 성분 분리 저장
resultImg <- img
offset <- dim(img)[1]
initRow <- 1
for(i in 1:dim(img)[4]) {
  resultImg[,,,i] <- out[initRow:(initRow + offset - 1), ]
  initRow <- initRow + offset
}

# 이미지 디스플레이
imager::display(resultImg)


위의 코드를 병렬로 계산하지 않을 경우 어마어마한 시간이 걸렸다(대략 10시간???? 위의 코드를 Sequential 코드로 계산해 보면 병렬처리의 위력을 실감할 수 있을 것이다).

코드의 실행 결과는 다음과 같다. 계산시간은 0.57초 걸렸다.




이로써 길고 길었던 R에서 CPU 기반으로 병렬처리하는 방법에 대하여 알아보았다.

CPU 기반 병렬처리를 이용하여 Machine Leaning에 적용한 글을 소개하고 이번 글을 마무리한다.

http://jakemdrew.com/blog/watson.htm



Comments