12-18 14:29
Notice
Recent Posts
Recent Comments
관리 메뉴

Scientific Computing & Data Science

[MongoDB] Aggregation / Map Reduce 본문

Data Science/MongoDB

[MongoDB] Aggregation / Map Reduce

cinema4dr12 2014. 3. 2. 14:10

by Geol Choi | 

앞의 글에서 설명했던 집합(aggregation) 연산자인 count, distinct, group으로 할 수 있는 모든 것 뿐만 아니라 더 많은 일들을 맵리듀스를 통해 할 수 있습니다. 특히 다중의 서버를 통해 집합 연산자를 쉽게 병렬로 처리할 수 있습니다. 맵리듀스는 문제를 여러 개의 덩어리로 분할하고, 각 덩어리를 다양한 머쉰으로 전송하고, 각 머쉰이 문제의 각 부분을 해결하도록 합니다. 모든 머쉰에서 처리가 모두 마무리되면 솔루션 결과를 모두 모아서 전체적인 솔루션으로 합칩니다.

맵리듀스는 다음과 같은 절차로 처리됩니다.

(1) 첫번째 단계는 "맵(Map)"이며, 이는 연산을 컬렉션 내의 각 도큐먼트로 매핑하는 것입니다. 이 연산은 "아무 것도 하지 않는 것" 또는 " X값을 갖는 key들을 전송"하는 것입니다.

(2) 중간 단계는 "셔플(Shuffle)"이라 불리며, key들이 그룹화 되고 전송된 값의 리스트들이 각 key에 대해 생성되는 단계입니다. 리듀스는 이러한 값의 리스트를 취하고 이를 단일 엘리먼트로 줄입니다. 이 엘리먼트는 각 key가 단일 값을 포함하는 리스트를 가질 때까지 셔플 단계를 반복합니다.

다음 그림을 통해 맵리듀의 대략적인 개요를 이해하도록 하겠습니다.



"orders" 컬렉션은 "cust_id", "amount", "status" 3개의 key를 가지고 있으며, 그림과 같이 4개의 도큐먼트를 가지고 있다고 할 때, 처리 플로우를 살펴보겠습니다.

우선 query: {status: "A"}에 의해 "status"의 key 값이 "A"인 도큐먼트를 찾습니다.(query)

그런 다음, Map Function인 function() {emit( this.cust_id, this.amount); } 에 의해 동일한 "cust_id" 값을 갖는 도큐먼트끼리 그룹화 하되 이들이 가지고 있는 "amount" key 값과 연관짓습니다.

Reduce Function을 살펴보면 2개의 파라미터를 취하는데 첫번째 파라미터인 "key"는 "cust_id"이며, 두번째 파라미터인 "values"는 "amount"입니다. Return 값으로 values(amount)의 합을 Array 형으로 계산합니다. 최종 결과값은 "order_totals"이며, 각 "cust_id"로 그룹화 된 도큐먼트의 "amount"를 합한 값입니다.

맵리듀스를 사용에 있어 손해보는 부분은 속도이다: "group"도 그리 빠르지는 않지만, 맵리듀스는 이보다 느리며 실시간 처리로 사용하기에는 무리가 있습니다. 백그라운드로 맵리듀스를 실행하고, 결과 컬렉션을 생성한 후 실시간으로 이 컬렉션에 대한 쿼리를 실행할 수 있습니다.

맵리듀스 명령은 다음과 같은 형태를 갖습니다:

db.runCommand(
   {
     mapReduce: <collection>,
     map: <function>,
     reduce: <function>,
     out: <output>,
     query: <document>,
     sort: <document>,
     limit: <number>,
     finalize: <function>,
     scope: <document>,
     jsMode: <boolean>,
     verbose: <boolean>
   }
 )

주요 필드값에 대해 요약하면 다음과 같습니다:

 필드

 형태

 설명

 mapReduce

 collection

 맵리듀스를 실행할 컬렉션 이름.

 map

 JavaScript
 Function

 value와 key를 맵핑하고 key와 values 쌍을 전송(emit)하는 JavaScript 함수.

 reduce

 JavaScript
 Function

 하나의 오브젝트를 특정 key와 연관된 values로 "줄이는(reduce)"는 JavaScript 함수.

 out

 string

 출력 컬렉션의 이름. 

 query

 document

 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 선택하는 기준.

 sort

 document

 도큐먼트를 map 함수로 전송하기 전 도큐먼트를 분류. (limit 옵션과 함께 사용하면 유용함.)

 limit

 number

 map 함수로 전송할 도큐먼트의 최대 개수.

 finalize

 JavaScript
 Function

 reduce의 결과로 보내는 최종 단계.
 scope document JavaScript 코드에서 사용되는 변수.
 jsMode

 Boolean

 map 함수와 reduce 함수 실행 사이에 중간 데이터를 BSON 포맷을 변환할 지 여부 결정.

