03-29 08:50
Notice
Recent Posts
Recent Comments
관리 메뉴

Scientific Computing & Data Science

[Data Science / Posts] 사용자 관점에서의 R 병렬 컴퓨팅 본문

Data Science/Posts

[Data Science / Posts] 사용자 관점에서의 R 병렬 컴퓨팅

cinema4dr12 2017. 1. 15. 22:10

이 글은 본래 Capital of Statistic에 중국어로 간행된 것이며  많은 훌륭한 조언을 해준 He Tong에게 감사를 전한다.

이 글에 수록된 모든 코드는 GitHub[]에 있다.

데이터 과학자들은 RSASSPSSMATLAB 등과 같은 통계 소프트웨어에 이미 익숙해있다; 그러나, 일들 중 일부는 병렬 컴퓨팅에 상대적으로 미숙하다. 그래서 이 글에서 R에서 병렬 컴퓨팅 사용에 대한 기본개념을 소개하고자 한다.


병렬 컴퓨팅이란 무엇인가?

렬 컴퓨팅은 분명히 고성능 컴퓨터와 병렬 소프트웨어를 포함해야 한다. 고성능 컴퓨터의 피크(Peak) 성능은 급증하고 있다. 최근의 세계 500대 수퍼컴퓨터 랭킹에서 중국의 Sunway TaihuLight은 93 PFLOPS로 최고에 올라있다(링크)대부분의 개인 및 중소기업에게 고성능 컴퓨터는 너무 비싸다. 그래서 주로 국방, 군사, 항공우주와 연구 분야에 한정되었으나, 최근 몇년간 멀티코어 CPU, 값싼 클러스터, 다양한 가속기 (NVIDIA GPUIntel Xeon PhiFPGA)의 발전에 힘입어 개인용 컴퓨터는 고성능 컴퓨터에 견줄만한 수준이 되었다.



반면, 소프트웨어의 변화는 하드웨어의 변화를 따라가지 못한다. 여러분은 병렬 연산을 지원하는 소프트웨어로 무엇을 사용하고 있는가: 크롬, 비주어 스튜디오, 아니면 R?



소프트웨어 병렬화는 더많은 연구와 개발 지원을 필요로한다. 이를 직렬 코드에서 병렬 코드로 변경하는 과정에 대한 코드 모더나이제이션(Code Modernization)이라 하는데, 매우 흥미로운 작업처럼 들린다. 그러나, 실제로는 엄청난 버그 수정, 데이터 구조 재작성, 불확실한 소프트웨어 거동, 크로스-플랫폼 이슈로 인해 막대한 소프트웨 개발과 유지비용이 증가한다.


왜 R은 병렬 컴퓨팅을 필요로 하는가?

R로 돌아가자. 가장 인기있는 통계 소프트웨어 중 하나인 R은 풍부한 통계 모델, 데이터 처리 도구, 강력한 시각화 능력을 제공하는 많은 장점을 가지고 있다. 그러나, 데이터의 양이 증가함에 따라 R의 메모리 사용과 연산 모드는 R의 스케일에 제한을 가한다. 메모리 관점에서 R은 인-메모리(In-memory) 연산 모드를 사용한다. 모든 데이터는 메인 메모리(RAM)에서 처리되어야 한다. 분명, R의 장점은 높은 연산 효율과 속도이지만 R이 처리할 수 있는 문제의 크기(<RAM)는 매우 제한되어 있다. 두번째, R 코어는 싱글-스레드(Single-thread) 프로그램이다. 따라서, 모던 멀티-코어 프로세서에서 R은 모든 컴퓨팅 코어를 효율적으로 사용할 수 없다. 만약 R이 260개의 컴퓨팅 코어를 갖는 Sunway CPU에서 작동한다면, 싱글-스레드 R은 단지 컴퓨팅 파워의 1/260만을 활용하여 다른 259/260의 컴퓨팅 코어를 낭비하고 있는 셈이다.


해결책? 병렬 컴퓨팅!

병렬 컴퓨팅 기술은 싱글-코어 및 메모리 용량이 어플리케이션 요구를 만족하지 못하는 문제를 해결할 수 있다. 따라서, 병렬 컴퓨팅 기술은 R의 사용을 엄청나게 확장시킬 것이다. R 2.14 (2012년 2월)부터 parallel‘ 패키지는 기본으로 설치되어 있다. 분명 R의 핵심 개발팀은 병렬화에 대한 중요성을 인식하고 있다.


병렬 컴퓨팅을 사용하는 방법?

사용자의 시각에서 R의 병렬 컴퓨팅을 암시적 컴퓨팅 모드와 명시적 컴퓨팅 모드로 나눌 수 있다.


암시적 모드(Implicit Mode)

암시적 컴퓨팅은 사용자에게 대부분의 상세한 설정을 숨긴다. 하드웨어 리소스를 어떻게 할당하는지, 작업량을 어떻게 분산하는지, 또 결과는 어떻게 수집되는지에 대하여 알 필요가 없다. 켬퓨팅은 현재 하드웨어 리소스에 따라 자동으로 시작된다. 분명 이 모드는 가장 선호하는 모드이다. 우리는 컴퓨팅 모드와 코드를 변경할 필요없이 보다 고성능의 컴퓨팅을 할 수 있다. 공통적인 암시적 병렬 모드는 다음 내용을 포함한다:


  • 병렬 라이브러리 사용

Intel MKLNVIDIA cuBLAS,  OpenBLAS 등과 같은 병렬 라이브러리들은 보통 하드웨어 제조사가 해당 하드웨어를 기반으로 철저한 최적화를 하여 제공되므로, 이들 라이브러리의 성능은 R의 라이브러리 보다 훨씬 우수하다. 컴파일 시 고성능 R 라이브러리를 선택하거나 런타임에서 LD_PRELOAD를 로딩하는 것을 권장한다. BLAS 라이브러리에 대한 컴파일, 로딩, 사용법에 대한 자세한 내용을 확인하려면 여기를 클릭한다. 행렬 연산 실험을 수행한 첫번째 다이어그램은, 1개 또는 2개의 CPU에서의 병렬 라이브러리가 R의 기본 라이브러리보다 100배 가량 우수함을 보여준다. 두번째 다이어그램에서 GPU 수학 라이브러리가 몇몇 벤치마킹 분석 알고리즘에 대하여 주목할 만한 속도를 보여준다.



아래의 예제에서 deepnet 패키지에 포함된 DBN(Deep Belief Network), SRBM(Stacked Restricted Boltzmann Machine)에 의한 MNIST 데이터세트를 학습시킨다. 이 예제는 training MNIST data with the package deepnet”를 참고하며 필자는 학습 데이터에 대해 0.004%의 오차율과 테스트 데이터에 대해 2%의 오차율을 얻었다. 원래의 네트워크 c(500,500,250,125)는 실행하기에 너무 크기 때문에, 네트워크 아키텍쳐와 deepnet_mnist.R 코드를 여기를 통해 단순화하였다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#install.packages("data.table")
#install.packages("deepnet")
 
library(data.table)
library(deepnet)
 
# download MNIST dataset in below links
mnist.train <- as.matrix(fread("./train.csv", header=F))
mnist.test  <- as.matrix(fread("./test.csv", header=F))
 
# V785 is the label
x <- mnist.train[, 1:784]/255
y <- model.matrix(~as.factor(mnist.train[, 785])-1)
 
system.time(
    nn <- dbn.dnn.train(x,y,
                        hidden=c(64),
                        #hidden=c(500,500,250,125),
                        output="softmax",
                        batchsize=128,
                        numepochs=100,
                        learningrate = 0.1)
)


이 코드를 두 번 실행하였다. Intel SandyBridge E-2670에서 Intel MKL과 OpenBLAS 라이브러리로 3.7X 및 2.5X 속도향상을 얻었다 (2581초 vs 693초 1213).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
> R CMD BATCH deepnet_mnist.R
> cat deepnet_mnist.Rout
deep nn has been trained.
     user system   elapsed
 2574.013  1.404  2581.882
 