false인 경우:

  • map 함수가 전송한 JavaScript 오브젝트를 BSON 포맷으로 변환함.
  • 맵리듀스 연산은 중간 BSON 오브젝트를 임시 저장소에 저장.

true인 경우:

  • map 함수가 전송한 JavaScript 오브젝트는 원래의 형태를 유지.

 verbose

 Boolean

 서버 로그에서 더 많은 verbose 결과를 사용할지 여부 결정.


다음 예제를 통해 맵리듀스에 대해 더 자세하게 이해하도록 합니다 (예제는 MongoDB의 공식사이트의 맵리듀스 도큐먼트를 최대한 참고하여 약간의 재구성을 하였습니다).

우선 "orders" 라는 이름으로 다음과 같은 형태의 컬렉션을 준비합니다:

{
     _id: ObjectId("50a8240b927d5d8b5891743c"),
     cust_id: "abc123",
     ord_date: new Date("Oct 04, 2012"),
     status: 'A',
     price: 25,
     items: [ { sku: "mmm", qty: 5, price: 2.5 },
              { sku: "nnn", qty: 5, price: 2.5 } ]
}

그러나, 데이터를 위와 같은 형태로 하나씩 만드는 것은 꽤나 번거로운 일이므로 독자의 편의를 위해 다음의 내용을 MongoDB 쉘에 Copy + Paste합니다:

> db.orders.insert({cust_id: "abc123", ord_date: new Date("Oct 04, 2012"), status: 'A', price: 25, items: [ { sku: "mmm", qty: 5, price: 2.5 }, { sku: "nnn", qty: 4, price: 2.5 } ]})
> db.orders.insert({cust_id: "abc123", ord_date: new Date("Nov 03, 2011"), status: 'B', price: 40, items: [ { sku: "mmm", qty: 7, price: 4.5 }, { sku: "nnn", qty: 3, price: 3.5 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Feb 03, 2014"), status: 'C', price: 15, items: [ { sku: "mmm", qty: 5, price: 4.0 }, { sku: "nnn", qty: 8, price: 3.0 } ]})
> db.orders.insert({cust_id: "def123", ord_date: new Date("Mar 03, 2014"), status: 'D', price: 17, items: [ { sku: "mmm", qty: 6, price: 2.0 }, { sku: "nnn", qty: 4, price: 1.7 } ]})


> db.orders.find().pretty()
{
  "_id" : ObjectId("531213442c4a45d0bbc868f4"),
  "cust_id" : "abc123",
  "ord_date" : ISODate("2012-10-03T15:00:00Z"),
  "status" : "A",
  "price" : 25,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 5,
      "price" : 2.5
    },
    {
      "sku" : "nnn",
      "qty" : 4,
      "price" : 2.5
    }
  ]
}
{
  "_id" : ObjectId("531213452c4a45d0bbc868f5"),
  "cust_id" : "abc123",
  "ord_date" : ISODate("2011-11-02T15:00:00Z"),
  "status" : "B",
  "price" : 40,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 7,
      "price" : 4.5
    },
    {
      "sku" : "nnn",
      "qty" : 3,
      "price" : 3.5
    }
  ]
}
{
  "_id" : ObjectId("531213462c4a45d0bbc868f6"),
  "cust_id" : "def123",
  "ord_date" : ISODate("2014-02-02T15:00:00Z"),
  "status" : "C",
  "price" : 15,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 5,
      "price" : 4
    },
    {
      "sku" : "nnn",
      "qty" : 8,
      "price" : 3
    }
  ]
}
{
  "_id" : ObjectId("531213472c4a45d0bbc868f7"),
  "cust_id" : "def123",
  "ord_date" : ISODate("2014-03-02T15:00:00Z"),
  "status" : "D",
  "price" : 17,
  "items" : [
    {
      "sku" : "mmm",
      "qty" : 6,
      "price" : 2
    },
    {
      "sku" : "nnn",
      "qty" : 4,
      "price" : 1.7
    }
  ]
}