> env LD_PRELOAD=/.../tools/OpenBLAS/lib/libopenblas.so R CMD BATCH deepnet_mnist.R
> cat deepnet_mnist.Rout
deep nn has been trained.
     user    system  elapsed
 4752.005 25881.221 1213.644
 
# Compiled with Intel Compiler and MKL
> R CMD BATCH deepnet_mnist.R
> cat deepnet_mnist.Rout
deep nn has been trained.
      user  system elapsed
 10770.641 290.486 693.146


  • 멀티스레드 함수 사용하기

OpenMP는어플리케이션 속도 향상을 위하여 공유 메모리 아키텍쳐를 기반으로 하는 멀티스레드 라이브러리이다. 최신 버전의 R은 Linux에서 컴파일 시 OpenMP 옵션 (-fopenmp)을 오픈하였으며, 이는 계산의 일부가 멀티스레드 모드로 실행될 수 있음을 의미한다. 예를 들어, dist는 OpenMP를 이용하여 멀티스레드로 실행된다. 아래 코드는 이에 대한 예제 코드이다(ImplicitParallel_MT.R):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Comparison of single thread and multiple threads run
# using Internal function to set thread numbers, not very grace, but don't find a good way till now.
# Ang suggestion?
setNumThreads <- function(nums=1) {
 .Internal(setMaxNumMathThreads(nums))
 .Internal(setNumMathThreads(nums))
}
 
# dataset from 2^6 to 2^11
for(i in 6:11) {
 ORDER <- 2^i
 m <- matrix(rnorm(ORDER*ORDER),ORDER,ORDER)
 setNumThreads(1)
 res <- system.time(d <- dist(m))
 print(res)
 setNumThreads(20)
 res <- system.time(d <- dist(m))
 print(res)
}



  • 병렬 패키지 사용하기

R High-Performance and Parallel Computing with R의 리스트에 많은 병렬 패키지와 도구가 있다. 이들 병렬 패키지들은 다른 R 패키지처럼 빠르고 편하게 사용할 수 있다. R 유저들은 항상 문제 자체에 집중하고, 병렬 실행 및 성능 이슈에 많이 고민할 필요가 없다.

예를 들어 H2O.ai를 생각해 보면, 이는 멀티-스레드와 멀티-노드 컴퓨팅을 수행하도록 백엔드(Backend)로 Java를 선택한다. 유저들은 패키지를 불러와서 스레드 개수로 H2O를 초기화하기만 하면 된다. 이후 GBM, GLM, DeepLearning 알고리즘과 같은 후속 연산은 자동으로 멀티-스레드와 멀티-코어로 할당된다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
library(h2o)
h2o.init(nthreads = 4)
# Connection successful!
# R is connected to the H2O cluster:
# H2O cluster uptime: 1 hours 53 minutes
# H2O cluster version: 3.8.3.3
# H2O cluster name: H2O_started_from_R_patricz_ywj416
# H2O cluster total nodes: 1
# H2O cluster total memory: 1.55 GB
# H2O cluster total cores: 4
# H2O cluster allowed cores: 4
# H2O cluster healthy: TRUE
# H2O Connection ip: localhost
# H2O Connection port: 54321
# H2O Connection proxy: NA
# R Version: R version 3.3.0 (2016-05-03)

 

명시적 모드(Explicit Mode)

명시적 병렬 컴퓨팅은 데이터 파티션, 작업 분배, 최종 결과 수집을 포함하는 상세 설정을 유저가 설정할 수 있도록 한다. 유저는 각자의 알고리즘을 이해해야 할 뿐만 아니라 하드웨어와 소프트웨어 스택을 확실히 이해해야 한다. 그래서 이 모드는 유저에게 다소 어렵다.

다행히, parallel,Rmpiforeach 등의 R의 병렬 컴퓨팅 프레임웍은 맵핑 구조(Mapping Structure)에 의한 간단한 병렬 프로그래밍 방법을 제공한다. R 유저들은 *apply 또는 for의 형태로 코드를 변환하기만 하고 이들을 mc*apply 또는 foreach 등과 같은 병렬 API로 교체하기만 하면 된다. 보다 복잡한 컴퓨팅 플로우(Flow)에 대하여 유저는 Map-and-reduce를 반복할 수 있다.