[예제 1 : 고객 ID별 전체 구매가 계산]

상기 준비한 4개의 도큐먼트들에는 고객의 이름 대신 고객의 아이디(cust_id)로 고객 관리를 한다고 가정하였습니다 (고객의 이름이 동일할 수도 있으니 ID로 관리하는 것은 당연합니다). 첫번째 맵리듀스 예제는 각 고객이 주문한 제품의 총 구매가를 계산하는 것입니다. 준비된 도큐먼트들은 총 두명의 고객("abc123", "def123")이 각각 두 개의 제품을 주문하였습니다. 각 고객별로 주문한 제품의 합계를 계산해 보도록 하겠습니다.


1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:

  • 함수에서 "this"는 맵리듀스 함수가 처리하고 있는 도큐먼트를 참조합니다.
  • 함수는 각각의 도큐먼트에 대해 "price"를 "cust_id"로 맵핑한 후, "cust_id"와 "price" 짝을 방출("emit")합니다.
var mapFunction1 = function() {
  emit(this.cust_id, this.price);
};


2. "keyCustID"와 "valuesPrices", 두 개의 인자를 취하는 "reduce" 함수를 정의합니다:

  • "valuesPrices"는 "price" 값으로 구성된 배열(array)이며, "map" 함수가 방출하고 이를 "keyCustId"로 그룹화 한 값입니다.
  • 함수는 " valuesPrices" 배열의 요소 합을 계산합니다.
var reduceFunction1 = function(keyCustId, valuesPrices) {
  return Array.sum(valuesPrices);
};


3. 1번과 2번 과정에서 정의된 "map" 함수(mapFunction1)과 "reduce" 함수(reduceFunction1)을 이용하여 "orders" 컬렉션 내의 모든 도큐먼트들에 대해 맵-리듀스를 실행합니다:

db.orders.mapReduce(
    mapFunction1,
    reduceFunction1,
    { out: "map_reduce_example" }
)

실행 결과는 out인 "map_reduce_example" 컬렉션으로 저장됩니다.

3번까지 실행한 후 "getCollectionNames()" 메써드를 실행하여 현재 DB의 컬렉션 리스트를 출력하면 "map_reduce_example" 컬렉션이 생성되었음을 확인할 수 있을 것입니다:

> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]

이제 "map_reduce_example" 컬렉션 내의 도큐먼트 내용을 살펴보겠습니다:

> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }

예상대로 "cust_id: abc123"의 주문 아이템의 "price" 합이 65, "cust_id: def123"에 대해서는 32의 계산 결과를 확인할 수 있습니다.




[예제 2 : 각 아이템에 대한 평균, 주문 및 전체 수량 계산하기]

두번째 예제에서는 "orders" 컬렉션의 주문일자(ord_date)가 01/01/2012 이후인 모든 도큐먼트에 대해 맵리듀스를 실행하도록 하겠습니다. "item.sku" 필드로 그룹화하고, 각 "sku"에 대해 주문횟수와 총 주문수량을 계산합니다. 각 "sku" 값에 대해 주문 당 평균 수량을 계산하는 것으로 마무리합니다.


1. 각 입력된 도큐먼트를 처리할 "map" 함수를 정의합니다:

  • 함수에서 "this"는 map-reduce 함수가 처리하고 있는 도큐먼트를 참조합니다.
  • 각 아이템에 대해, 함수는 "sku"와 새로운 오브젝트인 "value"와 연동한다. "value"는 "count" 값 1과  주문에 대한 아이템 수량인 "qty"를 가지며, "sku"와 "value" 쌍을 방출합니다. 
var mapFunction2 = function() {
  for (var idx = 0; idx < this.items.length; idx++) {
    var key = this.items[idx].sku;
    var value = {
      count: 1,
      qty: this.items[idx].qty
    };

    emit(key, value);
  }
};

 


2. "keySKU"와 "countObjVals", 두 개의 인자를 취하는 "reduce" 함수 정의합니다:

  • "countObjVals"는 배열(array) 값이며, 이 배열의 요소들은 "map" 함수에서 "reduce" 함수로 전달되는 "keySKU" 값으로 그룹화 되는 오브젝트들입니다.
  • 함수는 "countObjVals" 배열을 "count"와 "qty" 필드를 포함하는 "reducedValue"라는 단일 오브젝트로 줄입니다.
  • "reducedVal"에서 "count" 필드는 각각의 배열 요소로부터 "count" 필드들의 합(결국 전체 주문횟수를 의미)을 포함하며, "qty" 필드는 역시 각각의 배열 요소로부터 "qty" 필드들의 합(결국 전체 주문수량을 의미)을 포함합니다.