이제 *apply과 for 스타일의 2차 방정식을 푸는 병렬 예제를 보여주려 한다. 전체 코드는 ExplicitParallel.R에 있다. 우선, 방정식에 대한 벡터가 아닌 형식을 보여주고자 하는데, 이는 두번째 2차 계수가 0이거나, 두번째와 첫번쩨 2차항이 0이거나 제곱근의 수가 음수인 경우 등 다양한 경우에 대해 다룰 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Not vectorized function
# Quadratic Equation: a*x^2 + b*x + c = 0
solve.quad.eq <- function(a, b, c)
{
 # Not validate eqution: a and b are almost ZERO
 if(abs(a) < 1e-8 && abs(b) < 1e-8) return(c(NA, NA) )
  
 # Not quad equation
 if(abs(a) < 1e-8 && abs(b) > 1e-8) return(c(-c/b, NA))
  
 # No Solution
 if(b*b - 4*a*c < 0) return(c(NA,NA))
  
 # Return solutions
 x.delta <- sqrt(b*b - 4*a*c)
 x1 <- (-b + x.delta)/(2*a)
 x2 <- (-b - x.delta)/(2*a)
  
 return(c(x1, x2))
}


그리고나서, 랜덤으로 세 개의 큰 벡터를 생성하여 세 개의 변수에 저장하였다.

1
2
3
4
5
6
7
# Generate data
len <- 1e8
a <- runif(len, -10, 10)
a[sample(len, 100,replace=TRUE)] <- 0
 
b <- runif(len, -10, 10)
c <- runif(len, -10, 10)


*apply 실행:


우선 직렬 코드를 살펴보자. 데이터는 lapply에 의한 솔버 함수 solve.quad.eq에 맵핑되고 결과는 최종적으로 list에 저장된다.

1
2
3
4
# serial code
system.time(
res1.s <- lapply(1:len, FUN = function(x) { solve.quad.eq(a[x], b[x], c[x]) })
)


다음 parallel 패키지에서 mcLapply (멀티고어)의 함수를 사용하여 lapply의 계산을 병렬화한다. API 인터페이스로부터 mcLapply의 사용법은 코어 개수를 명시하는 것과 추가하여 lapply와 매우 유사하다. mcLapply는 Linux form 메커니즘에 기반하여 현재 R 세션의 다중 복사본을 생성하고 입력 인덱스를 고려하여 다중 프로세스로 균등하게 컴퓨팅 작업을 할당한다. 결국 (Master-Slave 관계에서의)마스터 R 세션은 모든 워커 세션으로부터 결과를 수집할 것이다. 만약 두 개의 워커 프로세스를 명시한다면, 하나의 프로세스는 1:(len/2)에 대하여 계산하는 반면 다른 하나는 (len/2+1):len에 대한 계산을 하고 결국 두 개의 결과가 res1.p로 합쳐진다. 그러나, Linux 메커니즘의 특성 상 이 버전은 Windows 플랫폼에서 실행될 수 없다.

1
2
3
4
5
6
7
8
# parallel, Linux and MAC platform
library(parallel)
# multicores on Linux
system.time(
  res1.p <- mclapply(1:len,
                     FUN = function(x) { solve.quad.eq(a[x], b[x], c[x]) },
                     mc.cores = 4)
)


Linux 외 다른 OS 유저들은 병렬화를 위해 parallel 패키지의 parLapply 함수를 사용할 수 있다. parLapply 함수는 Windows, Linux, Mac 등 다양한 플랫폼을 지원하지만 사용방법은 mclapply 보다 약간 더 복잡하다. parLapply 함수를 사용하기 전에 우선 컴퓨팅 그룹 (클러스터)을 생성할 필요가 있다. 컴퓨팅 그룹은 소프트웨어-수준 개념인데, 몇 개의 R 작업 프로세스들을 생성해야 하는가를 의미한다 (Note: par*apply 패키지는 mc*apply로부터 R 마스터 프로세스의 복사본 보다는 여러 개의 새로운 R 프로세스를 생성할 것이다). 이론상으로는, 컴퓨팅 그룹의 규모는 하드웨어 설정에 영향을 받지는 않는다. 예를 들어, 어떤 머신이든 1000개의 R 작업 프로세스를 갖는 그룹을 생성할 수 있다. 실무에서는 보통 하드웨어 리소스(물리적 코어 등)를 갖는 컴퓨팅 그룹과 동일한 규모를 사용하여 R의 각 작업 프로세스를 물리적 코어에 대응시킬 수 있다.

아래의 예에서, detectCores 함수를 실행하여 머신의 컴퓨팅 코어 수를 결정하고 있다. detectCores()가 실제 물리적 코어라기 보다 Hyper-Threading의 개수를 반환하는데 주목할 필요가 있다. 예를 들어, 나의 랩탑에 두 개의 물리적 코어가 있는데 각 코어는 두 개의 Hyper-Threading을 시뮬레이션 할 수 있으므로 detectCores()의 반환값은 4이다. 그러나, 대부분의 컴퓨팅 집약적인 작업에 있어 Hyper-Threading은 성능 개선에 큰 도움이 되지 않으므로, 파라미터 logical=FALSE를 이용하여 물리적 코어의 실제 개수를 얻어 동일한 수의 그룹을 생성한다. 그룹 내의 작업 프로세스들은 새로운 R 세션이기 때문에, 부모 프로세스의 데이터와 함수들은 보이지 않는다. 따라서, clusterExport 함수를 통해 모든 작업 프로세스에 데이터와 함수를 브로드캐스팅해야 한다. 결국 parLapply는 작업들을 모든 R 작업 프로세스에 균등하게 할당하고 결과를 수집한다.

1
2
3
4
5
6
7
8
# cluster on Windows
cores <- detectCores(logical = FALSE)
cl <- makeCluster(cores)
clusterExport(cl, c('solve.quad.eq', 'a', 'b', 'c'))
system.time(
   res1.p <- parLapply(cl, 1:len, function(x) { solve.quad.eq(a[x], b[x], c[x]) })
)
stopCluster(cl)


for 실행:


for의 컴퓨팅 방식은 *apply와 매우 유사하다. 다음의 직렬 실행에서 저장결과에 대한 행렬을 생성하고 내부 루프(Inner Loop)에서 하나씩 결과를 업데이트 하였다.

1
2
3
4
5
6
7
# for style: serial code
res2.s <- matrix(0, nrow=len, ncol = 2)
system.time(
    for(i in 1:len) {
        res2.s[i,] <- solve.quad.eq(a[i], b[i], c[i])
    }
)


루프 병렬화를 위해 foreach 패키지의 %dopar%를 사용하여 컴퓨팅 작업을 다중 R 작업자에게 분배할 수 있다. foreach 패키지는 데이터 맵핑 방법을 제공하지만, 컴퓨팅 그룹 구성을 포함하지는 않는다. 따라서 doParallel 또는 doMC 패키지를 통해 컴퓨팅 그룹을 생성할 필요가 있다. registerDoParallel룰 통해 컴퓨팅의 백엔드(Backend)를 설정하는 것을 제외하면 컴퓨팅 그룹을 생성하는 방법은 이전과 동일하다.

이제 데이터 분해에 대해 논의해 보자. 사실 각 R 작업자 프로세스가 연속적인 컴퓨팅 작업을 했으면 한다. 두 개의 R 작업자 프로세스가 있는데 프로세스 1은 1:len/2에 대하여 다른 프레세스는 (len/2+1):len에 대한 작업을 한다고 가정해 보자. 따라서, 아래의 예제 코드에서는 벡터를 컴퓨팅 그룹으로 균등하게 분배하여 각 프로세스는 chunk.size의 사이즈를 계산한다. 이외의 중요한 스킬은 로컬 행렬을 이용하여 각 프로세스에 결과의 일부분을 저장하는 것이다. 마지막으로 로컬 결과를 .combine='rbind' 파라미터를 통해 합친다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# foreach, work on Linux/Windows/Mac
library(foreach)
library(doParallel)
 
# Real physical cores in my computer
cores <- detectCores(logical = FALSE)
cl <- makeCluster(cores)
registerDoParallel(cl, cores=cores)
 