var reduceFunction2 = function(keySKU, countObjVals) {
  var reducedVal = { count: 0, qty: 0 };

  for (var idx = 0; idx < countObjVals.length; idx++) {
    reducedVal.count += countObjVals[idx].count;
    reducedVal.qty += countObjVals[idx].qty;
  }

  return reducedVal;
};


3. "key"와 "reduceVal"의 두 개의 인자를 취하는 "finalize" 함수를 정의한다. 이 함수는 "reduceVal" 오브젝트를 수정하여 "avg"라는 계산된 필드를 추가하고 이렇게 수정된 오브젝트를 반환합니다:

var finalizeFunction2 = function (key, reducedVal) {
  reducedVal.avg = reducedVal.qty/reducedVal.count;

  return reducedVal;
};


4. "mapFunction2", "reduceFunction2", "finalizeFunction2" 함수를 이용하여 "order" 컬렉션에 대해 map-reduce를 연산을 수행합니다:

db.orders.mapReduce(
  mapFunction2,
  reduceFunction2,
  {
    out: { merge: "map_reduce_example" },
    query: { ord_date: { $gt: new Date('01/01/2012') }},
    finalize: finalizeFunction2
  }
)


예제 1과 마찬가지로 결과로 저장할 컬렉션(out)은 "map_reduce_example"로 동일한데, 예제 1을 수행한 후 "map_reduce_example"이 이미 존재하므로 이것에 예제 2결과를 추가하도록 "merge" 옵션을 사용하였습니다. "out" 옵션에는 "replace", "merge", "reduce" 등 세 가지 옵션이 있습니다. "place"는 새로운 내용으로 대체, "merge"는 새로운 내용 추가, "reduce"는 "merge"와 기능이 전반적으로 유사하지만 기존의 도큐먼트와 새 도큐먼트 모두에 "reduce" 함수를 적용하는 기능이 추가적으로 존재합니다.

그 외 "out" 옵션을 잘 활용하면, 현재의 DB가 아닌 다른 이름의 DB로도 결과를 저장할 수 있으며, "sharding"인 경우에도 맵리듀스 결과를 보낼 수 있습니다.

"query"에는 주문일자(ord_date)가 2012년1월1일(01/01/2012, 날짜 정의 순서는 월/일/년, 즉 MM/DD/YYYY) 이후의 아이템에 대해서만 처리하는 쿼리를 요청하였습니다.


이렇게 1- 4번을 실행한 결과는 다음과 같습니다.

우선 "getCollectionNames()" 명령으로 현재 DB의 컬렉션 리스트를 살펴 보겠습니다:

> db.getCollectionNames()
[ "map_reduce_example", "orders", "system.indexes", "system.users" ]

반드시 "map_reduce_example"이 존재해야 합니다. 그 다음, "map_reduce_example" 컬렉션의 내용을 살펴보겠습니다:

> db.map_reduce_example.find().pretty()
{ "_id" : "abc123", "value" : 65 }
{ "_id" : "def123", "value" : 32 }
{
  "_id" : "mmm",
  "value" : {
    "count" : 3,
    "qty" : 16,
    "avg" : 5.333333333333333
  }
}
{
  "_id" : "nnn",
  "value" : {
    "count" : 3,
    "qty" : 16,
    "avg" : 5.333333333333333
  }
}

위와 같이 예제 1 결과 다음에 예제 2 결과 내용이 추가되었음을 확인할 수 있습니다. 앞서 준비한 도큐먼트 아이템에서 2012년1월1일 이후의 아이템에 대해 "items > sku: mmm"과 "items > sku: nnn"에 대해 주문횟수(count), 주문수량(qty), 주문수량 평균(avg)를 직접 계산하여 결과와 비교해 보길 바라겠습니다.

'Data Science > MongoDB' 카테고리의 다른 글

[MongoDB] Capped Collections  (0) 2014.03.06
[MongoDB] Advanced Topics / DB Commands 정리  (0) 2014.03.04
[MongoDB] Aggregation / The Basic  (0) 2014.02.16
[MongoDB] Query / $snapshot  (0) 2014.02.09
[MongoDB] Query / Other Query Operations  (0) 2014.02.08
Comments