# clusterSplit are very convience to split data but it takes lots of extra memory
# chunks <- clusterSplit(cl, 1:len)
 
# split data by ourselves
chunk.size <- len/cores
 
system.time(
 res2.p <- foreach(i=1:cores, .combine='rbind') %dopar%
 {
    # local data for results
    res <- matrix(0, nrow=chunk.size, ncol=2)
    for(x in ((i-1)*chunk.size+1):(i*chunk.size)) {
        res[x - (i-1)*chunk.size,] <- solve.quad.eq(a[x], b[x], c[x])
    }
    # return local results
    res
 }
)
 
stopImplicitCluster()
stopCluster(cl)


결국 4개의 스레드를 갖는 Linux 플랫폼에서 코드를 테스트 하였는데 모든 병렬 실행에 대하여 3X 이상의 속도향상을 얻을 수 있었다!


도전과제와 전망

도전과제: 현실적으로 병렬 컴퓨팅으로 해결해야 하는 문제가 이 글에 소개된 예제들만큼 단순하지는 않다. R과 R의 생태계를 병렬화하는 것은 여전히 어려운 문제이다. 왜냐하면:

  • R은 다수의 개발자에 의해 개발되며 비상업적 소프트웨어이다

R은 동맹 조직이나 회사에 의해 개발되지 않는 반면 대부분의 R 패키지들은 유저들에 의해 개발되고 있다. 이는 동일한 철학으로 소프트웨어 아키텍쳐와 설계를 조정하거나 통일하는 것이 어렵다는 의미이다. 반면, MATLAB 등과 같이 통일된 개발, 유지보수, 관리가 되는 상용 소프트웨어는 상대적으로 구조를 다시 만들기가 쉽다. 따라서, 여러 번의 업데이트를 통해 상용 소프트웨어는 고도의 병렬화를 하게 될 것이다.

  • R의 인프라구조 설계는 여전히 싱글-스레드이다

R은 본래 싱글-스레스로 설계되었기 때문에 근본적인 데이터 구조와 함수들이 스레스-세이프하지 않다. 따라서, 고수준의 병렬 알고리즘을 위해 많은 코드들을 재작성하거나 수정해야 할 필요가 있다. 그러나, 이는 원래의 디자인 패턴을 손상시킬 가능성이 있다.

  • 패키지는 매우 의존적이다

R에서 패키지 B를 사용하는데 B는 패키지 A의 일부 함수에 의존한다고 가정해 보자. 만약 패키지 B가 멀티스레드 구조로 먼저 개선되면 패키지 A 또한 병렬화를 위한 개선이 되어야 한다. 따라서 패키지 B 사용 시 하이브리드 병렬 구조 형태를 띠게 될 것 같다. 이는 개발하는 동안 포괄적인 설계와 테스트를 거치지 않는다면 예상치 못한 많은 에러(버그들)와 성능 저하를 불러오게 될 것이다.


전망: R의 병렬화의 미래는 어떠할 것인가?

  • 상업 조직과 연구 기관에 의한 고성능 컴포넌트

기본적으로 소프트웨어 개발은 인력과 자금 투자로부터 분리될 수 없다. H2OMXNetIntel DAAL 등과 같은 패키지들은 장기간 지원에 의해 병렬화를 위한 성능 개선을 이룰 것이다.

  • 클라우드 플랫폼

클라우드 컴퓨팅의 출현으로 Data Analyst as a Services (DAAS)와 Machine Learning as a Service (MLAS)는 더더욱 인기를 얻을 것이다. 주요 클라우드 업체들은 하드웨어 디플로이(Deployment)로부터 데이터베이스, 고수준 알고리즘에까지 이르는 R을 포함하는 툴들을 최적화하고 어플리케이션 수준에서 더욱 병렬화를 시도할 것이다. 예를 들어, Microsoft는 최근 클라우드에 R에 대한 지원 시리즈를 런칭하였다 (여기)따라서 R의 병렬화는 더욱 분명하다. 유저가 실제로 이 병렬 작업을 하지 않더라도 실제 컴퓨팅에서는 클라우드로 분산 처리될 것이다.

Comments